Source code for pagoda.cooper

# -*- coding: utf-8 -*-

'''This module contains a Python implementation of a forward-dynamics solver.

For detailed information about the solver, its raison d'ĂȘtre, and how it works,
please see the documentation for the :class:`World` class.

Further comments and documentation are available in this source file. Eventually
I hope to integrate these comments into some sort of online documentation for
the package as a whole.
'''

from __future__ import division, print_function, absolute_import

import climate
import numpy as np
import ode
import re

from . import physics
from . import skeleton

logging = climate.get_logger(__name__)


[docs]class Markers: ''' ''' DEFAULT_CFM = 1e-6 DEFAULT_ERP = 0.3
[docs] def __init__(self, world): self.world = world self.jointgroup = ode.JointGroup() self.bodies = {} self.joints = {} self.targets = {} self.offsets = {} self.channels = {} self.data = None self.cfms = None self.erp = Markers.DEFAULT_ERP # these arrays are derived from the data array. self.visibility = None self.positions = None self.velocities = None self._frame_no = -1
@property def num_frames(self): '''Return the number of frames of marker data.''' return self.data.shape[0] @property def num_markers(self): '''Return the number of markers in each frame of data.''' return self.data.shape[1] @property def labels(self): '''Return the names of our marker labels in canonical order.''' return sorted(self.channels, key=lambda c: self.channels[c]) def __iter__(self): return iter(self.data) def __getitem__(self, idx): return self.data[idx] def _map_labels_to_channels(self, labels): if isinstance(labels, str): labels = labels.strip().split() if isinstance(labels, (tuple, list)): return dict((c, i) for i, c in enumerate(labels)) return labels or {}
[docs] def load_csv(self, filename, start_frame=10, max_frames=int(1e300)): '''Load marker data from a CSV file. The file will be imported using Pandas, which must be installed to use this method. (``pip install pandas``) The first line of the CSV file will be used for header information. The "time" column will be used as the index for the data frame. There must be columns named 'markerAB-foo-x','markerAB-foo-y','markerAB-foo-z', and 'markerAB-foo-c' for marker 'foo' to be included in the model. Parameters ---------- filename : str Name of the CSV file to load. ''' import pandas as pd compression = None if filename.endswith('.gz'): compression = 'gzip' df = pd.read_csv(filename, compression=compression).set_index('time').fillna(-1) # make sure the data frame's time index matches our world. assert self.world.dt == pd.Series(df.index).diff().mean() markers = [] for c in df.columns: m = re.match(r'^marker\d\d-(.*)-c$', c) if m: markers.append(m.group(1)) self.channels = self._map_labels_to_channels(markers) cols = [c for c in df.columns if re.match(r'^marker\d\d-.*-[xyzc]$', c)] self.data = df[cols].values.reshape((len(df), len(markers), 4))[start_frame:] self.data[:, :, [1, 2]] = self.data[:, :, [2, 1]] logging.info('%s: loaded marker data %s', filename, self.data.shape) self.process_data() self.create_bodies()
[docs] def load_c3d(self, filename, start_frame=0, max_frames=int(1e300)): '''Load marker data from a C3D file. The file will be imported using the c3d module, which must be installed to use this method. (``pip install c3d``) Parameters ---------- filename : str Name of the C3D file to load. start_frame : int, optional Discard the first N frames. Defaults to 0. max_frames : int, optional Maximum number of frames to load. Defaults to loading all frames. ''' import c3d with open(filename, 'rb') as handle: reader = c3d.Reader(handle) logging.info('world frame rate %s, marker frame rate %s', 1 / self.world.dt, reader.point_rate) # set up a map from marker label to index in the data stream. self.channels = self._map_labels_to_channels([ s.strip() for s in reader.point_labels]) # read the actual c3d data into a numpy array. data = [] for i, (_, frame, _) in enumerate(reader.read_frames()): if i >= start_frame: data.append(frame[:, [0, 1, 2, 4]]) if len(data) > max_frames: break self.data = np.array(data) # scale the data to meters -- mm is a very common C3D unit. if reader.get('POINT:UNITS').string_value.strip().lower() == 'mm': logging.info('scaling point data from mm to m') self.data[:, :, :3] /= 1000. logging.info('%s: loaded marker data %s', filename, self.data.shape) self.process_data() self.create_bodies()
[docs] def process_data(self): '''Process data to produce velocity and dropout information.''' self.visibility = self.data[:, :, 3] self.positions = self.data[:, :, :3] self.velocities = np.zeros_like(self.positions) + 1000 for frame_no in range(1, len(self.data) - 1): prev = self.data[frame_no - 1] next = self.data[frame_no + 1] for c in range(self.num_markers): if -1 < prev[c, 3] < 100 and -1 < next[c, 3] < 100: self.velocities[frame_no, c] = ( next[c, :3] - prev[c, :3]) / (2 * self.world.dt) self.cfms = np.zeros_like(self.visibility) + self.DEFAULT_CFM
[docs] def create_bodies(self): '''Create physics bodies corresponding to each marker in our data.''' self.bodies = {} for label in self.channels: body = self.world.create_body( 'sphere', name='marker:{}'.format(label), radius=0.02) body.is_kinematic = True body.color = 0.9, 0.1, 0.1, 0.5 self.bodies[label] = body
[docs] def load_attachments(self, source, skeleton): '''Load attachment configuration from the given text source. The attachment configuration file has a simple format. After discarding Unix-style comments (any part of a line that starts with the pound (#) character), each line in the file is then expected to have the following format:: marker-name body-name X Y Z The marker name must correspond to an existing "channel" in our marker data. The body name must correspond to a rigid body in the skeleton. The X, Y, and Z coordinates specify the body-relative offsets where the marker should be attached: 0 corresponds to the center of the body along the given axis, while -1 and 1 correspond to the minimal (maximal, respectively) extent of the body's bounding box along the corresponding dimension. Parameters ---------- source : str or file-like A filename or file-like object that we can use to obtain text configuration that describes how markers are attached to skeleton bodies. skeleton : :class:`pagoda.skeleton.Skeleton` The skeleton to attach our marker data to. ''' self.targets = {} self.offsets = {} filename = source if isinstance(source, str): source = open(source) else: filename = '(file-{})'.format(id(source)) for i, line in enumerate(source): tokens = line.split('#')[0].strip().split() if not tokens: continue label = tokens.pop(0) if label not in self.channels: logging.info('%s:%d: unknown marker %s', filename, i, label) continue if not tokens: continue name = tokens.pop(0) bodies = [b for b in skeleton.bodies if b.name == name] if len(bodies) != 1: logging.info('%s:%d: %d skeleton bodies match %s', filename, i, len(bodies), name) continue b = self.targets[label] = bodies[0] o = self.offsets[label] = \ np.array(list(map(float, tokens))) * b.dimensions / 2 logging.info('%s <--> %s, offset %s', label, b.name, o)
[docs] def detach(self): '''Detach all marker bodies from their associated skeleton bodies.''' self.jointgroup.empty() self.joints = {}
[docs] def attach(self, frame_no): '''Attach marker bodies to the corresponding skeleton bodies. Attachments are only made for markers that are not in a dropout state in the given frame. Parameters ---------- frame_no : int The frame of data we will use for attaching marker bodies. ''' assert not self.joints for label, j in self.channels.items(): target = self.targets.get(label) if target is None: continue if self.visibility[frame_no, j] < 0: continue if np.linalg.norm(self.velocities[frame_no, j]) > 10: continue joint = ode.BallJoint(self.world.ode_world, self.jointgroup) joint.attach(self.bodies[label].ode_body, target.ode_body) joint.setAnchor1Rel([0, 0, 0]) joint.setAnchor2Rel(self.offsets[label]) joint.setParam(ode.ParamCFM, self.cfms[frame_no, j]) joint.setParam(ode.ParamERP, self.erp) joint.name = label self.joints[label] = joint self._frame_no = frame_no
[docs] def reposition(self, frame_no): '''Reposition markers to a specific frame of data. Parameters ---------- frame_no : int The frame of data where we should reposition marker bodies. Markers will be positioned in the appropriate places in world coordinates. In addition, linear velocities of the markers will be set according to the data as long as there are no dropouts in neighboring frames. ''' for label, j in self.channels.items(): body = self.bodies[label] body.position = self.positions[frame_no, j] body.linear_velocity = self.velocities[frame_no, j]
[docs] def distances(self): '''Get a list of the distances between markers and their attachments. Returns ------- distances : ndarray of shape (num-markers, 3) Array of distances for each marker joint in our attachment setup. If a marker does not currently have an associated joint (e.g. because it is not currently visible) this will contain NaN for that row. ''' distances = [] for label in self.labels: joint = self.joints.get(label) distances.append([np.nan, np.nan, np.nan] if joint is None else np.array(joint.getAnchor()) - joint.getAnchor2()) return np.array(distances)
[docs] def forces(self, dx_tm1=None): '''Return an array of the forces exerted by marker springs. Notes ----- The forces exerted by the marker springs can be approximated by:: F = kp * dx where ``dx`` is the current array of marker distances. An even more accurate value is computed by approximating the velocity of the spring displacement:: F = kp * dx + kd * (dx - dx_tm1) / dt where ``dx_tm1`` is an array of distances from the previous time step. Parameters ---------- dx_tm1 : ndarray An array of distances from markers to their attachment targets, measured at the previous time step. Returns ------- F : ndarray An array of forces that the markers are exerting on the skeleton. ''' cfm = self.cfms[self._frame_no][:, None] kp = self.erp / (cfm * self.world.dt) kd = (1 - self.erp) / cfm dx = self.distances() F = kp * dx if dx_tm1 is not None: bad = np.isnan(dx) | np.isnan(dx_tm1) F[~bad] += (kd * (dx - dx_tm1) / self.world.dt)[~bad] return F
[docs]class World(physics.World): '''Simulate a physics world that includes an articulated skeleton model. The "cooper" method, originally described by Cooper & Ballard (2012 Proc. Motion in Games), uses a forward physics simulator (here, the Open Dynamics Engine; ODE) to compute inverse motion quantities like angles and torques using motion-capture data and a structured, articulated model of the human skeleton. The prerequisites for this method are: - Record some motion-capture data from a human. This is expected to result in the locations, in world coordinates, of several motion-capture markers at regularly-spaced intervals over time. - Construct a simulated skeleton that matches the size and shape of the human to some reasonable degree of accuracy. The more accurate the skeleton, the more accurate the resulting measurements. In broad strokes, the cooper method proceeds in two stages: 1. :func:`Inverse Kinematics <inverse_kinematics>`. The motion-capture data are attached to the simulated skeleton using ball joints. These ball joints are configured so that their constraints (namely, placing both anchor points of the joint at the same location in space) are allowed to slip; ODE implements this slippage using a spring dynamics, which provides a natural mechanism for the articulated skeleton to interpolate the marker data as well as possible. At each frame during the first pass, the motion-capture markers are placed at the appropriate location in space, and the attached articulated skeleton "snaps" toward the markers using its inertia (from the motion in preceding frames) as well as the spring constraints provided by the marker joint slippage. At each frame of this process, the articulated skeleton can be queried to obtain joint angles for each degree of freedom. In addition, the markers can be queried to find their constraint slippage. 2. :func:`Inverse Dynamics <inverse_dynamics>`. The marker constraints are removed, and the joint angles computed in the first pass are used to constrain the skeleton's movements. At each frame during the second pass, the joints in the skeleton attempt to follow the angles computed in the first pass; a PID controller is used to convert the angular error value into a target angular velocity for each joint. The torques that ODE computes to solve this forward angle-following problem are returned as a result of the second pass. In general, the cooper model is a useful way of getting a physics simulator, a model of a human skeleton, and some motion-capture data to interact smoothly. Particularly useful for almost any simulations of human motion are the :func:`settle_to_markers` and :func:`follow_markers` methods. '''
[docs] def load_skeleton(self, filename, pid_params=None): '''Create and configure a skeleton in our model. Parameters ---------- filename : str The name of a file containing skeleton configuration data. pid_params : dict, optional If given, use this dictionary to set the PID controller parameters on each joint in the skeleton. See :func:`pagoda.skeleton.pid` for more information. ''' self.skeleton = skeleton.Skeleton(self) self.skeleton.load(filename, color=(0.3, 0.5, 0.9, 0.8)) if pid_params: self.skeleton.set_pid_params(**pid_params) self.skeleton.erp = 0.1 self.skeleton.cfm = 0
[docs] def load_markers(self, filename, attachments, max_frames=1e100): '''Load marker data and attachment preferences into the model. Parameters ---------- filename : str The name of a file containing marker data. This currently needs to be either a .C3D or a .CSV file. CSV files must adhere to a fairly strict column naming convention; see :func:`Markers.load_csv` for more information. attachments : str The name of a text file specifying how markers are attached to skeleton bodies. max_frames : number, optional Only read in this many frames of marker data. By default, the entire data file is read into memory. Returns ------- markers : :class:`Markers` Returns a markers object containing loaded marker data as well as skeleton attachment configuration. ''' self.markers = Markers(self) fn = filename.lower() if fn.endswith('.c3d'): self.markers.load_c3d(filename, max_frames=max_frames) elif fn.endswith('.csv') or fn.endswith('.csv.gz'): self.markers.load_csv(filename, max_frames=max_frames) else: logging.fatal('%s: not sure how to load markers!', filename) self.markers.load_attachments(attachments, self.skeleton)
[docs] def step(self, substeps=2): '''Advance the physics world by one step. Typically this is called as part of a :class:`pagoda.viewer.Viewer`, but it can also be called manually (or some other stepping mechanism entirely can be used). ''' # by default we step by following our loaded marker data. self.frame_no += 1 try: next(self.follower) except (AttributeError, StopIteration) as err: self.reset()
[docs] def reset(self): '''Reset the automatic process that gets called by :func:`step`. By default this follows whatever marker data is loaded into our model. Provide an override for this method to customize the default behavior of the :func:`step` method. ''' self.follower = self.follow_markers()
[docs] def settle_to_markers(self, frame_no=0, max_distance=0.05, max_iters=300, states=None): '''Settle the skeleton to our marker data at a specific frame. Parameters ---------- frame_no : int, optional Settle the skeleton to marker data at this frame. Defaults to 0. max_distance : float, optional The settling process will stop when the mean marker distance falls below this threshold. Defaults to 0.1m (10cm). Setting this too small prevents the settling process from finishing (it will loop indefinitely), and setting it too large prevents the skeleton from settling to a stable state near the markers. max_iters : int, optional Attempt to settle markers for at most this many iterations. Defaults to 1000. states : list of body states, optional If given, set the bodies in our skeleton to these kinematic states before starting the settling process. ''' if states is not None: self.skeleton.set_body_states(states) dist = None for _ in range(max_iters): for _ in self._step_to_marker_frame(frame_no): pass dist = np.nanmean(abs(self.markers.distances())) logging.info('settling to frame %d: marker distance %.3f', frame_no, dist) if dist < max_distance: return self.skeleton.get_body_states() for b in self.skeleton.bodies: b.linear_velocity = 0, 0, 0 b.angular_velocity = 0, 0, 0 return states
[docs] def follow_markers(self, start=0, end=1e100, states=None): '''Iterate over a set of marker data, dragging its skeleton along. Parameters ---------- start : int, optional Start following marker data after this frame. Defaults to 0. end : int, optional Stop following marker data after this frame. Defaults to the end of the marker data. states : list of body states, optional If given, set the states of the skeleton bodies to these values before starting to follow the marker data. ''' if states is not None: self.skeleton.set_body_states(states) for frame_no, frame in enumerate(self.markers): if frame_no < start: continue if frame_no >= end: break for states in self._step_to_marker_frame(frame_no): yield states
def _step_to_marker_frame(self, frame_no, dt=None): '''Update the simulator to a specific frame of marker data. This method returns a generator of body states for the skeleton! This generator must be exhausted (e.g., by consuming this call in a for loop) for the simulator to work properly. This process involves the following steps: - Move the markers to their new location: - Detach from the skeleton - Update marker locations - Reattach to the skeleton - Detect ODE collisions - Yield the states of the bodies in the skeleton - Advance the ODE world one step Parameters ---------- frame_no : int Step to this frame of marker data. dt : float, optional Step with this time duration. Defaults to ``self.dt``. Returns ------- states : sequence of state tuples A generator of a sequence of one body state for the skeleton. This generator must be exhausted for the simulation to work properly. ''' # update the positions and velocities of the markers. self.markers.detach() self.markers.reposition(frame_no) self.markers.attach(frame_no) # detect collisions. self.ode_space.collide(None, self.on_collision) # record the state of each skeleton body. states = self.skeleton.get_body_states() self.skeleton.set_body_states(states) # yield the current simulation state to our caller. yield states # update the ode world. self.ode_world.step(dt or self.dt) # clear out contact joints to prepare for the next frame. self.ode_contactgroup.empty()
[docs] def inverse_kinematics(self, start=0, end=1e100, states=None, max_force=20): '''Follow a set of marker data, yielding kinematic joint angles. Parameters ---------- start : int, optional Start following marker data after this frame. Defaults to 0. end : int, optional Stop following marker data after this frame. Defaults to the end of the marker data. states : list of body states, optional If given, set the states of the skeleton bodies to these values before starting to follow the marker data. max_force : float, optional Allow each degree of freedom in the skeleton to exert at most this force when attempting to maintain its equilibrium position. This defaults to 20N. Set this value higher to simulate a stiff skeleton while following marker data. Returns ------- angles : sequence of angle frames Returns a generator of joint angle data for the skeleton. One set of joint angles will be generated for each frame of marker data between `start` and `end`. ''' zeros = None if max_force > 0: self.skeleton.enable_motors(max_force) zeros = np.zeros(self.skeleton.num_dofs) for _ in self.follow_markers(start, end, states): if zeros is not None: self.skeleton.set_target_angles(zeros) yield self.skeleton.joint_angles
[docs] def inverse_dynamics(self, angles, start=0, end=1e100, states=None, max_force=100): '''Follow a set of angle data, yielding dynamic joint torques. Parameters ---------- angles : ndarray (num-frames x num-dofs) Follow angle data provided by this array of angle values. start : int, optional Start following angle data after this frame. Defaults to the start of the angle data. end : int, optional Stop following angle data after this frame. Defaults to the end of the angle data. states : list of body states, optional If given, set the states of the skeleton bodies to these values before starting to follow the marker data. max_force : float, optional Allow each degree of freedom in the skeleton to exert at most this force when attempting to follow the given joint angles. Defaults to 100N. Setting this value to be large results in more accurate following but can cause oscillations in the PID controllers, resulting in noisy torques. Returns ------- torques : sequence of torque frames Returns a generator of joint torque data for the skeleton. One set of joint torques will be generated for each frame of angle data between `start` and `end`. ''' if states is not None: self.skeleton.set_body_states(states) for frame_no, frame in enumerate(angles): if frame_no < start: continue if frame_no >= end: break self.ode_space.collide(None, self.on_collision) states = self.skeleton.get_body_states() self.skeleton.set_body_states(states) # joseph's stability fix: step to compute torques, then reset the # skeleton to the start of the step, and then step using computed # torques. thus any numerical errors between the body states after # stepping using angle constraints will be removed, because we # will be stepping the model using the computed torques. self.skeleton.enable_motors(max_force) self.skeleton.set_target_angles(angles[frame_no]) self.ode_world.step(self.dt) torques = self.skeleton.joint_torques self.skeleton.disable_motors() self.skeleton.set_body_states(states) self.skeleton.add_torques(torques) yield torques self.ode_world.step(self.dt) self.ode_contactgroup.empty()
[docs] def forward_dynamics(self, torques, start=0, states=None): '''Move the body according to a set of torque data.''' if states is not None: self.skeleton.set_body_states(states) for frame_no, torque in enumerate(torques): if frame_no < start: continue if frame_no >= end: break self.ode_space.collide(None, self.on_collision) self.skeleton.add_torques(torque) self.ode_world.step(self.dt) yield self.ode_contactgroup.empty()