Source code for udaan.models.quadrotor_cspayload.mujoco

"""QuadrotorCsPayloadMujoco — MuJoCo physics backend for quadrotor with cable-suspended payload."""

import copy
import enum

import numpy as np

from ...manif import S2, SO3, TS2, TSO3
from ...utils.logging import get_logger
from ..mujoco import MujocoModel
from .base import QuadrotorCsPayloadBase

_logger = get_logger(__name__)


[docs] class QuadrotorCsPayloadMujoco(QuadrotorCsPayloadBase): """Quadrotor with cable-suspended payload backed by MuJoCo physics engine. Overrides step/reset to use MuJoCo instead of Euler ZOH. Supports three cable modeling approaches: - TENDON: spatial tendon constraint (fast, less realistic) - LINKS: rigid cylinder links with ball joints (slower, most stable) - CABLE: MuJoCo 2.3+ composite cable (experimental) """
[docs] class MODEL_TYPE(enum.Enum): LINKS = 0 TENDON = 1 CABLE = 2
[docs] def __init__(self, **kwargs): self.render = kwargs.get("render", False) super().__init__(**kwargs) self._mjMdl = None self._mj_quad_index = None self._mj_payload_index = None self._mj_cable_index = None self._model_type = self.MODEL_TYPE.LINKS self._attitude_zoh = kwargs.get("attitude_zoh", False) if "cable_model" in kwargs: cable_model = kwargs["cable_model"] if cable_model == "tendon": self._model_type = self.MODEL_TYPE.TENDON elif cable_model == "cable": self._model_type = self.MODEL_TYPE.CABLE _logger.warning( "cable model 'cable' is not officially supported; " "included for experimentation only" ) elif cable_model == "links": self._model_type = self.MODEL_TYPE.LINKS else: raise ValueError(f"Invalid cable_model type: {cable_model!r}") # Initialize the appropriate MuJoCo model if self._model_type == self.MODEL_TYPE.TENDON: self._init_tendon_model() elif self._model_type == self.MODEL_TYPE.LINKS: self._init_nlink_model() elif self._model_type == self.MODEL_TYPE.CABLE: self._init_composite_model() if self._force_type == self.FORCE_TYPE.PROP_FORCES: self._ctrl_index = 0 else: self._ctrl_index = 4 self._mjDt = 1.0 / 500.0 self._step_iter = max(1, int(self.dt / self._mjDt)) self._nFrames = 1 if self._attitude_zoh: self._step_iter, self._nFrames = self._nFrames, self._step_iter # Read params from MuJoCo model if self._mj_quad_index is not None: self.mass = copy.deepcopy(self._mjMdl.model.body_mass[self._mj_quad_index]) self.inertia = copy.deepcopy(self._mjMdl.model.body_inertia[self._mj_quad_index]) if self._mj_payload_index is not None: self.payload_mass = float( copy.deepcopy(self._mjMdl.model.body_mass[self._mj_payload_index]) ) # Reinit controllers with MuJoCo params self._init_default_controllers() if self.verbose: _logger.info("MuJoCo quadrotor-cspayload loaded (%s)", self._model_type.name)
# ─── Model initialization ────────────────────────────────────── def _init_tendon_model(self): self._mjMdl = MujocoModel(model_path="quad_payload_mj.xml", render=self.render) self._mj_quad_pos_index = 0 self._mj_quad_quat_index = 3 self._mj_quad_vel_index = 0 self._mj_quad_omega_index = 3 self._mj_payload_pos_index = 7 self._mj_payload_quat_index = 10 self._mj_payload_vel_index = 6 self._mj_payload_omega_index = 9 self._mj_cable_index = 0 self._mj_quad_index = 1 self._mj_payload_index = 2 def _init_nlink_model(self): self._mjMdl = MujocoModel(model_path="quad_flxcbl_mj.xml", render=self.render) self._mj_quad_pos_index = 0 self._mj_quad_quat_index = 3 self._mj_quad_vel_index = 0 self._mj_quad_omega_index = 3 self._mj_quad_index = 1 self._mj_payload_index = 27 self._mj_data__payload_pos_prev = np.zeros(3) self._mj_data__payload_vel_initialized = False def _init_composite_model(self): self._mjMdl = MujocoModel(model_path="quad_cable_mj.xml", render=self.render) self._mj_quad_pos_index = 0 self._mj_quad_quat_index = 3 self._mj_quad_vel_index = 0 self._mj_quad_omega_index = 3 self._mj_payload_pos_index = 167 self._mj_payload_quat_index = 170 self._mj_payload_vel_index = 126 self._mj_payload_omega_index = 129 # ─── Property setters (sync to MuJoCo) ───────────────────────── @QuadrotorCsPayloadBase.mass.setter def mass(self, value): QuadrotorCsPayloadBase.mass.fset(self, value) if self._mjMdl is not None and self._mj_quad_index is not None: self._mjMdl.model.body_mass[self._mj_quad_index] = value @QuadrotorCsPayloadBase.inertia.setter def inertia(self, value): QuadrotorCsPayloadBase.inertia.fset(self, value) if self._mjMdl is not None and self._mj_quad_index is not None: if value.ndim == 2: value = np.diag(value) self._mjMdl.model.body_inertia[self._mj_quad_index] = value @QuadrotorCsPayloadBase.payload_mass.setter def payload_mass(self, value): QuadrotorCsPayloadBase.payload_mass.fset(self, value) if self._mjMdl is not None and self._mj_payload_index is not None: self._mjMdl.model.body_mass[self._mj_payload_index] = value @QuadrotorCsPayloadBase.cable_length.setter def cable_length(self, value): QuadrotorCsPayloadBase.cable_length.fset(self, value) if self._mjMdl is not None and self._mj_cable_index is not None: from ..mujoco import mujoco self._mjMdl.model.tendon_length0[self._mj_cable_index] = value self._mjMdl.model.tendon_limited[self._mj_cable_index] = True self._mjMdl.model.tendon_range[self._mj_cable_index][1] = value self._mjMdl.model.tendon_lengthspring[self._mj_cable_index] = value mujoco.mj_tendon(self._mjMdl.model, self._mjMdl.data) # ─── Reset ─────────────────────────────────────────────────────
[docs] def reset(self, **kwargs): self.t = 0.0 self._mjMdl.reset() super().reset(**kwargs) if self._model_type == self.MODEL_TYPE.TENDON: self._reset_tendon_model() elif self._model_type == self.MODEL_TYPE.LINKS: self._reset_nlink_model() elif self._model_type == self.MODEL_TYPE.CABLE: self._reset_composite_model() self._query_latest_state()
def _reset_tendon_model(self): from scipy.spatial.transform import Rotation as sp_rot _qQ = sp_rot.from_matrix(np.asarray(self.state.orientation)).as_quat() quat_Q = np.array([_qQ[3], _qQ[0], _qQ[1], _qQ[2]]) i = self._mj_quad_pos_index self._mjMdl.data.qpos[i : i + 3] = self.state.position i = self._mj_quad_quat_index self._mjMdl.data.qpos[i : i + 4] = quat_Q i = self._mj_quad_vel_index self._mjMdl.data.qvel[i : i + 3] = self.state.velocity i = self._mj_quad_omega_index self._mjMdl.data.qvel[i : i + 3] = np.asarray(self.state.angular_velocity) i = self._mj_payload_pos_index self._mjMdl.data.qpos[i : i + 3] = self.state.payload_position i = self._mj_payload_quat_index self._mjMdl.data.qpos[i : i + 4] = np.array([1.0, 0.0, 0.0, 0.0]) i = self._mj_payload_vel_index self._mjMdl.data.qvel[i : i + 3] = self.state.payload_velocity i = self._mj_payload_omega_index self._mjMdl.data.qvel[i : i + 3] = np.zeros(3) def _reset_nlink_model(self): i = self._mj_quad_pos_index self._mjMdl.data.qpos[i : i + 3] = self.state.position self._mj_data__payload_pos_prev = np.zeros(3) self._mj_data__payload_vel_initialized = False def _reset_composite_model(self): # Composite cable reset is not fully implemented; _logger.warning("Composite cable reset is not supported") pass # ─── Step ──────────────────────────────────────────────────────
[docs] def step(self, u, desired_att=None): """Step MuJoCo simulation.""" for _ in range(self._step_iter): wrench = self._repackage_input(u, desired_att=desired_att) if self._force_type == self.FORCE_TYPE.PROP_FORCES: ctrl = self._wrench_to_propforces(wrench) else: ctrl = wrench self._mjMdl.data.ctrl[self._ctrl_index : self._ctrl_index + 4] = ctrl self._mjMdl._step_mujoco_simulation(self._nFrames) self._query_latest_state() self.t = self._mjMdl.data.time if self.render and self._mjMdl._viewer is not None: self._mjMdl._viewer.add_trail_point(self.state.position) self._mjMdl._viewer.add_trail_point( self.state.payload_position, key=1, rgba=[0.2, 0.2, 1.0, 0.6] )
# ─── State query ─────────────────────────────────────────────── def _query_latest_state(self): """Sync state from MuJoCo data.""" if self._model_type == self.MODEL_TYPE.TENDON: self._query_state_tendon_model() elif self._model_type == self.MODEL_TYPE.CABLE: self._query_state_composite_model() elif self._model_type == self.MODEL_TYPE.LINKS: self._query_state_nlink_model() self.t = self._mjMdl.data.time # Quadrotor state q = copy.deepcopy(self._mj_data__quad_quat) self.state.position = copy.deepcopy(self._mj_data__quad_pos) self.state.velocity = copy.deepcopy(self._mj_data__quad_vel) self.state.orientation = SO3(self._mjMdl._quat2rot(q)) self.state.angular_velocity = TSO3(np.array(copy.deepcopy(self._mj_data__quad_angvel))) # Payload state self.state.payload_position = copy.deepcopy(self._mj_data__payload_pos) self.state.payload_velocity = copy.deepcopy(self._mj_data__payload_vel) # Derive cable attitude and angular velocity from positions p = self.state.payload_position - self.state.position norm_p = np.linalg.norm(p) if norm_p > 1e-8: self.state.cable_attitude = S2(p / norm_p) dq = self.state.payload_velocity - self.state.velocity self.state.cable_angular_velocity = TS2(np.cross(np.asarray(self.state.cable_attitude), dq)) def _query_state_tendon_model(self): i = self._mj_quad_pos_index self._mj_data__quad_pos = self._mjMdl.data.qpos[i : i + 3] i = self._mj_quad_quat_index self._mj_data__quad_quat = self._mjMdl.data.qpos[i : i + 4] i = self._mj_quad_vel_index self._mj_data__quad_vel = self._mjMdl.data.qvel[i : i + 3] i = self._mj_quad_omega_index self._mj_data__quad_angvel = self._mjMdl.data.qvel[i : i + 3] i = self._mj_payload_pos_index self._mj_data__payload_pos = self._mjMdl.data.qpos[i : i + 3] i = self._mj_payload_vel_index self._mj_data__payload_vel = self._mjMdl.data.qvel[i : i + 3] def _query_state_composite_model(self): i = self._mj_quad_pos_index self._mj_data__quad_pos = self._mjMdl.data.qpos[i : i + 3] i = self._mj_quad_quat_index self._mj_data__quad_quat = self._mjMdl.data.qpos[i : i + 4] i = self._mj_quad_vel_index self._mj_data__quad_vel = self._mjMdl.data.qvel[i : i + 3] i = self._mj_quad_omega_index self._mj_data__quad_angvel = self._mjMdl.data.qvel[i : i + 3] i = self._mj_payload_pos_index self._mj_data__payload_pos = self._mjMdl.data.qpos[i : i + 3] i = self._mj_payload_vel_index self._mj_data__payload_vel = self._mjMdl.data.qvel[i : i + 3] def _query_state_nlink_model(self): i = self._mj_quad_pos_index self._mj_data__quad_pos = self._mjMdl.data.qpos[i : i + 3] i = self._mj_quad_quat_index self._mj_data__quad_quat = self._mjMdl.data.qpos[i : i + 4] i = self._mj_quad_vel_index self._mj_data__quad_vel = self._mjMdl.data.qvel[i : i + 3] i = self._mj_quad_omega_index self._mj_data__quad_angvel = self._mjMdl.data.qvel[i : i + 3] self._mj_data__payload_pos = self._mjMdl.data.xpos[self._mj_payload_index] self._mj_data__payload_vel = ( self._mj_data__payload_pos - self._mj_data__payload_pos_prev ) / self._mjMdl.model.opt.timestep if not self._mj_data__payload_vel_initialized: self._mj_data__payload_vel = np.zeros(3) self._mj_data__payload_vel_initialized = True self._mj_data__payload_pos_prev = copy.deepcopy(self._mj_data__payload_pos)