Skip to content

mujoco sim module#1035

Open
jca0 wants to merge 48 commits intodevfrom
jing-sim
Open

mujoco sim module#1035
jca0 wants to merge 48 commits intodevfrom
jing-sim

Conversation

@jca0
Copy link

@jca0 jca0 commented Jan 16, 2026

mujoco simulation module

created

  • mujoco backend to bring up mujoco
  • mujoco manipulation interface to easily use with other robots
  • mujoco driver to make it a module/determine topics to publish and subscribe to

mujoco parses XML file to determine robot and environment

command to test: python3 -m dimos.simulation.sim_blueprints

todos:

  • fix mac mujoco viewer issue

@jca0 jca0 requested a review from a team January 16, 2026 01:34
@greptile-apps
Copy link

greptile-apps bot commented Jan 16, 2026

Greptile Overview

Greptile Summary

Implemented a comprehensive MuJoCo simulation module that provides a simulation backend for robotic manipulators. The implementation includes:

  • MuJoCo engine backend (mujoco_engine.py) with threaded simulation loop, proper state synchronization using locks, and support for position/velocity/effort control modes
  • Simulation-agnostic manipulator interface (sim_manip_interface.py) that wraps the engine and provides a uniform API matching hardware manipulator interfaces
  • Module wrapper (sim_module.py) with separate control and monitor loops for publishing joint states and robot states via LCM transport
  • XML parser (xml_parser.py) that extracts joint/actuator mappings from MuJoCo XML files, supporting joints, tendons, and actuators
  • Blueprint configuration for xarm7 simulation with end-to-end tests

The architecture is clean with clear separation of concerns: engine layer handles physics simulation, interface layer provides hardware abstraction, and module layer handles messaging/transport.

Confidence Score: 4/5

  • This PR is safe to merge with minor concerns about edge cases
  • The implementation is well-structured with proper threading, locking, and error handling. The code follows good practices with abstract base classes, clear separation of concerns, and comprehensive tests. Previous review comments identified issues that should be addressed, but the core architecture and implementation are sound.
  • No files require special attention - the previously identified issues in review threads should be addressed

Important Files Changed

Filename Overview
dimos/simulation/engines/mujoco_engine.py Core MuJoCo simulation engine - well-structured with proper threading, locking, and state management
dimos/simulation/engines/base.py Clean abstract base class defining simulation engine interface
dimos/simulation/manipulators/sim_manip_interface.py Adapter wrapper providing uniform manipulator API over simulation engine
dimos/simulation/manipulators/sim_module.py Module wrapper with control/monitor loops and message handling
dimos/simulation/utils/xml_parser.py Robust XML parser for MuJoCo joint/actuator metadata with proper recursion handling

Sequence Diagram

sequenceDiagram
    participant User
    participant SimModule as SimulationModule
    participant SimManip as SimManipInterface
    participant MujocoEngine as MujocoEngine
    participant MuJoCo as MuJoCo Library

    User->>SimModule: start()
    SimModule->>SimManip: connect()
    SimManip->>MujocoEngine: connect()
    MujocoEngine->>MuJoCo: load XML model
    MuJoCo-->>MujocoEngine: model + data
    MujocoEngine->>MujocoEngine: start sim_loop thread
    MujocoEngine-->>SimManip: connected
    SimManip-->>SimModule: connected
    SimModule->>SimModule: start control_loop thread
    SimModule->>SimModule: start monitor_loop thread

    loop Control Loop
        User->>SimModule: publish JointCommand
        SimModule->>SimModule: _on_joint_position_command
        SimModule->>SimManip: write_joint_positions
        SimManip->>MujocoEngine: write_joint_command
        MujocoEngine->>MujocoEngine: update targets
        MujocoEngine->>MuJoCo: mj_step
        MujocoEngine->>SimManip: read_joint_positions
        SimManip-->>SimModule: positions
        SimModule->>User: publish JointState
    end

    loop Monitor Loop
        SimModule->>SimManip: read_state
        SimManip-->>SimModule: state data
        SimModule->>User: publish RobotState
    end
Loading

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 files reviewed, 2 comments

Edit Code Review Agent Settings | Greptile

Comment on lines 154 to 157
def _on_joint_velocity_command(self, msg: JointCommand) -> None:
with self._command_lock:
self._pending_velocities = list(msg.positions)
self._pending_positions = None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syntax: JointCommand doesn't have a velocities field - it only has positions. This will cause an AttributeError when velocity commands are received.

Suggested change
def _on_joint_velocity_command(self, msg: JointCommand) -> None:
with self._command_lock:
self._pending_velocities = list(msg.positions)
self._pending_positions = None
def _on_joint_velocity_command(self, msg: JointCommand) -> None:
with self._command_lock:
self._pending_velocities = list(msg.positions)
self._pending_positions = None

@jca0 jca0 marked this pull request as draft January 16, 2026 01:59
jca0 and others added 3 commits January 15, 2026 23:41
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@jca0 jca0 marked this pull request as ready for review January 16, 2026 07:44
Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a scalable approach. Having to build plugin for each oem manufacturer + sim engine is not maintainable.

The robot arms should not be specific to any SimEngine

Comment on lines 27 to 45
def _xarm_limits(dof: int) -> JointLimits:
if dof == 7:
lower_deg = [-360, -118, -360, -233, -360, -97, -360]
upper_deg = [360, 118, 360, 11, 360, 180, 360]
elif dof == 6:
lower_deg = [-360, -118, -225, -11, -360, -97]
upper_deg = [360, 118, 11, 225, 360, 180]
else:
lower_deg = [-360, -118, -225, -97, -360]
upper_deg = [360, 118, 11, 180, 360]

