Notifications
Clear all
Feb 27, 2026 7:26 pm
from roboclaw_msgs.msg import MotorState
from tcr_roboclaw import Roboclaw
from rclpy.node import Node
from copy import deepcopy
class ElectricalWrapper:
def __init__(self, node: Node, driver: Roboclaw, pub_elec: bool, swap_motors: bool):
"""Electrical Wrapper
Args:
node (rclpy.node.Node): ROS 2 node
driver (tcr_roboclaw.Roboclaw): RoboClaw driver
pub_elec (bool): Publish electrical data
swap_motors (bool): Swap the left and right motors
"""
# Node parameters
self.driver = driver
self.node = node
self.clock = self.node.get_clock()
self.logger = self.node.get_logger()
self.PUB_ELEC = pub_elec
self.swap_motors = swap_motors
# Electrical data
self.timestamp = self.clock.now()
self.voltage = 0.0
self.currents = [0.0, 0.0]
self.temperature = 0.0
self.duty_cycles = [0.0, 0.0]
self.poll_electrical_data()
# Publishers
self.left_elec_pub = self.node.create_publisher(
MotorState,
"/motors/left/electrical",
1,
)
self.right_elec_pub = self.node.create_publisher(
MotorState,
"/motors/right/electrical",
1,
)
def update_and_publish(self):
"""Update and publish electrical data"""
if self.poll_electrical_data():
if self.PUB_ELEC:
self.publish_elec()
else:
self.logger.warn("Failed to poll electrical data.")
def poll_electrical_data(self):
"""Poll electrical data from the Roboclaw"""
status1, status2, status3, status4 = 0, 0, 0, 0
try:
self.timestamp = self.clock.now()
status1, *currents = self.driver.ReadCurrents()
status2, voltage = self.driver.ReadMainBatteryVoltage()
status3, *pwms = self.driver.ReadPWMs()
status4, temp = self.driver.ReadTemp()
except Exception as e:
self.logger.warn(f"Read electrical data error: {str(e)}")
if status1 == 1:
self.currents = [curr / 100 for curr in currents]
if status2 == 1:
self.voltage = voltage / 10
if status3 == 1:
self.duty_cycles = [pwm / 327.67 for pwm in pwms]
if status4 == 1:
self.temperature = temp / 10
return status1 and status2 and status3 and status4
def publish_elec(self):
"""Publish electrical data in two MotorState messages"""
motor_state = MotorState()
motor_state.header.stamp = self.timestamp.to_msg()
motor_state.header.frame_id = "base_link"
motor_state.voltage = self.voltage
motor_state.temperature = self.temperature
motor_state1, motor_state2 = deepcopy(motor_state), deepcopy(motor_state)
motor_state1.current = self.currents[0]
motor_state1.duty = self.duty_cycles[0]
motor_state2.current = self.currents[1]
motor_state2.duty = self.duty_cycles[1]
if self.swap_motors:
self.right_elec_pub.publish(motor_state2)
self.left_elec_pub.publish(motor_state1)
else:
self.right_elec_pub.publish(motor_state1)
self.left_elec_pub.publish(motor_state2)
2 Replies
Feb 27, 2026 7:29 pm
encoder_wrapper.py
f
from math import cos, pi, sin, fmod
from tcr_roboclaw import Roboclaw
from rclpy.node import Node
from geometry_msgs.msg import Quaternion, TransformStamped
from nav_msgs.msg import Odometry
from roboclaw_msgs.msg import EncoderState
from tf_transformations import quaternion_from_euler
from tf2_ros import TransformBroadcaster
COUNTER_MAX = 2**32
class EncoderWrapper:
def __init__(
self,
node: Node,
driver: Roboclaw,
ticks_per_meter: float,
ticks_per_rotation: float,
base_width: float,
pub_odom: bool,
pub_encoders: bool,
pub_tf: bool,
swap_motors: bool,
):
"""Encoder Wrapper
Args:
node (rclpy.node.Node): ROS 2 node
driver (tcr_roboclaw.Roboclaw): Roboclaw driver
ticks_per_meter (float): Encoder ticks per meter
ticks_per_rotation (float): Encoder ticks per rotation
base_width (float): Distance between the two wheels
pub_odom (bool): Publish odometry data
pub_encoders (bool): Publish encoder data
pub_tf (bool): Publish the transform from odom to base_link
swap_motors (bool): Swap the left and right motors
"""
self.TICKS_PER_METER = ticks_per_meter
self.TICKS_PER_ROTATION = ticks_per_rotation
self.BASE_WIDTH = base_width
self.driver = driver
# Node parameters
self.node = node
self.clock = self.node.get_clock()
self.logger = self.node.get_logger()
self.PUB_ODOM = pub_odom
self.PUB_ENCODERS = pub_encoders
self.PUB_TF = pub_tf
self.swap_motors = swap_motors
# Encoder data
self.timestamp = self.clock.now()
self.left_ticks = [0, 0]
self.right_ticks = [0, 0]
self.left_velocity = 0
self.right_velocity = 0
self.poll_encoders()
# Odometry data
self.pose_x = 0
self.pose_y = 0
self.pose_theta = 0.0
self.vel_x = 0.0
self.vel_theta = 0.0
# Publishers
# TODO: Topics should be parameters
self.left_encoder_pub = self.node.create_publisher(
EncoderState,
"/motors/left/encoder",
10,
)
self.right_encoder_pub = self.node.create_publisher(
EncoderState,
"/motors/right/encoder",
10,
)
self.odom_pub = self.node.create_publisher(Odometry, "/motors/odometry", 10)
self.tf_broadcaster = TransformBroadcaster(self.node)
@staticmethod
def normalize_angle(angle):
return fmod(angle + pi, 2.0 * pi) - pi
def update_and_publish(self):
"""Update the odometry and publish the data"""
if self.poll_encoders():
self.update_odometry()
if self.PUB_ODOM:
self.publish_odometry()
if self.PUB_ENCODERS:
self.publish_encoder_data()
if self.PUB_TF:
self.broadcast_tf()
else:
self.logger.warn("Failed to poll encoders.")
def poll_encoders(self):
"""Poll the encoder data from the hardware"""
status_enc, status_speed = 0, 0
try:
self.timestamp = self.clock.now()
status_enc, ticks1, ticks2 = self.driver.GetEncoderCounters()
status_speed, speed1, speed2 = self.driver.GetMotorAverageSpeeds()
except Exception as e:
self.logger.warn(f"Read encoders error: {str(e)}")
# Update the encoder ticks
if status_enc == 1:
if self.swap_motors:
self.right_ticks = [self.right_ticks[1], ticks1]
self.left_ticks = [self.left_ticks[1], ticks2]
else:
self.right_ticks = [self.right_ticks[1], ticks2]
self.left_ticks = [self.left_ticks[1], ticks1]
# Update the motor velocities
if status_speed == 1:
if self.swap_motors:
self.right_velocity = speed1
self.left_velocity = speed2
else:
self.right_velocity = speed2
self.left_velocity = speed1
return status_enc and status_speed
def update_odometry(self):
"""Update the odometry estimation"""
# Compute the ticks difference (handle overflow)
delta_ticks_left = self.left_ticks[1] - self.left_ticks[0]
delta_ticks_right = self.right_ticks[1] - self.right_ticks[0]
# Handle overflow and underflow
if delta_ticks_left < -COUNTER_MAX / 2:
delta_ticks_left += COUNTER_MAX
elif delta_ticks_left > COUNTER_MAX / 2:
delta_ticks_left -= COUNTER_MAX
if delta_ticks_right < -COUNTER_MAX / 2:
delta_ticks_right += COUNTER_MAX
elif delta_ticks_right > COUNTER_MAX / 2:
delta_ticks_right -= COUNTER_MAX
# Compute the traveled distance
dist_left = delta_ticks_left / self.TICKS_PER_METER
dist_right = delta_ticks_right / self.TICKS_PER_METER
dist_center = (dist_right + dist_left) / 2.0
# Compute the displacement
d_theta = (dist_right - dist_left) / self.BASE_WIDTH
d_x = dist_center * cos(self.pose_theta + d_theta / 2.0)
d_y = dist_center * sin(self.pose_theta + d_theta / 2.0)
# Update the pose
self.pose_x += d_x
self.pose_y += d_y
self.pose_theta = self.normalize_angle(self.pose_theta + d_theta)
# Compute the linear and angular velocities
vel_left = self.left_velocity / self.TICKS_PER_METER
vel_right = self.right_velocity / self.TICKS_PER_METER
self.vel_x = (vel_left + vel_right) / 2.0
self.vel_theta = (vel_right - vel_left) / self.BASE_WIDTH
def publish_odometry(self):
"""Publish odometry data to the ROS network"""
odom = Odometry()
odom.header.stamp = self.timestamp.to_msg()
odom.header.frame_id = "odom"
odom.child_frame_id = "base_link"
odom.pose.pose.position.x = self.pose_x
odom.pose.pose.position.y = self.pose_y
odom.pose.pose.position.z = 0.0
quat = quaternion_from_euler(0, 0, self.pose_theta)
odom.pose.pose.orientation = Quaternion(
x=quat[0],
y=quat[1],
z=quat[2],
w=quat[3],
)
odom.twist.twist.linear.x = self.vel_x
odom.twist.twist.linear.y = 0.0
odom.twist.twist.angular.z = self.vel_theta
self.odom_pub.publish(odom)
def publish_encoder_data(self):
"""Publish the encoder data to the ROS network"""
encoder_state = EncoderState()
encoder_state.header.stamp = self.timestamp.to_msg()
encoder_state.header.frame_id = "base_link"
encoder_state.ticks_per_rotation = float(self.TICKS_PER_ROTATION)
encoder_state.ticks_per_meter = float(self.TICKS_PER_METER)
encoder_state.position = self.left_ticks[1]
encoder_state.velocity = float(self.left_velocity)
self.left_encoder_pub.publish(encoder_state)
encoder_state.position = self.right_ticks[1]
encoder_state.velocity = float(self.right_velocity)
self.right_encoder_pub.publish(encoder_state)
def broadcast_tf(self):
"""Broadcast the transform from odom to base_link"""
t = TransformStamped()
t.header.stamp = self.timestamp.to_msg()
t.header.frame_id = "odom"
t.child_frame_id = "base_link"
t.transform.translation.x = self.pose_x
t.transform.translation.y = self.pose_y
t.transform.translation.z = 0.0
quat = quaternion_from_euler(0, 0, self.pose_theta)
t.transform.rotation = Quaternion(x=quat[0], y=quat[1], z=quat[2], w=quat[3])
self.tf_broadcaster.sendTransform(t)
Feb 27, 2026 7:30 pm
roboclaw_node.py
#!/usr/bin/env python
import diagnostic_updater
import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
from tcr_roboclaw import Roboclaw
from .electrical_wrapper import ElectricalWrapper
from .encoder_wrapper import EncoderWrapper
from . import utils as u
import serial # raw serial helper (used only in apply_voltage_limits)
# -----------------------------
# Raw RoboClaw packet helpers
# -----------------------------
def _crc16_ccitt(packet: bytes) -> int:
"""RoboClaw CRC16-CCITT (poly 0x1021), init 0."""
crc = 0
for b in packet:
crc ^= (b << 8)
for _ in range(8):
if crc & 0x8000:
crc = ((crc << 1) ^ 0x1021) & 0xFFFF
else:
crc = (crc << 1) & 0xFFFF
return crc
def _send_packet_expect_ff(
ser: serial.Serial, payload_wo_crc: bytes, timeout_s: float = 0.2
) -> bool:
"""Send payload (addr+cmd+data), append CRC16. Expect 0xFF ack."""
crc = _crc16_ccitt(payload_wo_crc)
pkt = payload_wo_crc + crc.to_bytes(2, "big")
ser.reset_input_buffer()
ser.write(pkt)
ser.flush()
ser.timeout = timeout_s
ack = ser.read(1)
return ack == b"\xFF"
def _read_exact(ser: serial.Serial, n: int, timeout_s: float = 0.3) -> bytes:
ser.timeout = timeout_s
data = ser.read(n)
if len(data) != n:
raise RuntimeError(
f"Timeout: expected {n} bytes, got {len(data)} bytes: {data!r}"
)
return data
def _set_main_voltages_raw(
ser: serial.Serial, addr: int, vmin_tenths: int, vmax_tenths: int, offset_tenths: int
) -> bool:
"""
57 - Set Main Battery Voltages
Send: [Address,57,Min(2),Max(2),AutoMaxMainBatOffset(1),CRC16(2)]
Receive: [0xFF]
"""
payload = bytearray([addr, 57])
payload += int(vmin_tenths).to_bytes(2, "big")
payload += int(vmax_tenths).to_bytes(2, "big")
payload += bytes([int(offset_tenths) & 0xFF])
return _send_packet_expect_ff(ser, bytes(payload))
def _set_logic_voltages_raw(
ser: serial.Serial, addr: int, vmin_tenths: int, vmax_tenths: int
) -> bool:
"""
58 - Set Logic Battery Voltages
Send: [Address,58,Min(2),Max(2),CRC16(2)]
Receive: [0xFF]
"""
payload = bytearray([addr, 58])
payload += int(vmin_tenths).to_bytes(2, "big")
payload += int(vmax_tenths).to_bytes(2, "big")
return _send_packet_expect_ff(ser, bytes(payload))
def _read_main_voltage_settings_raw(ser: serial.Serial, addr: int):
"""
59 - Read Main Battery Voltage Settings
Receive: [Min(2),Max(2),AutoMaxMainBatOffset(1),CRC16(2)]
CRC computed over [Address, Command, Data...]
"""
cmd = bytes([addr, 59])
ser.reset_input_buffer()
ser.write(cmd + _crc16_ccitt(cmd).to_bytes(2, "big"))
ser.flush()
resp = _read_exact(ser, 2 + 2 + 1 + 2)
data = resp[0:5]
recv_crc = int.from_bytes(resp[5:7], "big")
check = _crc16_ccitt(bytes([addr, 59]) + data)
if recv_crc != check:
raise RuntimeError(
f"Bad CRC on ReadMainVoltageSettings: recv {recv_crc:04X} != calc {check:04X}"
)
minv = int.from_bytes(data[0:2], "big")
maxv = int.from_bytes(data[2:4], "big")
off = data[4]
return (minv, maxv, off)
def _read_logic_voltage_settings_raw(ser: serial.Serial, addr: int):
"""
60 - Read Logic Battery Voltage Settings
Receive: [Min(2),Max(2),CRC16(2)]
CRC over [Address, Command, Data...]
"""
cmd = bytes([addr, 60])
ser.reset_input_buffer()
ser.write(cmd + _crc16_ccitt(cmd).to_bytes(2, "big"))
ser.flush()
resp = _read_exact(ser, 2 + 2 + 2)
data = resp[0:4]
recv_crc = int.from_bytes(resp[4:6], "big")
check = _crc16_ccitt(bytes([addr, 60]) + data)
if recv_crc != check:
raise RuntimeError(
f"Bad CRC on ReadLogicVoltageSettings: recv {recv_crc:04X} != calc {check:04X}"
)
minv = int.from_bytes(data[0:2], "big")
maxv = int.from_bytes(data[2:4], "big")
return (minv, maxv)
class RoboclawNode(Node):
"""RoboClaw Node"""
def apply_voltage_limits(self):
min_main_t = int(round(float(self.min_main_voltage) * 10))
max_main_t = int(round(float(self.max_main_voltage) * 10))
min_logic_t = int(round(float(self.min_logic_voltage) * 10))
max_logic_t = int(round(float(self.max_logic_voltage) * 10))
offset_t = int(round(float(self.automax_main_offset_voltage) * 10))
offset_t = max(0, min(255, offset_t))
baud = int(self.BAUD)
targets = [(str(self.DEV), int(self.ADDRESS))]
if self.MIRROR_SECOND:
targets.append((str(self.DEV2), int(self.ADDRESS2)))
for port, addr in targets:
self.get_logger().info(
f"Applying RoboClaw voltage limits on {port} @ {baud}, addr={addr}: "
f"MAIN {min_main_t/10:.1f}-{max_main_t/10:.1f}V (offset {offset_t/10:.1f}V), "
f"LOGIC {min_logic_t/10:.1f}-{max_logic_t/10:.1f}V"
)
try:
with serial.Serial(port, baud, timeout=0.3) as ser:
ok_main = _set_main_voltages_raw(ser, addr, min_main_t, max_main_t, offset_t)
ok_logic = _set_logic_voltages_raw(ser, addr, min_logic_t, max_logic_t)
if not ok_main:
self.get_logger().warn(f"[{port}] Set Main Voltage (57) did not ACK (0xFF).")
if not ok_logic:
self.get_logger().warn(f"[{port}] Set Logic Voltage (58) did not ACK (0xFF).")
try:
main_min, main_max, main_off = _read_main_voltage_settings_raw(ser, addr)
logic_min, logic_max = _read_logic_voltage_settings_raw(ser, addr)
self.get_logger().info(
f"[{port}] Voltage settings now: "
f"MAIN {main_min/10:.1f}-{main_max/10:.1f}V (offset {main_off/10:.1f}V), "
f"LOGIC {logic_min/10:.1f}-{logic_max/10:.1f}V"
)
except Exception as e:
self.get_logger().warn(f"[{port}] Could not read back voltage settings (59/60): {e}")
except Exception as e:
self.get_logger().warn(f"[{port}] Failed to apply voltage limits: {e}")
def __init__(self):
super().__init__("roboclaw_node")
self.read_parameters()
# Board 1
self.driver = self.init_device(self.DEV, self.ADDRESS, self.BAUD)
# Board 2 (optional mirror)
self.driver2 = None
if self.MIRROR_SECOND:
self.driver2 = self.init_device(self.DEV2, self.ADDRESS2, self.BAUD)
# Apply voltage limits (one-time at startup)
self.apply_voltage_limits()
# Reset + configure both boards
self.reset_device()
self.configure_device()
self.create_subscribers()
self.create_wrappers()
self.create_timers()
self.updater = diagnostic_updater.Updater(self)
self.updater.setHardwareID("RoboClaw")
self.updater.add(diagnostic_updater.FunctionDiagnosticTask("Vitals", self.diagnostics))
self.last_cmd_timestamp = self.get_clock().now().nanoseconds
# NEW: for encoder debug logging throttling
self._last_enc_log_ns = 0
def read_parameters(self):
"""Read parameters from the parameter server"""
self.get_logger().info("Reading parameters...")
# Device params
self.DEV = self.init_parameter("dev", "/dev/ttyACM0")
self.BAUD = self.init_parameter("baud", 38400)
self.ADDRESS = self.init_parameter("address", 128)
self.serial_timeout = self.init_parameter("serial_timeout", 0.0)
if self.ADDRESS > 0x87 or self.ADDRESS < 0x80:
self.get_logger().fatal("Address out of range")
self.shutdown("Address out of range")
# Odometry params
self.TICKS_PER_METER = self.init_parameter("ticks_per_meter", 4342.2)
self.TICKS_PER_ROTATION = self.init_parameter("ticks_per_rotation", 2780)
self.BASE_WIDTH = self.init_parameter("base_width", 0.315)
# Movement params
self.max_speed_linear = self.init_parameter("max_speed_linear", 2.0)
self.max_speed_angular = self.init_parameter("max_speed_angular", 1.0)
self.accel = int(self.init_parameter("acceleration", 1.0) * self.TICKS_PER_METER)
self.swap_motors = self.init_parameter("swap_motors", False)
self.stop_if_idle = self.init_parameter("stop_if_idle", True)
self.idle_timeout = self.init_parameter("idle_timeout", 1.0)
# Roboclaw params
self.custom_pid = self.init_parameter("custom_pid", True)
self.P = self.init_parameter("p_constant", 3.0)
self.I = self.init_parameter("i_constant", 0.42)
self.D = self.init_parameter("d_constant", 0.0)
self.qpps = self.init_parameter("qpps", 6000)
# NEW: encoder debug prints (helps detect reversed/dead encoder)
self.debug_encoders = self.init_parameter("debug_encoders", True)
self.debug_encoder_rate_hz = self.init_parameter("debug_encoder_rate_hz", 2.0) # print ~2 times/sec
# NEW: optionally disable dynamic PID writes (helps rule out instability)
self.enable_dynamic_params = self.init_parameter("enable_dynamic_params", False)
self.dynamic_params_period = self.init_parameter("dynamic_params_period", 1.0)
# Publishing params
self.ODOM_RATE = self.init_parameter("odom_rate", 10)
self.ELEC_RATE = self.init_parameter("elec_rate", 1)
self.PUBLISH_ODOM = self.init_parameter("publish_odom", True)
self.PUBLISH_ENCODERS = self.init_parameter("publish_encoders", True)
self.PUBLISH_ELEC = self.init_parameter("publish_elec", True)
self.PUBLISH_TF = self.init_parameter("publish_tf", False)
# IO params
self.S3Mode = self.init_parameter("S3_mode", 0x00)
self.S4Mode = self.init_parameter("S4_mode", 0x00)
self.S5Mode = self.init_parameter("S5_mode", 0x00)
# Voltage limit params (RoboClaw EEPROM settings)
self.min_main_voltage = self.init_parameter("min_main_voltage", 5.0)
self.max_main_voltage = self.init_parameter("max_main_voltage", 24.0)
self.min_logic_voltage = self.init_parameter("min_logic_voltage", 5.0)
self.max_logic_voltage = self.init_parameter("max_logic_voltage", 24.0)
self.automax_main_offset_voltage = self.init_parameter("automax_main_offset_voltage", 2.0)
# Optional 2nd RoboClaw
self.MIRROR_SECOND = self.init_parameter("mirror_second_board", False)
self.DEV2 = self.init_parameter("dev2", "")
self.ADDRESS2 = self.init_parameter("address2", self.ADDRESS)
# NEW: cmd_vel topic (so we can subscribe to twist_mux output)
self.cmd_vel_topic = self.init_parameter("cmd_vel_topic", "/twist_mux/cmd_vel")
def init_parameter(self, name, default):
"""Initialize a parameter and log it"""
value = self.declare_parameter(name, default).value
self.get_logger().info(f" {name}: {value}")
return value
def update_parameters(self):
"""
OPTIONAL dynamic update.
NOTE: Writing PID settings repeatedly while driving can cause weirdness on some setups.
That's why enable_dynamic_params defaults to False.
"""
self.max_speed_linear = self.get_parameter("max_speed_linear").value
self.max_speed_angular = self.get_parameter("max_speed_angular").value
self.stop_if_idle = self.get_parameter("stop_if_idle").value
self.idle_timeout = self.get_parameter("idle_timeout").value
self.custom_pid = self.get_parameter("custom_pid").value
self.P = self.get_parameter("p_constant").value
self.I = self.get_parameter("i_constant").value
self.D = self.get_parameter("d_constant").value
self.qpps = self.get_parameter("qpps").value
self.configure_device()
def init_device(self, dev_name: str, address: int, baud_rate: int):
"""Initialize the Roboclaw device and connect to it"""
try:
self.get_logger().info(f"Connecting to device {dev_name} with address {str(address)}")
driver = Roboclaw(dev_name, baud_rate, address)
driver.open()
self.get_logger().info("Connected!")
except Exception as e:
self.get_logger().fatal("Could not connect to device.")
self.get_logger().debug(e)
self.shutdown("Could not connect to Roboclaw")
try:
_, version = driver.ReadVersion()
self.get_logger().info(f"Roboclaw version: {str(version)}")
except Exception as e:
self.get_logger().warn("Problem getting Roboclaw version")
self.get_logger().debug(e)
return driver
def reset_device(self):
"""Reset the roboclaw speed and encoder counters"""
self.stop_motors()
try:
self.driver.ResetEncoders()
except OSError as e:
self.get_logger().warn(f"ResetEncoders OSError: {str(e)}")
if self.driver2 is not None:
try:
self.driver2.ResetEncoders()
except OSError as e:
self.get_logger().warn(f"ResetEncoders OSError (driver2): {str(e)}")
def configure_device(self):
"""Configure both boards: timeout, PID, pin modes."""
def _cfg(drv):
drv.SetSerialTimeout(int(self.serial_timeout * 10))
if self.custom_pid:
drv.SetM1VelocityPID(self.P, self.I, self.D, self.qpps)
drv.SetM2VelocityPID(self.P, self.I, self.D, self.qpps)
drv.SetPinFunctions(self.S3Mode, self.S4Mode, self.S5Mode)
_cfg(self.driver)
if self.driver2 is not None:
_cfg(self.driver2)
def create_subscribers(self):
"""Create subscribers for the node"""
self.cmd_vel_sub = self.create_subscription(
Twist,
str(self.cmd_vel_topic),
self.cmd_vel_callback,
10,
)
self.get_logger().info(f"Subscribed to cmd_vel_topic: {self.cmd_vel_topic}")
def create_wrappers(self):
"""Create the encoder and electrical wrappers (NOTE: wrappers use driver1 only)"""
self.encoder_wrapper = EncoderWrapper(
self,
self.driver,
self.TICKS_PER_METER,
self.TICKS_PER_ROTATION,
self.BASE_WIDTH,
self.PUBLISH_ODOM,
self.PUBLISH_ENCODERS,
self.PUBLISH_TF,
self.swap_motors,
)
self.electrical_wrapper = ElectricalWrapper(self, self.driver, self.PUBLISH_ELEC, self.swap_motors)
def create_timers(self):
"""Create timers for the node"""
if self.ODOM_RATE:
self.odom_timer = self.create_timer(1.0 / self.ODOM_RATE, self.encoder_wrapper.update_and_publish)
if self.ELEC_RATE:
self.elec_timer = self.create_timer(1.0 / self.ELEC_RATE, self.electrical_wrapper.update_and_publish)
self.idle_timer = self.create_timer(1 / 30, self.idle_callback)
# NEW: make dynamic param updates optional (default False)
if bool(self.enable_dynamic_params):
period = float(self.dynamic_params_period)
if period <= 0:
period = 1.0
self.dynamic_params_timer = self.create_timer(period, self.update_parameters)
def idle_callback(self):
"""Stop the robot if no commands are received for 'idle_timeout' seconds"""
now = self.get_clock().now().nanoseconds
if self.stop_if_idle and now - self.last_cmd_timestamp > self.idle_timeout * 1e9:
self.get_logger().debug(
f"Did not get command for {self.idle_timeout} second, stopping"
)
self.stop_motors()
# -----------------------------
# NEW: debug encoder helper
# -----------------------------
def _maybe_log_encoders(self):
if not self.debug_encoders:
return
rate_hz = float(self.debug_encoder_rate_hz) if self.debug_encoder_rate_hz else 0.0
if rate_hz <= 0:
return
now_ns = self.get_clock().now().nanoseconds
min_period_ns = int(1e9 / rate_hz)
if now_ns - self._last_enc_log_ns < min_period_ns:
return
self._last_enc_log_ns = now_ns
try:
e1 = self.driver.ReadEncM1()[1]
e2 = self.driver.ReadEncM2()[1]
self.get_logger().info(f"[ENC] board1 M1={e1} M2={e2}")
except Exception as e:
self.get_logger().warn(f"[ENC] board1 read failed: {e}")
if self.driver2 is not None:
try:
e3 = self.driver2.ReadEncM1()[1]
e4 = self.driver2.ReadEncM2()[1]
self.get_logger().info(f"[ENC] board2 M1={e3} M2={e4}")
except Exception as e:
self.get_logger().warn(f"[ENC] board2 read failed: {e}")
def cmd_vel_callback(self, twist):
"""Callback for /cmd_vel topic, move the robot according to the Twist message"""
# Limit the speed
linear_x = min(max(twist.linear.x, -self.max_speed_linear), self.max_speed_linear)
angular_z = min(max(twist.angular.z, -self.max_speed_angular), self.max_speed_angular)
if abs(linear_x) < 0.01 and abs(angular_z) < 0.01:
self.stop_motors()
return
# Convert to motor speeds (ticks/s)
right_speed = int((linear_x + angular_z * self.BASE_WIDTH / 2) * self.TICKS_PER_METER)
left_speed = int((linear_x - angular_z * self.BASE_WIDTH / 2) * self.TICKS_PER_METER)
self.get_logger().debug(
f"Sending command -> left: {left_speed}, right: {right_speed} (ticks/sec)"
)
# Send the command
try:
# Your preferred mapping:
# each board: M1=left, M2=right
if self.swap_motors:
# swap only swaps left/right assignment
self.driver.SpeedAccelM1M2(self.accel, right_speed, left_speed)
if self.driver2 is not None:
self.driver2.SpeedAccelM1M2(self.accel, right_speed, left_speed)
else:
self.driver.SpeedAccelM1M2(self.accel, left_speed, right_speed)
if self.driver2 is not None:
self.driver2.SpeedAccelM1M2(self.accel, left_speed, right_speed)
self.last_cmd_timestamp = self.get_clock().now().nanoseconds
# NEW: encoder debug prints (throttled)
self._maybe_log_encoders()
except OSError as e:
# NEW: if command fails, STOP BOTH immediately (prevents "one motor keeps running")
self.get_logger().warn(f"SpeedAccelM1M2 OSError: {str(e)} -> stopping motors")
self.stop_motors()
except Exception as e:
# Catch-all safety stop
self.get_logger().warn(f"SpeedAccelM1M2 unexpected error: {e} -> stopping motors")
self.stop_motors()
def diagnostics(self, stat):
"""Read the error status of the Roboclaw and report it as diagnostics"""
stat.summary(diagnostic_updater.DiagnosticStatus.STALE, "No data")
try:
status = self.driver.ReadError()[1]
statuses = u.decipher_rclaw_status(status)
for state, message in statuses:
stat.summary(state, message)
self.get_logger().debug("Got unit status: " + str(hex(status)))
except Exception as e:
self.get_logger().warn("Diagnostics error: " + str(e))
self.get_logger().debug(e)
return
return stat
def stop_motors(self):
"""Stop motors aggressively; handle each board independently."""
# Board 1
try:
# Immediate stop if supported by your library
self.driver.SpeedM1M2(0, 0)
except Exception:
pass
try:
# Ramp stop fallback
self.driver.SpeedAccelM1M2(self.accel, 0, 0)
except Exception as e:
self.get_logger().warn(f"Stop failed on driver1: {e}")
# Board 2
if self.driver2 is not None:
try:
self.driver2.SpeedM1M2(0, 0)
except Exception:
pass
try:
self.driver2.SpeedAccelM1M2(self.accel, 0, 0)
except Exception as e:
self.get_logger().warn(f"Stop failed on driver2: {e}")
self.last_cmd_timestamp = self.get_clock().now().nanoseconds
def shutdown(self, str_msg):
"""Shutdown the node"""
self.get_logger().info("Shutting down: " + str_msg)
self.stop_motors()
self.stop_motors() # Call twice to make sure
def main(args=None):
rclpy.init(args=args)
roboclaw_node = RoboclawNode()
rclpy.spin(roboclaw_node)
roboclaw_node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()