# Install required library
%pip install pysunspec2 -q
print("✓ Installation complete")Note: you may need to restart the kernel to use updated packages.
✓ Installation complete
SunSpec Modbus Integration for C-Battery Systems
October 20, 2024
This notebook provides a comprehensive guide to integrating with C-Battery energy storage systems using the SunSpec Modbus protocol.
What you’ll learn: - Connect to C-Battery via Modbus TCP - Read battery status and measurements - Control power setpoints - Monitor in real-time - Handle multiple batteries - Implement production-ready code
Network Access Required: This notebook requires network connectivity to a C-Battery device running the Modbus TCP server. Update the CBAT_IP configuration to match your installation.
This notebook uses pysunspec2, a Python library for SunSpec-compliant devices.
10.31.0.25)import sunspec2.modbus.client as sunspec_client
import time
from datetime import datetime
print("✓ Libraries imported successfully")✓ Libraries imported successfully
# Configuration - Update these values for your setup
CBAT_IP = '10.31.0.25' # C-Battery IP address
CBAT_PORT = 502 # Modbus TCP port (default: 502)
DEVICE_ID = 1 # Device ID (1 for main battery, 101-199 for individual modules)
print(f"Configuration:")
print(f" IP Address: {CBAT_IP}")
print(f" Port: {CBAT_PORT}")
print(f" Device ID: {DEVICE_ID}")Configuration:
IP Address: 10.31.0.25
Port: 502
Device ID: 1
The sunspec2 library handles the Modbus TCP connection and automatically discovers available SunSpec models.
CBAT_IP in the configuration cell above to match your C-Battery’s IPIf the connection fails, you’ll see detailed troubleshooting steps in the output.
# Create device connection with error handling
try:
device = sunspec_client.SunSpecModbusClientDeviceTCP(
slave_id=DEVICE_ID,
ipaddr=CBAT_IP,
ipport=CBAT_PORT
)
# Scan for available models
print("Scanning for SunSpec models...")
device.scan()
# Display discovered models
print(f"\n✓ Connected to C-Battery at {CBAT_IP}:{CBAT_PORT}")
print(f"\nAvailable models: {list(device.models.keys())}")
# Model mapping
model_names = {
1: 'Common (Device Information)',
701: 'DERMeasureAC (AC Measurements)',
702: 'DERCapacity (Capacity Ratings)',
704: 'DERCtlAC (Power Control)',
713: 'DERStorageCapacity (Battery Status)',
714: 'DERMeasureDC (DC Measurements)',
715: 'DERCtl (Advanced Control)',
802: 'Battery (Battery Details)'
}
print("\nModel Details:")
for model_id in device.models.keys():
if isinstance(model_id, int) and model_id in model_names:
print(f" {model_id}: {model_names[model_id]}")
print("\n✓ Successfully connected! You can now run the cells below.")
except Exception as e:
print(f"❌ Connection failed: {e}")
print("\n" + "=" * 70)
print("TROUBLESHOOTING STEPS:")
print("=" * 70)
print("1. Verify the IP address is correct for your C-Battery installation")
print(f" Current IP: {CBAT_IP}")
print("\n2. Check network connectivity:")
print(f" Run in terminal: ping {CBAT_IP}")
print("\n3. Ensure you're on the same network as the C-Battery device")
print("\n4. Verify the Modbus server is running on the C-Battery")
print(f" Expected port: {CBAT_PORT}")
print("\n5. Check firewall settings allow Modbus TCP traffic (port 502)")
print("=" * 70)
print("\nNote: If you're not physically connected to a C-Battery device,")
print("this connection will fail. This notebook is designed to run on")
print("a system that has network access to the C-Battery hardware.")
print("=" * 70)
# Set device to None so subsequent cells can check
device = NoneScanning for SunSpec models...
✓ Connected to C-Battery at 10.31.0.25:502
Available models: [1, 'common', 701, 'DERMeasureAC', 702, 'DERCapacity', 704, 'DERCtlAC', 713, 'DERStorageCapacity', 714, 'DERMeasureDC', 715, 'DERCtl', 802, 'battery']
Model Details:
1: Common (Device Information)
701: DERMeasureAC (AC Measurements)
702: DERCapacity (Capacity Ratings)
704: DERCtlAC (Power Control)
713: DERStorageCapacity (Battery Status)
714: DERMeasureDC (DC Measurements)
715: DERCtl (Advanced Control)
802: Battery (Battery Details)
✓ Successfully connected! You can now run the cells below.
# Check connection status
if device is not None:
print("✅ CONNECTION SUCCESSFUL!")
print("\nYou can now proceed with the cells below to:")
print(" • Read device information")
print(" • Monitor battery status")
print(" • Read measurements")
print(" • Control power setpoints")
print("\nTip: Run cells sequentially for the best experience.")
else:
print("⚠️ CONNECTION NOT ESTABLISHED")
print("\nThe following cells will not work until you successfully connect.")
print("\nTo connect:")
print("1. Update CBAT_IP in the configuration cell above")
print("2. Ensure network connectivity to the C-Battery device")
print("3. Re-run the connection cell")
print("\nAlternatively, you can review the code structure and examples")
print("in the cells below to understand how the integration works.")✅ CONNECTION SUCCESSFUL!
You can now proceed with the cells below to:
• Read device information
• Monitor battery status
• Read measurements
• Control power setpoints
Tip: Run cells sequentially for the best experience.
If you want to see what attributes are available in each model, use the helper function below:
def print_model_points(model, title="MODEL DATA", show_all=False):
"""
Helper function to print all points in a model using library metadata.
Args:
model: SunSpec model instance
title: Title to display
show_all: If True, show all points including implementation details (ID, L, etc.)
"""
print("=" * 60)
print(title)
print("=" * 60)
# Iterate through all points in the model
for point_name in model.points:
# Skip implementation points unless show_all is True
if not show_all and point_name in ['ID', 'L', 'Pad']:
continue
point = model.points[point_name]
try:
# Get the value - prefer cvalue (scaled) over value (raw)
value = None
if hasattr(point, 'cvalue') and point.cvalue is not None:
value = point.cvalue
elif hasattr(point, 'value') and point.value is not None:
value = point.value
if value is not None:
# Get label and units from point definition
label = point.pdef.get('label', point_name) if hasattr(point, 'pdef') else point_name
units = point.pdef.get('units', '') if hasattr(point, 'pdef') else ''
# Format output
display_units = f" {units}" if units else ""
print(f"{label:<30} {value}{display_units}")
except Exception as e:
# Debug: show what went wrong
print(f"{point_name:<30} (error: {e})")
print("=" * 60)
def explore_model(model_id, show_all=False):
"""
Explore available attributes in a SunSpec model.
Args:
model_id (int): Model ID to explore (e.g., 713, 714, 701)
show_all (bool): Show all points including implementation details
"""
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
return
if model_id not in device.models:
print(f"❌ Model {model_id} not available")
print(f"Available models: {[m for m in device.models.keys() if isinstance(m, int)]}")
return
model = device.models[model_id][0]
model.read()
print_model_points(model, f"MODEL {model_id} - ALL AVAILABLE POINTS", show_all=show_all)
print("✓ Helper functions defined")
print("\nExample usage:")
print(" explore_model(713) # Explore battery status model")
print(" explore_model(714, show_all=True) # Show all points including ID, L, etc.")✓ Helper functions defined
Example usage:
explore_model(713) # Explore battery status model
explore_model(714, show_all=True) # Show all points including ID, L, etc.
# Example: Explore Model 713 attributes (uncomment to use)
explore_model(713, show_all=True)
# print("Uncomment the line above to explore available model attributes")============================================================
MODEL 713 - ALL AVAILABLE POINTS
============================================================
Model ID 713
Model Length 7
Energy Rating 67500.0 WH
Energy Available 1400.0 WH
State of Charge 21.0 Pct
State of Health 10.0 Pct
Status 0
Energy Scale Factor 2
Percent Scale Factor -1
============================================================
Model 1 provides basic device information like manufacturer, model, serial number, and firmware version.
# Check connection before proceeding
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
else:
# Access Model 1 (Common) and use helper function
common = device.models[1][0]
common.read()
print_model_points(common, "MODEL 1 - DEVICE INFORMATION")============================================================
MODEL 1 - DEVICE INFORMATION
============================================================
Manufacturer C-Battery
Model CBII75
Version 2.3.0
Serial Number 10-016-00000-POC
Device Address 1
============================================================
Model 713 provides critical battery information: - SoC - State of Charge (percentage) - SoH - State of Health (percentage) - Status - Battery operational status - Capacity - Available energy in Wh
# Check connection before proceeding
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
else:
# Access Model 713 (Storage Capacity) and use helper function
storage = device.models[713][0]
storage.read()
print_model_points(storage, "MODEL 713 - BATTERY STATUS")============================================================
MODEL 713 - BATTERY STATUS
============================================================
Energy Rating 67500.0 WH
Energy Available 1400.0 WH
State of Charge 21.0 Pct
State of Health 10.0 Pct
Status 0
Energy Scale Factor 2
Percent Scale Factor -1
============================================================
Model 714 provides DC-side measurements: - DC Current - Current flow (A) - DC Power - Power (W) - Energy Injected - Total energy discharged (Wh) - Energy Absorbed - Total energy charged (Wh) - Per-port measurements - Individual port data
# Check connection before proceeding
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
else:
# Access Model 714 (DC Measurements) and use helper function
dc_meas = device.models[714][0]
dc_meas.read()
print_model_points(dc_meas, "MODEL 714 - DC MEASUREMENTS (AGGREGATE)")============================================================
MODEL 714 - DC MEASUREMENTS (AGGREGATE)
============================================================
Number Of Ports 1
DC Current 15.0 A
DC Power 11300.0 W
DC Current Scale Factor -1
DC Voltage Scale Factor -1
DC Power Scale Factor 2
============================================================
Model 701 provides AC-side measurements: - AC Current - Grid current (A) - AC Voltage - Grid voltage (V) - AC Power - Grid power (W) - AC Frequency - Grid frequency (Hz) - Power Factor - Grid power factor
# Check connection before proceeding
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
else:
# Access Model 701 (AC Measurements) and use helper function
ac_meas = device.models[701][0]
ac_meas.read()
print_model_points(ac_meas, "MODEL 701 - AC MEASUREMENTS")============================================================
MODEL 701 - AC MEASUREMENTS
============================================================
AC Wiring Type 2
Operating State 0
Inverter State 3
Grid Connection State 0
Alarm Bitfield 0
DER Operational Characteristics 0
Active Power 11700.0 W
Apparent Power 0.0 VA
Reactive Power 0.0 Var
Power Factor 1.0
Total AC Current 56.0 A
Voltage LL 415.0 V
Frequency 50.0 Hz
Amps L1 18.0 A
Phase Voltage L1-L2 413.0 V
Phase Voltage L1-N 0.0 V
Amps L2 19.0 A
Phase Voltage L2-L3 415.0 V
Phase Voltage L2-N 0.0 V
Amps L3 19.0 A
Phase Voltage L3-L1 416.0 V
Phase Voltage L3-N 0.0 V
Current Scale Factor -1
Voltage Scale Factor -1
Frequency Scale Factor -3
Active Power Scale Factor 2
Power Factor Scale Factor -3
Apparent Power Scale Factor 2
Reactive Power Scale Factor 2
============================================================
Model 802 provides battery capability information: - Maximum Charge Power - Max charging rate - Maximum Discharge Power - Max discharging rate - Rated Capacity - Total battery capacity
# Check connection before proceeding
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
else:
# Access Model 802 (Battery Details) and use helper function
battery_info = device.models[802][0]
battery_info.read()
print_model_points(battery_info, "MODEL 802 - BATTERY CAPABILITIES")============================================================
MODEL 802 - BATTERY CAPABILITIES
============================================================
Nameplate Max Charge Rate 34600.0 W
Nameplate Max Discharge Rate 20800.0 W
Max Charge Current 47.0 A
Max Discharge Current 29.0 A
WChaDisChaMax_SF 2
AMax_SF -1
============================================================
Model 704 allows you to control the battery: - Enable/Disable - Turn control on/off - Set Power - Command charge/discharge power - Control Modes - Various control strategies
# Check connection before proceeding
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
else:
# Access Model 704 (Power Control) and use helper function
control = device.models[704][0]
control.read()
print_model_points(control, "MODEL 704 - POWER CONTROL STATUS")============================================================
MODEL 704 - POWER CONTROL STATUS
============================================================
Set Active Power Enable 1
Set Active Power Mode 1
Active Power Setpoint (W) 11000.0 W
Active Power Scale Factor 2
============================================================
def set_battery_power(power_watts):
"""
Set battery power setpoint.
Args:
power_watts (float): Power in watts
Positive = Discharge (battery to grid)
Negative = Charge (grid to battery)
Example:
set_battery_power(5000) # Discharge 5kW
set_battery_power(-3000) # Charge 3kW
set_battery_power(0) # Standby
"""
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
return False
control = device.models[704][0]
# Enable control
control.WSetEna.value = 1
# Set mode to constant power (1)
control.WSetMod.value = 1
# Set power setpoint (library handles scale factors)
control.WSet.cvalue = power_watts
# Write to device
control.write()
print(f"✓ Power setpoint set to {power_watts} W")
if power_watts > 0:
print(f" → Discharging (battery to grid)")
elif power_watts < 0:
print(f" → Charging (grid to battery)")
else:
print(f" → Standby")
return True
print("✓ Power control function defined")
print("\nExample usage:")
print(" set_battery_power(5000) # Discharge 5kW")
print(" set_battery_power(-3000) # Charge 3kW")
print(" set_battery_power(0) # Standby")✓ Power control function defined
Example usage:
set_battery_power(5000) # Discharge 5kW
set_battery_power(-3000) # Charge 3kW
set_battery_power(0) # Standby
Monitor battery performance in real-time with continuous data polling.
def monitor_battery(duration=60, interval=5):
"""
Monitor battery in real-time.
Args:
duration (int): Total monitoring time in seconds
interval (int): Polling interval in seconds (recommended: 5-10s)
"""
if device is None:
print("❌ Not connected to C-Battery device")
print("Please run the connection cell in Section 3 first.")
return
storage = device.models[713][0]
dc_meas = device.models[714][0]
ac_meas = device.models[701][0]
print(f"Monitoring battery for {duration}s (polling every {interval}s)")
print("Press Ctrl+C to stop\n")
print(f"{'Time':<12} {'SoC (%)':<10} {'DC Power (W)':<15} {'AC Power (W)':<15}")
print("-" * 55)
start = time.time()
try:
while (time.time() - start) < duration:
timestamp = datetime.now().strftime("%H:%M:%S")
# Read all models
storage.read()
dc_meas.read()
ac_meas.read()
# Get values safely using points dictionary
soc = storage.points['SoC'].cvalue if 'SoC' in storage.points else 0
dc_power = dc_meas.points['DCW'].cvalue if 'DCW' in dc_meas.points else 0
ac_power = ac_meas.points['W'].cvalue if 'W' in ac_meas.points else 0
print(f"{timestamp:<12} {soc:<10.1f} {dc_power:<15.0f} {ac_power:<15.0f}")
time.sleep(interval)
except KeyboardInterrupt:
print("\n⚠ Stopped by user")
except Exception as e:
print(f"\n❌ Error: {e}")
print("\n✓ Monitoring complete")
print("✓ Monitoring function defined")
print("\nExample usage:")
print(" monitor_battery(duration=30, interval=5)")✓ Monitoring function defined
Example usage:
monitor_battery(duration=30, interval=5)
# Example: Monitor for 30 seconds (uncomment to use)
monitor_battery(duration=30, interval=1)
print("Uncomment the line above to start monitoring")Monitoring battery for 30s (polling every 1s)
Press Ctrl+C to stop
Time SoC (%) DC Power (W) AC Power (W)
-------------------------------------------------------
14:07:40 21.0 11300 11700
14:07:42 21.0 11300 11700
14:07:43 21.0 11300 11700
14:07:45 21.0 11300 11700
14:07:46 21.0 11300 11700
14:07:48 21.0 11300 11700
14:07:50 21.0 11300 11700
14:07:52 21.0 11300 11700
14:07:53 21.0 11300 11700
14:07:55 21.0 11300 11700
14:07:57 21.0 11300 11700
14:07:59 21.0 11300 11700
14:08:01 21.0 11300 11700
14:08:03 21.0 11300 11700
14:08:04 21.0 11300 11700
14:08:06 21.0 11300 11700
14:08:08 21.0 11300 11700
14:08:09 21.0 11300 11700
✓ Monitoring complete
Uncomment the line above to start monitoring
A complete class for production use with error handling, reconnection logic, and clean API.
class CBatteryClient:
"""
Production-ready C-Battery client with error handling.
Features:
- Automatic reconnection
- Error handling
- Clean API
- Uses library metadata (no hardcoded field names)
- Comprehensive logging
"""
def __init__(self, ip, port=502, device_id=1, auto_connect=True):
"""
Initialize C-Battery client.
Args:
ip (str): C-Battery IP address
port (int): Modbus TCP port
device_id (int): Device ID (1 for main, 101-199 for modules)
auto_connect (bool): Connect automatically
"""
self.ip = ip
self.port = port
self.device_id = device_id
self.device = None
self.connected = False
if auto_connect:
self.connect()
def connect(self):
"""Connect to C-Battery and scan models."""
try:
self.device = sunspec_client.SunSpecModbusClientDeviceTCP(
slave_id=self.device_id,
ipaddr=self.ip,
ipport=self.port
)
self.device.scan()
self.connected = True
print(f"✓ Connected to {self.ip}:{self.port} (Device ID: {self.device_id})")
print(f" Available models: {list(self.device.models.keys())}")
return True
except Exception as e:
print(f"❌ Connection failed: {e}")
self.connected = False
return False
def _ensure_connected(self):
"""Ensure connection is active, reconnect if needed."""
if not self.connected:
print("⚠ Not connected, attempting to reconnect...")
return self.connect()
return True
def _get_model_data(self, model_id):
"""
Get all data from a model as a dictionary using library metadata.
Args:
model_id (int): SunSpec model ID
Returns:
dict: Dictionary of point_name: value pairs
"""
if not self._ensure_connected():
return None
if model_id not in self.device.models:
return None
try:
model = self.device.models[model_id][0]
model.read()
data = {}
for point_name, point in model.points.items():
# Skip implementation details
if point_name in ['ID', 'L', 'Pad']:
continue
try:
value = point.cvalue if hasattr(point, 'cvalue') and point.cvalue is not None else point.value
if value is not None:
data[point_name] = value
except:
pass
return data
except Exception as e:
print(f"❌ Error reading model {model_id}: {e}")
return None
def get_device_info(self):
"""Get device information (Model 1)."""
return self._get_model_data(1)
def get_battery_status(self):
"""Get battery status (Model 713)."""
return self._get_model_data(713)
def get_dc_measurements(self):
"""Get DC measurements (Model 714)."""
return self._get_model_data(714)
def get_ac_measurements(self):
"""Get AC measurements (Model 701)."""
return self._get_model_data(701)
def get_battery_capabilities(self):
"""Get battery capabilities (Model 802)."""
return self._get_model_data(802)
def set_power(self, power_watts):
"""
Set power setpoint (Model 704).
Args:
power_watts (float): Power in watts
Positive = Discharge
Negative = Charge
Returns:
bool: Success status
"""
if not self._ensure_connected():
return False
try:
control = self.device.models[704][0]
# Enable and set power using points dictionary
control.points['WSetEna'].value = 1
control.points['WSetMod'].value = 1
control.points['WSet'].cvalue = power_watts
control.write()
print(f"✓ Power set to {power_watts} W")
return True
except Exception as e:
print(f"❌ Error setting power: {e}")
return False
def get_all_data(self):
"""Get all available data from all models."""
return {
'device_info': self.get_device_info(),
'battery_status': self.get_battery_status(),
'dc_measurements': self.get_dc_measurements(),
'ac_measurements': self.get_ac_measurements(),
'battery_capabilities': self.get_battery_capabilities()
}
def disconnect(self):
"""Disconnect from device."""
if self.device:
self.device.close()
self.connected = False
print("✓ Disconnected")
print("✓ CBatteryClient class defined")
print("\nExample usage:")
print(" battery = CBatteryClient('10.31.0.25')")
print(" status = battery.get_battery_status()")
print(" print(status) # Returns dict with all available points")
print(" battery.set_power(5000) # Discharge 5kW")
print(" battery.disconnect()")✓ CBatteryClient class defined
Example usage:
battery = CBatteryClient('10.31.0.25')
status = battery.get_battery_status()
print(status) # Returns dict with all available points
battery.set_power(5000) # Discharge 5kW
battery.disconnect()
# Example: Use production class (uncomment to use)
# battery_client = CBatteryClient(CBAT_IP, device_id=DEVICE_ID)
#
# # Get all data
# all_data = battery_client.get_all_data()
# print("\nAll Data:")
# for key, value in all_data.items():
# print(f"\n{key}:")
# print(value)
#
# # Clean up
# battery_client.disconnect()
print("Uncomment the lines above to test the production class")Uncomment the lines above to test the production class
# 1. Import library
import sunspec2.modbus.client as sunspec_client
# 2. Connect
device = sunspec_client.SunSpecModbusClientDeviceTCP(
slave_id=1,
ipaddr='10.31.0.25',
ipport=502
)
device.scan()
# 3. Read battery status
storage = device.models[713][0]
storage.read()
print(f"SoC: {storage.SoC.cvalue}%")
# 4. Set power
control = device.models[704][0]
control.WSetEna.value = 1
control.WSetMod.value = 1
control.WSet.cvalue = 5000 # 5kW discharge
control.write()| Model | Name | Key Points |
|---|---|---|
| 1 | Common | Device info, manufacturer, serial |
| 701 | AC Measurements | AC power, voltage, current, frequency |
| 704 | Power Control | Set power setpoints, enable control |
| 713 | Battery Status | SoC, SoH, battery state |
| 714 | DC Measurements | DC power, current, energy counters |
| 802 | Battery Capabilities | Max power, capacity limits |
Version: 1.0
Last Updated: 20-10-2025
Status: Production Ready
Always close connections when done.