Conversation
Greptile OverviewGreptile SummaryImplemented a comprehensive MuJoCo simulation module that provides a simulation backend for robotic manipulators. The implementation includes:
The architecture is clean with clear separation of concerns: engine layer handles physics simulation, interface layer provides hardware abstraction, and module layer handles messaging/transport. Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant User
participant SimModule as SimulationModule
participant SimManip as SimManipInterface
participant MujocoEngine as MujocoEngine
participant MuJoCo as MuJoCo Library
User->>SimModule: start()
SimModule->>SimManip: connect()
SimManip->>MujocoEngine: connect()
MujocoEngine->>MuJoCo: load XML model
MuJoCo-->>MujocoEngine: model + data
MujocoEngine->>MujocoEngine: start sim_loop thread
MujocoEngine-->>SimManip: connected
SimManip-->>SimModule: connected
SimModule->>SimModule: start control_loop thread
SimModule->>SimModule: start monitor_loop thread
loop Control Loop
User->>SimModule: publish JointCommand
SimModule->>SimModule: _on_joint_position_command
SimModule->>SimManip: write_joint_positions
SimManip->>MujocoEngine: write_joint_command
MujocoEngine->>MujocoEngine: update targets
MujocoEngine->>MuJoCo: mj_step
MujocoEngine->>SimManip: read_joint_positions
SimManip-->>SimModule: positions
SimModule->>User: publish JointState
end
loop Monitor Loop
SimModule->>SimManip: read_state
SimManip-->>SimModule: state data
SimModule->>User: publish RobotState
end
|
| def _on_joint_velocity_command(self, msg: JointCommand) -> None: | ||
| with self._command_lock: | ||
| self._pending_velocities = list(msg.positions) | ||
| self._pending_positions = None |
There was a problem hiding this comment.
syntax: JointCommand doesn't have a velocities field - it only has positions. This will cause an AttributeError when velocity commands are received.
| def _on_joint_velocity_command(self, msg: JointCommand) -> None: | |
| with self._command_lock: | |
| self._pending_velocities = list(msg.positions) | |
| self._pending_positions = None | |
| def _on_joint_velocity_command(self, msg: JointCommand) -> None: | |
| with self._command_lock: | |
| self._pending_velocities = list(msg.positions) | |
| self._pending_positions = None |
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
There was a problem hiding this comment.
This is not a scalable approach. Having to build plugin for each oem manufacturer + sim engine is not maintainable.
The robot arms should not be specific to any SimEngine
| def _xarm_limits(dof: int) -> JointLimits: | ||
| if dof == 7: | ||
| lower_deg = [-360, -118, -360, -233, -360, -97, -360] | ||
| upper_deg = [360, 118, 360, 11, 360, 180, 360] | ||
| elif dof == 6: | ||
| lower_deg = [-360, -118, -225, -11, -360, -97] | ||
| upper_deg = [360, 118, 11, 225, 360, 180] | ||
| else: | ||
| lower_deg = [-360, -118, -225, -97, -360] | ||
| upper_deg = [360, 118, 11, 180, 360] | ||
|
|
||
| lower_rad = [math.radians(d) for d in lower_deg[:dof]] | ||
| upper_rad = [math.radians(d) for d in upper_deg[:dof]] | ||
| max_vel_rad = math.radians(180.0) | ||
| return JointLimits( | ||
| position_lower=lower_rad, | ||
| position_upper=upper_rad, | ||
| velocity_max=[max_vel_rad] * dof, | ||
| ) |
There was a problem hiding this comment.
The sim module should get a Robot config that has all its limits, assets etc.
Robot shouldn't need a registry to be loaded, but should be defined in the blueprint
| model: str | None = None | ||
|
|
||
|
|
||
| class SimulationEngine(ABC): |
dimos/simulation/engines/base.py
Outdated
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class RobotSpec: |
There was a problem hiding this comment.
Shouldn't this extend some existing general robot spec @mustafab0 ? Yeah pretty sure this should subclass, and shouldnt this take urdf @jca0
| for i in range(limit): | ||
| self._joint_position_targets[i] = float(positions[i]) | ||
|
|
||
| def write_joint_velocities(self, velocities: list[float]) -> None: |
There was a problem hiding this comment.
Needs to be changed to list of some velocity type. I assume Vector3
There was a problem hiding this comment.
doesn't make sense to. Vector3 is 3D, Twist is 6D, joints can only go in 2 directions
There was a problem hiding this comment.
Ideally a JointState message
Both write_joint_position & write_joint_velocities can be refactored into 1 write_joint_command/etc and take in a standard JointState message that can set Pos/Vel/Effort
| self._pending_positions = None | ||
|
|
||
| def _control_loop(self) -> None: | ||
| period = 1.0 / max(self._control_rate, 1.0) |
There was a problem hiding this comment.
The second 1.0 here is the minimum control rate so should be an env var called MIN CONTROL RATE or something
| time.sleep(period) | ||
|
|
||
| def _monitor_loop(self) -> None: | ||
| period = 1.0 / max(self._monitor_rate, 1.0) |
| return [f"{self._joint_prefix}{i + 1}" for i in range(dof)] | ||
|
|
||
|
|
||
| def get_blueprint() -> dict[str, Any]: |
There was a problem hiding this comment.
Just return blueprint how we normally do idk if we support returning blueprint via dict like this or if converting this back to .blueprint() type even works.
| joints=[float(p) for p in positions], | ||
| ) | ||
| ) | ||
| time.sleep(period) |
There was a problem hiding this comment.
time.sleep(period) ignores work time; loop runs slower than target.
| def _xarm_limits(dof: int) -> JointLimits: | ||
| if dof == 7: | ||
| lower_deg = [-360, -118, -360, -233, -360, -97, -360] | ||
| upper_deg = [360, 118, 360, 11, 360, 180, 360] | ||
| elif dof == 6: | ||
| lower_deg = [-360, -118, -225, -11, -360, -97] | ||
| upper_deg = [360, 118, 11, 225, 360, 180] | ||
| else: | ||
| lower_deg = [-360, -118, -225, -97, -360] | ||
| upper_deg = [360, 118, 11, 180, 360] | ||
|
|
||
| lower_rad = [math.radians(d) for d in lower_deg[:dof]] | ||
| upper_rad = [math.radians(d) for d in upper_deg[:dof]] | ||
| max_vel_rad = math.radians(180.0) | ||
| return JointLimits( | ||
| position_lower=lower_rad, | ||
| position_upper=upper_rad, | ||
| velocity_max=[max_vel_rad] * dof, | ||
| ) | ||
|
|
||
|
|
|
@jca0 Plus these changes add a blueprint to add_blueprints so devs can easily run, and then need an e2e test that runs everything and puts the arm through some motions, as well as a few unit tests where needed |
| if not self.config.engine: | ||
| raise ValueError("engine is required for SimulationModule") |
There was a problem hiding this comment.
If engine is required, then its type should be str, not str | None = None.
There's no need to verify it here if you disallow the class from being instantiated with an invalid config.
| engine_key = self.config.engine.lower() | ||
| if engine_key not in self._ENGINES: | ||
| raise ValueError(f"Unknown simulation engine: {self.config.engine}") |
There was a problem hiding this comment.
I think it's better to use the type system.
EngineType = Literal["mujoco", "some_other"]
_ENGINES: dict[EngineType, SimulationEngine] = {
"mujoco": MujocoEngine,
}
...
# No need for None check, lower(), or key existence.
engine_cls = self._ENGINES[self.config.engine]| except Exception as exc: | ||
| import logging | ||
|
|
||
| logging.getLogger(self.__class__.__name__).warning( |
There was a problem hiding this comment.
We use logger = setup_logger() for logging.
But what exceptions are you expecting here? I would expect just this to work without if and try:
self._disposables.add(
Disposable(
self.joint_position_command.subscribe(self._on_joint_position_command)
)
)
If that doesn't work, it would be good to know why.
| return self._error_code, self._error_message | ||
|
|
||
| def write_joint_positions(self, positions: list[float], velocity: float = 1.0) -> bool: | ||
| _ = velocity |
There was a problem hiding this comment.
Is this to silence the velocity parameter not being used? If so, it's better to rename it to _velocity. I assume it's required in the interface?
Oh, velocity isn't used at all. Just delete it?
| if hasattr(self._engine, "hold_current_position"): | ||
| self._engine.hold_current_position() # type: ignore[attr-defined] | ||
| return True |
There was a problem hiding this comment.
Checking if a method exists with hasattr is often a sign that the code isn't ideal.
It looks like hold_current_position exists on MujocoEngine, but not on SimulationEngine and you're using hasattr to break the abstraction.
The better way to accomplish this (adding special handling just for Mujoco) without ignoring types is to check if self._engine is an instance of MujocoEngine .
But isn't it better to just add hold_current_position to SimulationEngine?
| with self._lock: | ||
| pos_targets = list(self._joint_position_targets) | ||
| with self._lock: |
There was a problem hiding this comment.
Why two? Seems like you're releasing and acquiring unnecessary the second time.
| limit = min(len(positions), self._num_joints) | ||
| for i in range(limit): | ||
| self._joint_position_targets[i] = float(positions[i]) |
There was a problem hiding this comment.
I see this pattern a lot in the code where you're copying values only up to a point.
But I think this just silently ignores errors. Is there a valid reason why write_joint_positions would be sent more positions than can be stored? I think you should let the code throw.
Co-authored-by: Paul Nechifor <paul@nechifor.net>
|
@greptile |
mujoco simulation module
created
mujoco parses XML file to determine robot and environment
command to test:
python3 -m dimos.simulation.sim_blueprintstodos: