"""Functions for extrinsic calibration of cameras.
.. versionadded:: 0.9
"""
from klampt import *
from klampt.math import vectorops,so3,se3
from klampt.io import resource
from klampt.model.trajectory import RobotTrajectory
import math
from klampt.control.robotinterface import RobotInterfaceBase
from klampt.model import sensing
from typing import Optional,Union,Sequence,List,Tuple,Dict,Any,Callable
from .typing import Vector2,Vector3,RigidTransform,Vector
import random
Key = Union[int,str]
[docs]class PointMarker:
"""A physical point marker that can be detected in an image.
Attributes:
link (int or str, optional): link on which this is located. None means
attached to the world frame.
size (float): used only for visual debugging, default 0.01 (1cm)
local_coordinates (3-vector, optional): position relative to link
variable (bool): whether this should be optimized
"""
def __init__(self,link=None,local_coordinates=None):
self.link = link # type: Optional[Key]
self.size = 0.01 # type: float
self.local_coordinates = local_coordinates # type: Vector3
self.variable = True # type: bool
[docs]class CameraInfo:
"""Stores info for a camera in a calibration.
Attributes:
link (int or str, optional): link on which this is located. None means
attached to the world frame.
intrinsics (dict): dict with keys 'fx', 'fy', 'cx', 'cy' giving the
camera intrinsics.
local_coordinates (se3 element, optional): pose relative to link
variable (bool): whether this should be optimized
"""
def __init__(self,link,intrinsics=None,local_coordinates=None):
self.link = link # type: Optional[Key]
self.intrinsics = intrinsics # type: dict
self.local_coordinates = local_coordinates # type: RigidTransform
self.variable = True # type: bool
[docs]class PointObservation:
"""A 3D point measurement in the frame of a camera.
X is to the right, Y is down, Z is forward. Units are in m.
"""
def __init__(self,value,camera_id,frame_id,marker_id,feature_id=None):
self.camera_id = camera_id
self.frame_id = frame_id
self.marker_id = marker_id
self.feature_id = feature_id
self.value = value
self.error = None
self.hand_eye_transform = None
[docs]class PixelObservation:
"""A 2D pixel measurement in the frame of a camera.
X is to the right, Y is down. Units are in pixels.
"""
def __init__(self,value,camera_id,frame_id,marker_id,feature_id=None):
self.camera_id = camera_id
self.frame_id = frame_id
self.marker_id = marker_id
self.feature_id = feature_id
self.value = value
self.error = None
self.hand_eye_transform = None
[docs]class RobotExtrinsicCalibration:
"""Stores a calibration dataset and helps perform extrinsic (hand-eye)
calibration.
Usage for a world-attached camera, Aruco marker on end effector::
calib = RobotExtrinsicCalibration()
calib.robot = #set the robot model here
calib.cameras[0] = CameraInfo(link=None,intrinsics=intrinsics)
ee_link = # set the ee link index or name here
calib.markers[tag_id] = TransformMarker(ee_link)
calib.visualize() #this will show a visualization of the markers and camera
calib.editTrajectory() #this will pop up an editing window
controller = MyRobotInterface() #setup the robot controller here
controller.initialize()
def take_camera_picture():
#TODO: take the picture from the camera here
return image
#TODO: setup aruco_detector... see Aruco documentation for more details
camparam = aruco.CameraParameters()
camparam.readFromXMLFile('cam_params.yml')
detector = aruco.MarkerDetector()
detector.setDictionary("my_markers.dict")
def detect_aruco_marker(image):
#return all detected markers
markers = aruco_detector.detect(image)
results = []
for marker in markers:
marker.calculateExtrinsics(0.15, camparam)
mtx = marker.getTransformMatrix()
Tmarker = se3.from_homogeneous(mtx) #convert to klampt.se3
results.append((marker.id,Tmarker))
return results
#if you just want to run detection on the pictures offline, use
#take_camera_picture = None and detect_aruco_marker = None
calib.executeTrajectory(controller,take_camera_picture,detect_aruco_marker)
calib.editCalibration() #visual editing of initial guesses
calib.calibrate() #optimize the calibration
calib.save('my_calibration.json')
To use reprojection error instead::
intrinsics = {'fx':fx,'fy':fy,'cx':cx,'cy':cy} #intrinsics are needed if you are using pixels
If you want to load a set of configurations and marker transforms from
disk manually::
... do calib robot and marker setup ...
configs = klampt.io.loader.load('calibration.configs')
transforms = []
with open('marker_transforms.txt','r') as f:
for line in f.readlines():
if len(line.strip()) > 0:
T = klampt.io.loader.read(line,'RigidTransform')
transforms.append(T)
for i,(q,T) in enumerate(zip(configs,transforms)):
calib.frames.append(None)
calib.configurations.append(q)
calib.addDetection(T,frame_id=i,marker_id=0)
calib.editCalibration() #visual editing of initial guesses
calib.calibration() #optimize the calibration
calib.save('my_calibration.json')
To visualize the calibration::
... do calib robot and marker setup ...
calib.load('my_calibration.json')
calib.visualize()
To fix the marker transform during optimization, e.g. at local coordinates
[x,y,z]::
calib.markers[0].local_coordinates = (so3.identity(),[x,y,z])
calib.markers[0].variable = False
calib.calibrate()
A similar flag can be used for fixing the camera.
"""
def __init__(self) -> None:
self.robot = None # type: Optional[RobotModel]
self.world = None # type: Optional[WorldModel]
self.trajectory = None # type: Optional[RobotTrajectory]
self.cameras = dict() # type: Dict[Key,CameraInfo]
self.camera_constraints = dict() #TODO: for bundle adjustment
self.markers = dict() # type: Dict[Key,Union[PointMarker,TransformMarker]]
self.frames = []
self.frame_camera_ids = [] # type: List[Key]
self.configurations = [] # type: List[Vector]
self.observations = [] # type: List[Union[PointObservation,TransformObservation,PixelObservation]]
[docs] def cameraFromRobot(self,sensors=None) -> None:
"""Sets up the camera(s) from the robot model.
Args:
sensors (str, int, list of str, or list of int, optional):
specifies one or more sensors to use. If None, just uses the
first camera. If 'all', uses all cameras.
"""
self.cameras = dict()
simsensors = []
if sensors is None or sensors=='all':
sindex = 0
while True:
s = self.robot.sensor(sindex)
if s.name() == '':
break
if s.type() == 'CameraSensor':
simsensors.append(s)
if sensors is None:
break
sindex += 1
else:
if not isinstance(sensors,(list,tuple)):
sensors = [sensors]
for sindex in sensors:
s = self.robot.sensor(sindex)
if s.name() == '':
raise ValueError("Invalid sensor {}, not in robot model".format(sindex))
if s.type() != 'CameraSensor':
raise ValueError("Invalid sensor {}, not a CameraSensor".format(sindex))
simsensors.append(s)
for s in simsensors:
caminfo = CameraInfo(int(s.getSetting('link')))
caminfo.intrinsics = sensing.camera_to_intrinsics(s,'json')
caminfo.local_coordinates = sensing.get_sensor_xform(s)
self.cameras[s.name()] = caminfo
[docs] def editTrajectory(self, world : WorldModel = None, name='calibration_trajectory') -> RobotTrajectory:
"""Returns a RobotTrajectory that passes through all calibration
configurations.
Note: the executed trajectory stops at all milestones.
"""
if world is None:
if self.world is not None:
world = self.world
else:
if self.robot is None:
raise ValueError("Can't edit trajectory without a robot model")
world = WorldModel()
world.add('temp',self.robot)
if self.trajectory is None and len(self.configurations) > 0:
self.trajectory = RobotTrajectory(self.robot, list(range(len(self.configurations))), self.configurations)
save,newtraj = resource.edit(name,self.trajectory, 'Trajectory', world=world, doedit=True)
if save:
self.trajectory = newtraj
return newtraj
return self.trajectory
[docs] def executeTrajectory(self,
controller : RobotInterfaceBase,
traj : RobotTrajectory,
camera_fn : Callable,
detect_fn : Callable=None,
speed=1.0,
wait=0.0) -> None:
"""Executes a trajectory on a controller, optionally taking images
along the way.
TODO: specify times to actually stop.
"""
import time
def wait_for_move():
moving = True
while moving:
controller.beginStep()
moving = controller.isMoving()
controller.endStep()
time.sleep(0.1)
for i in range(len(traj.milestones)):
print("Moving to milestone",i)
controller.beginStep()
controller.moveToPosition(traj.milestones[0],speed)
controller.endStep()
wait_for_move()
if wait != 0:
print("Waiting for",wait,"s")
time.sleep(wait*0.5)
if camera_fn:
print("Taking pictures")
if isinstance(camera_fn,dict):
for (k,fn) in camera_fn.items():
image = fn()
self.add_frame(image,controller.sensedPosition(),camera_id=k,detect_fn=detect_fn)
else:
image = camera_fn()
self.add_frame(image,controller.sensedPosition(),detect_fn=detect_fn)
time.sleep(wait*0.5)
[docs] def visualize(self, world : WorldModel=None, edit=False) -> None:
"""Pops up a visualization window showing the calibration setup.
"""
from klampt import vis
from klampt.model import sensing
if world is None:
if self.world is not None:
world = self.world
else:
if self.robot is None:
raise ValueError("Can't edit trajectory without a robot model")
world = WorldModel()
world.add('temp',self.robot)
vis.add('world',world)
camera_widgets = dict()
marker_widgets = dict()
sensors = dict()
for i,c in self.cameras.items():
try:
s = self.robot.sensor(i)
if s.name()=='':
raise ValueError()
except Exception:
#create a new camera on the robot
s = self.robot.addSensor(str(i),'CameraSensor')
sensing.intrinsics_to_camera(c.intrinsics,s,'json')
s.setSetting('zmin','0.1')
s.setSetting('zmax','10')
s.setSetting('link',str(_linkIndex(c.link,self.robot)))
sensing.set_sensor_xform(s,c.local_coordinates)
#for k in s.settings():
# print(k,s.getSetting(k))
sensors[i] = s
vis.add('Camera_widget {}'.format(i),s)
vis.add('Camera {}'.format(i),sensing.get_sensor_xform(s,self.robot))
if edit:
camera_widgets[i] = vis.edit('Camera {}'.format(i))
for i,m in self.markers.items():
if isinstance(m,PointMarker):
coords = _worldPosition(m.link,self.robot,m.local_coordinates)
vis.add('Marker {}'.format(i),coords,size=m.size)
if edit:
marker_widgets[i] = vis.edit('Marker {}'.format(i))
else:
coords = _worldTransform(m.link,self.robot,m.local_coordinates)
vis.add('Marker {}'.format(i),coords,length=m.size)
if edit:
marker_widgets[i] = vis.edit('Marker {}'.format(i))
import time
vis.add('calibration_configs',self.configurations,color=(0,1,0,0.3))
vis.show()
re_read_cameras = []
re_read_markers = []
while vis.shown():
#TODO: spin between calibration configurations and show sensed observations + errors for each frame
#read edited camera / marker transforms
for k in re_read_cameras: #make sure to get released configuration
Tc = vis.getItemConfig('Camera {}'.format(k))
Tc = (Tc[:9],Tc[9:])
c = self.cameras[k]
c.local_coordinates = _localTransform(c.link,self.robot,Tc)
sensing.set_sensor_xform(sensors[k],c.local_coordinates)
re_read_cameras = []
for k,cw in camera_widgets.items():
if cw.hasFocus():
Tc = vis.getItemConfig('Camera {}'.format(k))
Tc = (Tc[:9],Tc[9:])
c = self.cameras[k]
c.local_coordinates = _localTransform(c.link,self.robot,Tc)
sensing.set_sensor_xform(sensors[k],c.local_coordinates)
re_read_cameras.append(k)
for k in re_read_markers: #make sure to get released configuration
Tm = vis.getItemConfig('Marker {}'.format(k))
m = self.markers[k]
if isinstance(m,PointMarker):
m.local_coordinates = _localPosition(m.link,self.robot,Tm)
else:
Tm = (Tm[:9],Tm[9:])
m.local_coordinates = _localTransform(m.link,self.robot,Tm)
re_read_cameras = []
for k,mw in marker_widgets.items():
if mw.hasFocus():
Tm = vis.getItemConfig('Marker {}'.format(k))
m = self.markers[k]
if isinstance(m,PointMarker):
m.local_coordinates = _localPosition(m.link,self.robot,Tm)
else:
Tm = (Tm[:9],Tm[9:])
m.local_coordinates = _localTransform(m.link,self.robot,Tm)
re_read_markers.append(k)
time.sleep(0.05)
[docs] def editCalibration(self, world : WorldModel = None) -> None:
for i,c in self.cameras.items():
if c.local_coordinates is None:
c.local_coordinates = se3.identity()
for i,m in self.markers.items():
if m.local_coordinates is None:
if isinstance(m,TransformMarker):
m.local_coordinates = se3.identity()
else:
m.local_coordinates = [0.0]*3
self.visualize(world,True)
[docs] def addFrame(self, frame, q : Vector=None, camera_id : Key=None, detect_fn:Callable=None) -> None:
"""
Add a frame and configuration, automatically using forward kinematics
to determine camera / marker transforms. This is the easiest way to add
frames and detections.
Args:
frame (Image): the image for the frame.
q (configuration, optional): the robot's configuration. If not
given, the observations' `hand_eye_transform` must be filled
in manually.
camera_id (int or str, optional): the camera ID, 0 by default.
detect_fn (callable, optional): if given, a function
`detect(frame)` which returns a list of detected observations.
Each observation can be a PixelObservation, PointObservation,
TransformObservation, or a (marker,value) pair. To indicate
a detected feature on a marker, a `((marker,feature),value)`
pair is returned.
"""
if q is not None:
if self.robot is None:
raise ValueError("Need to set robot first")
if len(q) != self.robot.numLinks():
raise ValueError("Invalid length of configuration")
frame_id = len(self.frames)
self.frames.append(frame)
self.configurations.append(q)
if camera_id is None:
camera_id = list(self.cameras.keys())[0]
self.frame_camera_ids.append(camera_id)
if detect_fn is not None:
detections = detect_fn(frame)
for det in detections:
if isinstance(det,(PixelObservation,PointObservation,TransformObservation)):
det.camera_id = camera_id
det.frame_id = frame_id
self.robot.setConfig(q)
camera_link = self.cameras[camera_id].link
marker_link = self.markers[det.marker_id].link
camera_link_transform = _linkTransform(camera_link,self.robot)
marker_link_transform = _linkTransform(marker_link,self.robot)
det.hand_eye_transform = se3.mul(se3.inv(camera_link_transform),marker_link_transform)
self.observations.append(det)
else:
marker,value = det
feature = None
if hasattr(marker,'__iter__'):
marker,feature = marker
self.addDetection(frame_id,marker,feature,value,camera_id)
[docs] def addDetection(self, value : Union[Vector2,Vector3,RigidTransform],
frame_id : int,
marker_id : Key,
feature_id : int=None,
camera_id : Key=None,
error : Union[float,Sequence[float]]=None) -> Union[PixelObservation,PointObservation,TransformObservation]:
"""Manually adds a detection for a given camera / frame / marker /
feature. Automatically determines a `hand_eye_transform` for the
observation if a robot and configuration are already provided for
this frame.
Args:
value (2-list, 3-list, or SE3 element): the pixel, point, or
transform of the detection, given in camera coordinates.
frame_id (int): the index of the frame
marker_id (int or str): the index of the marker
feature_id (int, optional): the index of the feature on the marker.
camera_id (int or str, optional): the camera that observed this
feature (default 0)
error (float or list, optional): the standard deviation of the
observation error. If a float is given, noise is assumed
to be isotropic.
Returns:
The configured detection object.
"""
if frame_id < 0 or frame_id >= len(self.frames):
raise ValueError("Invalid frame")
if marker_id not in self.markers:
raise ValueError("Invalid marker")
if camera_id is None:
camera_id = list(self.cameras.keys())[0]
if camera_id not in self.cameras:
raise ValueError("Invalid camera")
marker = self.markers[marker_id]
if feature_id is not None:
if not hasattr(marker,'local_features'):
raise ValueError("Invalid detection, references a feature in a point marker")
if feature_id < 0 or feature_id >= len(marker.local_features):
raise ValueError("Invalid detection, references a non-existent feature")
if len(value)==2 and not hasattr(value[0],'__iter__'):
self.observations.append(PixelObservation(value,camera_id,frame_id,marker_id,feature_id))
elif isinstance(marker,TransformMarker):
self.observations.append(TransformObservation(value,camera_id,frame_id,marker_id))
else:
self.observations.append(PointObservation(value,camera_id,frame_id,marker_id,feature_id))
self.observations[-1].error = error
if self.robot is not None and frame_id < len(self.configurations) and self.configurations[frame_id] is not None:
self.robot.setConfig(self.configurations[frame_id])
camera_link = self.cameras[camera_id].link
marker_link = marker.link
camera_link_transform = _linkTransform(camera_link,self.robot)
marker_link_transform = _linkTransform(marker_link,self.robot)
self.observations[-1].hand_eye_transform = se3.mul(se3.inv(camera_link_transform),marker_link_transform)
return self.observations[-1]
[docs] def cameraTransforms(self,world=False) -> Dict[Key,RigidTransform]:
"""Returns a dict of camera transforms, in either world or link-
local coordinates. World coordinates use the robot's current
configuration.
"""
if world:
res = dict()
for i,c in self.cameras.items():
res[i] = _worldTransform(c.link,self.robot,c.local_coordinates)
return res
else:
return dict((i,(c.local_coordinates if c is not None else se3.identity())) for i,c in self.cameras.items())
[docs] def markerTransforms(self,world=False) -> Dict[int,RigidTransform]:
"""Returns a dict of marker transforms, in either world or link-
local coordinates. World coordinates use the robot's current
configuration.
"""
if world:
res = dict()
for i,m in self.markers.items():
if isinstance(m,PointMarker):
res[i] = _worldPosition(m.link,self.robot,m.local_coordinates)
else:
res[i] = _worldTransform(m.link,self.robot,m.local_coordinates)
return res
else:
res = dict()
for i,m in self.markers.items():
if isinstance(m,PointMarker):
res[i] = (m.local_coordinates if m is not None else [0.0]*3)
else:
res[i] = (m.local_coordinates if m is not None else se3.identity())
return res
[docs] def setCameraTransform(self, camera : Key, T : RigidTransform, world=False):
"""Sets the camera transform for the specified camera.
World coordinates use the robot's current configuration.
"""
c = self.cameras[camera]
if world:
T = _localTransform(c.link,self.robot,T)
c.local_coordinates = T
[docs] def setMarkerTransform(self, marker : Key, T : RigidTransform, world=False):
"""Sets the transform / coordinates for the specified marker.
World coordinates use the robot's current configuration.
"""
m = self.markers[marker]
if world:
if isinstance(m,TransformMarker):
T = _localTransform(m.link,self.robot,T)
else:
T = _localPosition(m.link,self.robot,T)
m.local_coordinates = T
[docs] def predictedObservations(self,camera_transforms=None,marker_transforms=None,noise=False) -> List[Union[PointObservation,PixelObservation,TransformObservation]]:
"""Returns a list of predicted observations for a given set of camera
and marker transforms.
Args:
camera_transforms (dict key->se3 elements, optional): keys matching
`self.cameras`. If None, then it is assumed that all cameras
have transforms (`local_coordinates`) specified.
marker_transforms (dict key->se3 elements, optional): keys matching
`self.markers`. If None, then it is assumed that all markers
have transforms (`local_coordinates`) specified.
noise (bool or float): if not False, then noise is injected into
the prediction. This is useful for generating synthetic data
and understanding the observation error model.
Returns:
A list of configured observation objects, corresponding to the
values in `self.observations`.
"""
if camera_transforms is None:
camera_transforms = dict()
if not isinstance(camera_transforms,dict):
if len(self.cameras)==1:
#be tolerant
camera_transforms = {list(self.cameras.keys())[0]:camera_transforms}
if any(i not in camera_transforms for i,c in self.cameras.items()):
camera_transforms = camera_transforms.copy()
for i,c in self.cameras.items():
if i not in camera_transforms:
camera_transforms[i] = c.local_coordinates
if c.local_coordinates is None:
raise ValueError("No transform specified for camera {}".format(i))
if marker_transforms is None:
marker_transforms = dict()
if not isinstance(marker_transforms,dict):
if len(self.markers)==1:
#be tolerant
marker_transforms = {list(self.markers.keys())[0]:marker_transforms}
if any(i not in marker_transforms for i,m in self.markers.items()):
marker_transforms = marker_transforms.copy()
for i,m in self.markers.items():
if i not in marker_transforms:
marker_transforms[i] = m.local_coordinates
if m.local_coordinates is None:
raise ValueError("No transform specified for marker {}".format(i))
if noise is False:
noise = 0
def _add_noise(value,error):
if noise == 0:
return
if error is None:
for i in range(len(value)):
value[i] += random.gauss(0,noise)
elif hasattr(error,'__iter__'):
for i in range(len(value)):
value[i] += random.gauss(0,noise*error[i])
else:
for i in range(len(value)):
value[i] += random.gauss(0,noise*error)
obs = []
for o in self.observations:
cam = self.cameras[o.camera_id]
if cam.intrinsics is not None:
fx,fy,cx,cy = cam.intrinsics['fx'],cam.intrinsics['fy'],cam.intrinsics['cx'],cam.intrinsics['cy']
marker = self.markers[o.marker_id]
Tm_local = marker_transforms[o.marker_id]
Tc_local = camera_transforms[o.camera_id]
if isinstance(marker,TransformMarker):
Tm_c = se3.mul(se3.inv(Tc_local),se3.mul(o.hand_eye_transform,Tm_local))
if isinstance(o,PixelObservation):
#projection
xyz = se3.apply(Tm_c,marker.local_features[o.feature_id])
pixel = [fx*xyz[0]/xyz[2] + cx,fy*xyz[1]/xyz[2] + cy]
_add_noise(pixel,o.error)
obs.append(PixelObservation(pixel,o.camera_id,o.frame_id,o.marker_id,o.feature_id))
elif isinstance(o,PointObservation):
#point
feature = o.feature_id
xyz = se3.apply(Tm_c,marker.local_features[o.feature_id])
_add_noise(xyz,o.error)
obs.append(PointObservation(xyz,o.camera_id,o.frame_id,o.marker_id,o.feature_id))
else:
#transform
if noise:
disturbance = [0]*6
_add_noise(disturbance,o.error)
Tdist = so3.from_moment(disturbance[:3]),disturbance[3:]
Tm_c = se3.mul(Tdist,Tm_c)
obs.append(TransformObservation(Tm_c,o.camera_id,o.frame_id,o.marker_id))
else:
xyz = se3.apply(se3.inv(Tc_local),se3.apply(o.hand_eye_transform,Tm_local))
if isinstance(o,PixelObservation):
#projection
pixel = [fx*xyz[0]/xyz[2] + cx,fy*xyz[1]/xyz[2] + cy]
_add_noise(pixel,o.error)
obs.append(PixelObservation(pixel,o.camera_id,o.frame_id,o.marker_id,o.feature_id))
elif isinstance(o,PointObservation):
#point
_add_noise(xyz,o.error)
obs.append(PointObservation(xyz,o.camera_id,o.frame_id,o.marker_id,o.feature_id))
else:
raise ValueError("Uh.. can't get a TransformObservation for a PointMarker")
return obs
[docs] def predictedResiduals(self,camera_transforms=None,marker_transforms=None) -> List[Vector]:
"""Returns a list of predicted residuals for a given set of camera
and marker transforms.
Args:
camera_transforms (list of se3 elements, optional): length
`len(self.cameras)`. If None, then it is assumed that all
cameras have transforms (`local_coordinates`) specified.
marker_transforms (list of se3 elements, optional): length
`len(self.markers)`. If None, then it is assumed that all
markers have transforms (`local_coordinates`) specified.
Returns:
A list of residuals (prediction - observed value)
corresponding to the values in `self.observations`.
"""
if camera_transforms is None:
camera_transforms = dict()
if not isinstance(camera_transforms,dict):
if len(self.cameras)==1:
#be tolerant
camera_transforms = {list(self.cameras.keys())[0]:camera_transforms}
if any(i not in camera_transforms for i,c in self.cameras.items()):
camera_transforms = camera_transforms.copy()
for i,c in self.cameras.items():
if i not in camera_transforms:
print("Reading camera transform",i)
camera_transforms[i] = c.local_coordinates
if c.local_coordinates is None:
raise ValueError("No transform specified for camera {}".format(i))
if marker_transforms is None:
marker_transforms = dict()
if not isinstance(marker_transforms,dict):
if len(self.markers)==1:
#be tolerant
marker_transforms = {list(self.markers.keys())[0]:marker_transforms}
if any(i not in marker_transforms for i,m in self.markers.items()):
marker_transforms = marker_transforms.copy()
for i,m in self.markers.items():
if i not in marker_transforms:
print("Reading marker transform",i)
marker_transforms[i] = m.local_coordinates
if m.local_coordinates is None:
raise ValueError("No transform specified for marker {}".format(i))
res = []
for o in self.observations:
cam = self.cameras[o.camera_id]
if cam.intrinsics is not None:
fx,fy,cx,cy = cam.intrinsics['fx'],cam.intrinsics['fy'],cam.intrinsics['cx'],cam.intrinsics['cy']
marker = self.markers[o.marker_id]
Tm_local = marker_transforms[o.marker_id]
Tc_local = camera_transforms[o.camera_id]
if isinstance(marker,TransformMarker):
Tm_c = se3.mul(se3.inv(Tc_local),se3.mul(o.hand_eye_transform,Tm_local))
if isinstance(o,PixelObservation):
#projection
xyz = se3.apply(Tm_c,marker.local_features[o.feature_id])
pixel = [fx*xyz[0]/xyz[2] + cx,fy*xyz[1]/xyz[2] + cy]
res.append(vectorops.sub(pixel,o.value))
elif isinstance(o,PointObservation):
#point
feature = o.feature_id
xyz = se3.apply(Tm_c,marker.local_features[o.feature_id])
res.append(vectorops.sub(xyz,o.value))
else:
#transform
res.append(se3.error(Tm_c,o.value))
else:
xyz = se3.apply(se3.inv(Tc_local),se3.apply(o.hand_eye_transform,Tm_local))
if isinstance(o,PixelObservation):
#projection
pixel = [fx*xyz[0]/xyz[2] + cx,fy*xyz[1]/xyz[2] + cy]
res.append(vectorops.sub(pixel,o.value))
elif isinstance(o,PointObservation):
#point
res.append(vectorops.sub(xyz,o.value))
else:
raise ValueError("Uh.. can't get a TransformObservation for a PointMarker")
return res
[docs] def predictedLogLikelihoods(self,camera_transforms=None,marker_transforms=None) -> List[float]:
"""Returns a list of predicted observation log-likelihoods for a
given set of camera and marker transforms.
Args:
camera_transforms (list of se3 elements, optional): length
`len(self.cameras)`. If None, then it is assumed that all
cameras have transforms (`local_coordinates`) specified.
marker_transforms (list of se3 elements, optional): length
`len(self.markers)`. If None, then it is assumed that all
markers have transforms (`local_coordinates`) specified.
Returns:
A list of log likelihoods of residuals corresponding to the
values in `self.observations`.
"""
residuals = self.predictedResiduals(camera_transforms,marker_transforms)
log2pi = math.log(math.pi*2)
ll = 0
for res,obs in zip(residuals,self.observations):
if obs.error is None:
ll += -vectorops.normSquared(res)*0.5 - len(res)*log2pi
elif hasattr(obs.error):
ll += -vectorops.normSquared(vectorops.div(res,obs.error))*0.5 - len(res)*log2pi - sum(math.log(e) for e in obs.error)
else:
ll += -vectorops.normSquared(vectorops.div(res,obs.error))*0.5 - len(res)*(log2pi+math.log(obs.error))
return ll
[docs] def optimize(self,maxIters=100,tol=1e-7,regularizationFactor=0.0,store=True) -> Tuple[float,Dict[Key,RigidTransform],Dict[Key,RigidTransform]]:
"""Optimizes the calibrated transforms.
Requires scipy.
Returns:
A tuple of the RMSE, camera transform dictionary, and
marker transform dictionary.
"""
import numpy as np
import scipy.optimize
observation_weights = []
for o in self.observations:
if o.error is None:
observation_weights.append(1)
elif hasattr(o.error,'__iter__'):
observation_weights.append(np.divide(1,o.error))
else:
observation_weights.append(1.0/o.error)
def pack_variables(camera_transforms,marker_transforms):
xs = []
for i,c in self.cameras.items():
T = camera_transforms[i]
if c.variable:
xs.append(so3.moment(T[0]))
xs.append(T[1])
for i,m in self.markers.items():
T = marker_transforms[i]
if m.variable:
if isinstance(m,TransformMarker):
xs.append(so3.moment(T[0]))
xs.append(T[1])
else:
xs.append(T)
return np.hstack(xs)
def unpack_variables(x):
#unpack variables from x
i = 0
camera_transforms = dict()
for k,c in self.cameras.items():
if c.variable:
camera_transforms[k] =(so3.from_moment(x[i:i+3]),x[i+3:i+6])
i += 6
else:
camera_transforms[k] = c.local_coordinates
marker_transforms = dict()
for k,m in self.markers.items():
if isinstance(m,TransformMarker):
marker_transforms[k] = (so3.from_moment(x[i:i+3]),x[i+3:i+6])
i+=6
else:
marker_transforms[k] = x[i:i+3]
i+=3
return camera_transforms,marker_transforms
def error_fn(x):
camera_transforms,marker_transforms = unpack_variables(x)
error = 0
for (e,w) in zip(self.predictedResiduals(camera_transforms,marker_transforms),observation_weights):
ew = np.multiply(e,w)
error += ew.dot(ew)
#print("Sqrt weighted error",math.sqrt(error))
if regularizationFactor != 0:
return error + regularizationFactor*(x-x0).dot(x-x0)
return error
camera_transforms0,marker_transforms0 = self.cameraTransforms(),self.markerTransforms()
x = pack_variables(camera_transforms0,marker_transforms0)
if regularizationFactor != 0.0:
x0 = x.copy()
print("Initial RMSE:",math.sqrt(sum(vectorops.normSquared(e) for e in self.predictedResiduals(camera_transforms0,marker_transforms0))))
#res = scipy.optimize.minimize(error_fn,x,method='Nelder-Mead',options={'maxiter':maxIters})
res = scipy.optimize.minimize(error_fn,x,method='BFGS',options={'maxiter':maxIters})
x = res.x
camera_transforms,marker_transforms = unpack_variables(x)
rmse = math.sqrt(sum(vectorops.normSquared(e) for e in self.predictedResiduals(camera_transforms,marker_transforms)))
print("Final RMSE:",rmse)
try:
import numdifftools as nd
Hf = nd.Hessian(error_fn)
H = Hf(x)
Hinv = np.linalg.inv(H)
Hinv_diag = np.sqrt(np.diag(Hinv)/len(self.observations))
i = 0
print()
print("Cramer-Rao estimate of standard errors for cameras")
for k,c in self.cameras.items():
if c.variable:
print(k,":",Hinv_diag[i:i+3],"(r)",Hinv_diag[i+3:i+6],'(t)')
i += 6
print("Cramer-Rao estimate of standard errors for markers")
for k,m in self.markers.items():
if m.variable:
if isinstance(m,PointMarker):
print(k,":",Hinv_diag[i:i+3])
i += 3
else:
print(k,":",Hinv_diag[i:i+3],"(r)",Hinv_diag[i+3:i+6],'(t)')
i += 6
except ImportError:
pass
if store:
for k,c in self.cameras.items():
if c.variable:
T = camera_transforms[k]
c.local_coordinates = T
for k,m in self.markers.items():
if m.variable:
T = marker_transforms[k]
m.local_coordinates = T
return rmse,camera_transforms,marker_transforms
[docs] def updateRobotSensors(self) -> None:
"""Uses the current camera transforms to update the robot model's
sensors.
"""
for i,c in self.cameras.items():
try:
s = self.robot.sensor(i)
if s.name()=='':
raise ValueError()
except Exception:
#create a new camera on the robot
s = self.robot.addSensor(str(i),'CameraSensor')
sensing.intrinsics_to_camera(c.intrinsics,s,'json')
s.setSetting('zmin','0.1')
s.setSetting('zmax','10')
s.setSetting('link',str(_linkIndex(c.link,self.robot)))
sensing.set_sensor_xform(s,c.local_coordinates)
[docs] def save(self,fn) -> None:
"""Saves to a JSON file on disk.
.. warning::
Images will not be saved.
Args:
fn (str or file): the file to save to.
"""
if isinstance(fn,str):
with open(fn,'w') as f:
return self.save(f)
import json
jsonobj = {}
if self.robot is not None:
jsonobj['robot'] = self.robot.getName()
jsonobj['cameras'] = dict((k,_CameraIO.toJson(c)) for k,c in self.cameras.items())
jsonobj['frames'] = [None for f in self.frames]
jsonobj['frame_camera_ids'] = self.frame_camera_ids
jsonobj['markers'] = dict((k,_MarkerIO.toJson(m)) for k,m in self.markers.items())
jsonobj['configurations'] = self.configurations
jsonobj['observations'] = [_ObservationIO.toJson(c) for c in self.observations]
json.dump(jsonobj)
[docs] def load(self,fn) -> None:
"""Loads from a JSON file on disk.
Args:
fn (str or file): the file to load from
"""
if isinstance(fn,str):
with open(fn,'r') as f:
return self.load(f)
import json
jsonobj = json.load(fn)
if 'robot' in jsonobj:
robotname = jsonobj['robot']
if self.robot is not None:
if self.robot.getName() != robotname:
import warnings
warnings.warn("Robot name in calibration file does not match robot object")
self.cameras = dict((k,_CameraIO.fromJson(c)) for k,c in jsonobj['cameras'].items())
self.frames = jsonobj['frames']
self.frame_camera_ids = jsonobj['frame_camera_ids']
self.configurations = jsonobj['configurations']
self.markers = dict((k,_MarkerIO.fromJson(c)) for k,c in jsonobj['markers'].items())
self.observations = [_ObservationIO.fromJson(c) for c in jsonobj['observations']]
def _linkIndex(link,robot):
if link is None:
return -1
if isinstance(link,int):
return link
else:
return robot.link(link).index
def _linkTransform(link,robot):
link = _linkIndex(link,robot)
if link < 0:
return se3.identity()
return robot.link(link).getTransform()
def _worldTransform(link,robot,localTransform):
link = _linkIndex(link,robot)
if localTransform is None:
localTransform = se3.identity()
if link < 0:
return localTransform
return se3.mul(robot.link(link).getTransform(),localTransform)
def _worldPosition(link,robot,localPosition):
link = _linkIndex(link,robot)
if localPosition is None:
localPosition = [0.0]*3
if link < 0:
return localPosition
return se3.mul(robot.link(link).getTransform(),localPosition)
def _localTransform(link,robot,worldTransform):
link = _linkIndex(link,robot)
if worldTransform is None:
worldTransform = se3.identity()
if link < 0:
return worldTransform
return se3.mul(se3.inv(robot.link(link).getTransform()),worldTransform)
def _localPosition(link,robot,worldPosition):
link = _linkIndex(link,robot)
if worldPosition is None:
worldPosition = [0.0]*3
if link < 0:
return worldPosition
return se3.mul(se3.inv(robot.link(link).getTransform()),worldPosition)
class _MarkerIO:
@staticmethod
def toJson(obj):
if isinstance(obj,PointMarker):
return {'type':'Point','link':obj.link,'local_coordinates':obj.local_coordinates,'variable':obj.variable}
else:
return {'type':'Transform','link':obj.link,'local_coordinates':obj.local_coordinates,'local_features':obj.local_features,'variable':obj.variable}
@staticmethod
def fromJson(obj):
if obj['type'] == 'Point':
res = PointMarker(obj['link'],obj['local_coordinates'])
res.variable = obj['variable']
else:
res = TransformMarker(obj['link'],obj['local_coordinates'],obj['local_features'])
res.variable = obj['variable']
return res
class _CameraIO:
@staticmethod
def toJson(obj):
return {'link':obj.link,'intrinsics':obj.intrinsics,'local_coordinates':obj.local_coordinates,'variable':obj.variable}
@staticmethod
def fromJson(obj):
res = CameraInfo(obj['link'],obj['intrinsics'],obj['local_coordinates'])
res.variable = obj['variable']
return res
class _ObservationIO:
@staticmethod
def toJson(obj):
if isinstance(obj,PixelObservation):
return {'type':'Pixel','value':obj.value,'camera_id':obj.camera_id,'frame_id':obj.frame_id,'marker_id':obj.marker_id,'feature_id':obj['feature_id']}
elif isinstance(obj,PointObservation):
return {'type':'Point','value':obj.value,'camera_id':obj.camera_id,'frame_id':obj.frame_id,'marker_id':obj.marker_id,'feature_id':obj['feature_id']}
else:
return {'type':'Transform','value':obj.value,'camera_id':obj.camera_id,'frame_id':obj.frame_id,'marker_id':obj.marker_id}
@staticmethod
def fromJson(obj):
if obj['type'] == 'Pixel':
return PixelObservation(obj['value'],obj['camera_id'],obj['frame_id'],obj['marker_id'],obj['feature_id'])
elif obj['type'] == 'Point':
return PointObservation(obj['value'],obj['camera_id'],obj['frame_id'],obj['marker_id'],obj['feature_id'])
elif obj['type'] == 'Transform':
return TransformObservation(obj['value'],obj['camera_id'],obj['frame_id'],obj['marker_id'])