lower_rad = [math.radians(d) for d in lower_deg[:dof]]
upper_rad = [math.radians(d) for d in upper_deg[:dof]]
max_vel_rad = math.radians(180.0)
return JointLimits(
position_lower=lower_rad,
position_upper=upper_rad,
velocity_max=[max_vel_rad] * dof,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sim module should get a Robot config that has all its limits, assets etc.

Robot shouldn't need a registry to be loaded, but should be defined in the blueprint

model: str | None = None


class SimulationEngine(ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice



@dataclass(frozen=True)
class RobotSpec:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this extend some existing general robot spec @mustafab0 ? Yeah pretty sure this should subclass, and shouldnt this take urdf @jca0

for i in range(limit):
self._joint_position_targets[i] = float(positions[i])

def write_joint_velocities(self, velocities: list[float]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be changed to list of some velocity type. I assume Vector3

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't make sense to. Vector3 is 3D, Twist is 6D, joints can only go in 2 directions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally a JointState message

Both write_joint_position & write_joint_velocities can be refactored into 1 write_joint_command/etc and take in a standard JointState message that can set Pos/Vel/Effort

self._pending_positions = None

def _control_loop(self) -> None:
period = 1.0 / max(self._control_rate, 1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second 1.0 here is the minimum control rate so should be an env var called MIN CONTROL RATE or something

time.sleep(period)

def _monitor_loop(self) -> None:
period = 1.0 / max(self._monitor_rate, 1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above here

return [f"{self._joint_prefix}{i + 1}" for i in range(dof)]


def get_blueprint() -> dict[str, Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just return blueprint how we normally do idk if we support returning blueprint via dict like this or if converting this back to .blueprint() type even works.

joints=[float(p) for p in positions],
)
)
time.sleep(period)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WRONG

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.sleep(period) ignores work time; loop runs slower than target.

Comment on lines 27 to 47
def _xarm_limits(dof: int) -> JointLimits:
if dof == 7:
lower_deg = [-360, -118, -360, -233, -360, -97, -360]
upper_deg = [360, 118, 360, 11, 360, 180, 360]
elif dof == 6:
lower_deg = [-360, -118, -225, -11, -360, -97]
upper_deg = [360, 118, 11, 225, 360, 180]
else:
lower_deg = [-360, -118, -225, -97, -360]
upper_deg = [360, 118, 11, 180, 360]

lower_rad = [math.radians(d) for d in lower_deg[:dof]]
upper_rad = [math.radians(d) for d in upper_deg[:dof]]
max_vel_rad = math.radians(180.0)
return JointLimits(
position_lower=lower_rad,
position_upper=upper_rad,
velocity_max=[max_vel_rad] * dof,
)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad

@spomichter
Copy link
Contributor

@jca0 Plus these changes add a blueprint to add_blueprints so devs can easily run, and then need an e2e test that runs everything and puts the arm through some motions, as well as a few unit tests where needed

Comment on lines 70 to 71
if not self.config.engine:
raise ValueError("engine is required for SimulationModule")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If engine is required, then its type should be str, not str | None = None.

There's no need to verify it here if you disallow the class from being instantiated with an invalid config.

Comment on lines 74 to 76
engine_key = self.config.engine.lower()
if engine_key not in self._ENGINES:
raise ValueError(f"Unknown simulation engine: {self.config.engine}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to use the type system.

EngineType = Literal["mujoco", "some_other"]
_ENGINES: dict[EngineType, SimulationEngine] = {
    "mujoco": MujocoEngine,
}

...

# No need for None check, lower(), or key existence.
engine_cls = self._ENGINES[self.config.engine]

except Exception as exc:
import logging

logging.getLogger(self.__class__.__name__).warning(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use logger = setup_logger() for logging.

But what exceptions are you expecting here? I would expect just this to work without if and try:

                self._disposables.add(
                    Disposable(
                        self.joint_position_command.subscribe(self._on_joint_position_command)
                    )
                )

If that doesn't work, it would be good to know why.

return self._error_code, self._error_message

def write_joint_positions(self, positions: list[float], velocity: float = 1.0) -> bool:
_ = velocity
Copy link
Contributor

@paul-nechifor paul-nechifor Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this to silence the velocity parameter not being used? If so, it's better to rename it to _velocity. I assume it's required in the interface?

Oh, velocity isn't used at all. Just delete it?

Comment on lines 149 to 151
if hasattr(self._engine, "hold_current_position"):
self._engine.hold_current_position() # type: ignore[attr-defined]
return True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking if a method exists with hasattr is often a sign that the code isn't ideal.

It looks like hold_current_position exists on MujocoEngine, but not on SimulationEngine and you're using hasattr to break the abstraction.

The better way to accomplish this (adding special handling just for Mujoco) without ignoring types is to check if self._engine is an instance of MujocoEngine .

But isn't it better to just add hold_current_position to SimulationEngine?

Comment on lines 93 to 95
with self._lock:
pos_targets = list(self._joint_position_targets)
with self._lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why two? Seems like you're releasing and acquiring unnecessary the second time.

Comment on lines 231 to 233
limit = min(len(positions), self._num_joints)
for i in range(limit):
self._joint_position_targets[i] = float(positions[i])
Copy link
Contributor

@paul-nechifor paul-nechifor Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this pattern a lot in the code where you're copying values only up to a point.

But I think this just silently ignores errors. Is there a valid reason why write_joint_positions would be sent more positions than can be stored? I think you should let the code throw.

@spomichter
Copy link
Contributor

@greptile

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

12 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants