Source code for mujoco_py.mjviewer

from threading import Lock
import glfw
from mujoco_py.builder import cymj
from mujoco_py.generated import const
import time
import copy
from multiprocessing import Process
from mujoco_py.utils import rec_copy, rec_assign
import imageio


[docs]class MjViewerBasic(cymj.MjRenderContextWindow): """ A simple display GUI showing the scene of an :class:`.MjSim` with a mouse-movable camera. :class:`.MjViewer` extends this class to provide more sophisticated playback and interaction controls. Parameters ---------- sim : :class:`.MjSim` The simulator to display. """ def __init__(self, sim): super().__init__(sim) self._gui_lock = Lock() self._button_left_pressed = False self._button_right_pressed = False self._last_mouse_x = 0 self._last_mouse_y = 0 framebuffer_width, _ = glfw.get_framebuffer_size(self.window) window_width, _ = glfw.get_window_size(self.window) self._scale = framebuffer_width * 1.0 / window_width glfw.set_cursor_pos_callback(self.window, self._cursor_pos_callback) glfw.set_mouse_button_callback( self.window, self._mouse_button_callback) glfw.set_scroll_callback(self.window, self._scroll_callback) glfw.set_key_callback(self.window, self.key_callback)
[docs] def render(self): """ Render the current simulation state to the screen or off-screen buffer. Call this in your main loop. """ if self.window is None: return elif glfw.window_should_close(self.window): exit(0) with self._gui_lock: super().render() glfw.poll_events()
def key_callback(self, window, key, scancode, action, mods): if action == glfw.RELEASE and key == glfw.KEY_ESCAPE: print("Pressed ESC") print("Quitting.") exit(0) def _cursor_pos_callback(self, window, xpos, ypos): if not (self._button_left_pressed or self._button_right_pressed): return # Determine whether to move, zoom or rotate view mod_shift = ( glfw.get_key(window, glfw.KEY_LEFT_SHIFT) == glfw.PRESS or glfw.get_key(window, glfw.KEY_RIGHT_SHIFT) == glfw.PRESS) if self._button_right_pressed: action = const.MOUSE_MOVE_H if mod_shift else const.MOUSE_MOVE_V elif self._button_left_pressed: action = const.MOUSE_ROTATE_H if mod_shift else const.MOUSE_ROTATE_V else: action = const.MOUSE_ZOOM # Determine dx = int(self._scale * xpos) - self._last_mouse_x dy = int(self._scale * ypos) - self._last_mouse_y width, height = glfw.get_framebuffer_size(window) with self._gui_lock: self.move_camera(action, dx / height, dy / height) self._last_mouse_x = int(self._scale * xpos) self._last_mouse_y = int(self._scale * ypos) def _mouse_button_callback(self, window, button, act, mods): self._button_left_pressed = ( glfw.get_mouse_button(window, glfw.MOUSE_BUTTON_LEFT) == glfw.PRESS) self._button_right_pressed = ( glfw.get_mouse_button(window, glfw.MOUSE_BUTTON_RIGHT) == glfw.PRESS) x, y = glfw.get_cursor_pos(window) self._last_mouse_x = int(self._scale * x) self._last_mouse_y = int(self._scale * y) def _scroll_callback(self, window, x_offset, y_offset): with self._gui_lock: self.move_camera(const.MOUSE_ZOOM, 0, -0.05 * y_offset)
[docs]class MjViewer(MjViewerBasic): """ Extends :class:`.MjViewerBasic` to add video recording, interactive time and interaction controls. The key bindings are as follows: - TAB: Switch between MuJoCo cameras. - H: Toggle hiding all GUI components. - SPACE: Pause/unpause the simulation. - RIGHT: Advance simulation by one step. - V: Start/stop video recording. - T: Capture screenshot. - I: Drop into ``ipdb`` debugger. - S/F: Decrease/Increase simulation playback speed. - C: Toggle visualization of contact forces (off by default). - D: Enable/disable frame skipping when rendering lags behind real time. - R: Toggle transparency of geoms. - M: Toggle display of mocap bodies. Parameters ---------- sim : :class:`.MjSim` The simulator to display. """ def __init__(self, sim): super().__init__(sim) self._ncam = sim.model.ncam self._paused = False # is viewer paused. # should we advance viewer just by one step. self._advance_by_one_step = False # Vars for recording video self._record_video = False self._video_frames = [] self._video_idx = 0 self._video_path = "/tmp/video_%07d.mp4" # vars for capturing screen self._image_idx = 0 self._image_path = "/tmp/frame_%07d.png" # run_speed = x1, means running real time, x2 means fast-forward times # two. self._run_speed = 1.0 self._loop_count = 0 self._render_every_frame = False self._show_mocap = True # Show / hide mocap bodies. self._transparent = False # Make everything transparent. # this variable is estamated as a running average. self._time_per_render = 1 / 60.0 self._hide_overlay = False # hide the entire overlay. self._user_overlay = {}
[docs] def render(self): """ Render the current simulation state to the screen or off-screen buffer. Call this in your main loop. """ def render_inner_loop(self): render_start = time.time() self._overlay.clear() if not self._hide_overlay: for k, v in self._user_overlay.items(): self._overlay[k] = v self._create_full_overlay() super().render() if self._record_video: frame = self._read_pixels_as_in_window() self._video_frames.append(frame) else: self._time_per_render = 0.9 * self._time_per_render + \ 0.1 * (time.time() - render_start) self._user_overlay = copy.deepcopy(self._overlay) # Render the same frame if paused. if self._paused: while self._paused: render_inner_loop(self) if self._advance_by_one_step: self._advance_by_one_step = False break else: # inner_loop runs "_loop_count" times in expectation (where "_loop_count" is a float). # Therefore, frames are displayed in the real-time. self._loop_count += self.sim.model.opt.timestep * self.sim.nsubsteps / \ (self._time_per_render * self._run_speed) if self._render_every_frame: self._loop_count = 1 while self._loop_count > 0: render_inner_loop(self) self._loop_count -= 1 # Markers and overlay are regenerated in every pass. self._markers[:] = [] self._overlay.clear()
def _read_pixels_as_in_window(self): # Reads pixels with markers and overlay from the same camera as screen. resolution = glfw.get_framebuffer_size( self.sim._render_context_window.window) if self.sim._render_context_offscreen is None: self.sim.render(*resolution) offscreen_ctx = self.sim._render_context_offscreen window_ctx = self.sim._render_context_window # Save markers and overlay from offscreen. saved = [copy.deepcopy(offscreen_ctx._markers), copy.deepcopy(offscreen_ctx._overlay), rec_copy(offscreen_ctx.cam)] # Copy markers and overlay from window. offscreen_ctx._markers[:] = window_ctx._markers[:] offscreen_ctx._overlay.clear() offscreen_ctx._overlay.update(window_ctx._overlay) rec_assign(offscreen_ctx.cam, rec_copy(window_ctx.cam)) img = self.sim.render(*resolution) # Restore markers and overlay to offscreen. offscreen_ctx._markers[:] = saved[0][:] offscreen_ctx._overlay.clear() offscreen_ctx._overlay.update(saved[1]) rec_assign(offscreen_ctx.cam, saved[2]) return img def _create_full_overlay(self): if self._render_every_frame: self.add_overlay(const.GRID_TOPLEFT, "", "") else: self.add_overlay(const.GRID_TOPLEFT, "Run speed = %.3f x real time" % self._run_speed, "[S]lower, [F]aster") self.add_overlay( const.GRID_TOPLEFT, "Ren[d]er every frame", "Off" if self._render_every_frame else "On") self.add_overlay(const.GRID_TOPLEFT, "Switch camera (#cams = %d)" % (self._ncam + 1), "[Tab] (camera ID = %d)" % self.cam.fixedcamid) self.add_overlay(const.GRID_TOPLEFT, "[C]ontact forces", "Off" if self.vopt.flags[ 10] == 1 else "On") self.add_overlay( const.GRID_TOPLEFT, "Referenc[e] frames", "Off" if self.vopt.frame == 1 else "On") self.add_overlay(const.GRID_TOPLEFT, "T[r]ansparent", "On" if self._transparent else "Off") self.add_overlay( const.GRID_TOPLEFT, "Display [M]ocap bodies", "On" if self._show_mocap else "Off") if self._paused is not None: if not self._paused: self.add_overlay(const.GRID_TOPLEFT, "Stop", "[Space]") else: self.add_overlay(const.GRID_TOPLEFT, "Start", "[Space]") self.add_overlay(const.GRID_TOPLEFT, "Advance simulation by one step", "[right arrow]") self.add_overlay(const.GRID_TOPLEFT, "[H]ide Menu", "") if self._record_video: ndots = int(7 * (time.time() % 1)) dots = ("." * ndots) + (" " * (6 - ndots)) self.add_overlay(const.GRID_TOPLEFT, "Record [V]ideo (On) " + dots, "") else: self.add_overlay(const.GRID_TOPLEFT, "Record [V]ideo (Off) ", "") if self._video_idx > 0: fname = self._video_path % (self._video_idx - 1) self.add_overlay(const.GRID_TOPLEFT, " saved as %s" % fname, "") self.add_overlay(const.GRID_TOPLEFT, "Cap[t]ure frame", "") if self._image_idx > 0: fname = self._image_path % (self._image_idx - 1) self.add_overlay(const.GRID_TOPLEFT, " saved as %s" % fname, "") self.add_overlay(const.GRID_TOPLEFT, "Start [i]pdb", "") if self._record_video: extra = " (while video is not recorded)" else: extra = "" self.add_overlay(const.GRID_BOTTOMLEFT, "FPS", "%d%s" % (1 / self._time_per_render, extra)) self.add_overlay(const.GRID_BOTTOMLEFT, "Solver iterations", str( self.sim.data.solver_iter + 1)) def key_callback(self, window, key, scancode, action, mods): if action != glfw.RELEASE: return elif key == glfw.KEY_TAB: # Switches cameras. self.cam.fixedcamid += 1 self.cam.type = const.CAMERA_FIXED if self.cam.fixedcamid >= self._ncam: self.cam.fixedcamid = -1 self.cam.type = const.CAMERA_FREE elif key == glfw.KEY_H: # hides all overlay. self._hide_overlay = not self._hide_overlay elif key == glfw.KEY_SPACE and self._paused is not None: # stops simulation. self._paused = not self._paused # Advances simulation by one step. elif key == glfw.KEY_RIGHT and self._paused is not None: self._advance_by_one_step = True self._paused = True elif key == glfw.KEY_V or \ (key == glfw.KEY_ESCAPE and self._record_video): # Records video. Trigers with V or if in progress by ESC. self._record_video = not self._record_video if not self._record_video and len(self._video_frames) > 0: # This include captures console, if in the top declaration. frames = [f for f in self._video_frames] fps = (1 / self._time_per_render) process = Process(target=save_video, args=(frames, self._video_path % self._video_idx, fps)) process.start() self._video_frames = [] self._video_idx += 1 elif key == glfw.KEY_T: # capture screenshot img = self._read_pixels_as_in_window() imageio.imwrite(self._image_path % self._image_idx, img) self._image_idx += 1 elif key == glfw.KEY_I: # drops in debugger. print('You can access the simulator by self.sim') import ipdb ipdb.set_trace() elif key == glfw.KEY_S: # Slows down simulation. self._run_speed /= 2.0 elif key == glfw.KEY_F: # Speeds up simulation. self._run_speed *= 2.0 elif key == glfw.KEY_C: # Displays contact forces. vopt = self.vopt vopt.flags[10] = vopt.flags[11] = not vopt.flags[10] elif key == glfw.KEY_D: # turn off / turn on rendering every frame. self._render_every_frame = not self._render_every_frame elif key == glfw.KEY_E: vopt = self.vopt vopt.frame = 1 - vopt.frame elif key == glfw.KEY_R: # makes everything little bit transparent. self._transparent = not self._transparent if self._transparent: self.sim.model.geom_rgba[:, 3] /= 5.0 else: self.sim.model.geom_rgba[:, 3] *= 5.0 elif key == glfw.KEY_M: # Shows / hides mocap bodies self._show_mocap = not self._show_mocap for body_idx1, val in enumerate(self.sim.model.body_mocapid): if val != -1: for geom_idx, body_idx2 in enumerate(self.sim.model.geom_bodyid): if body_idx1 == body_idx2: if not self._show_mocap: # Store transparency for later to show it. self.sim.extras[ geom_idx] = self.sim.model.geom_rgba[geom_idx, 3] self.sim.model.geom_rgba[geom_idx, 3] = 0 else: self.sim.model.geom_rgba[ geom_idx, 3] = self.sim.extras[geom_idx] super().key_callback(window, key, scancode, action, mods)
# Separate Process to save video. This way visualization is # less slowed down. def save_video(frames, filename, fps): writer = imageio.get_writer(filename, fps=fps) for f in frames: writer.append_data(f) writer.close()