Loading [MathJax]/extensions/tex2jax.js
iCub-main
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
python_simworld_control.py
Go to the documentation of this file.
1import collections
2import yarp
3
4yarp.Network.init() # Initialise YARP
5
6
8 """Class for controlling iCub simulator via its RPC world port."""
9
def __init__(self):
    """Open an RPC client port and connect it to the simulator world port."""
    # The port name embeds id(self), so each instance opens its own port.
    self._port_name = "".join(["/WorldController-", str(id(self)), "/commands"])

    self._rpc_client = yarp.RpcClient()
    self._rpc_client.open(self._port_name)
    self._rpc_client.addOutput("/icubSim/world")

    # Per object type: how many objects of that type have been created so far.
    # Missing types read as 0.
    self._sim_ids_counters = collections.defaultdict(int)

    # Internal object IDs are indices into this list. Each entry is a tuple
    # (object type, simulator id), so callers do not have to remember the type.
    self._objects = []
22
def _execute(self, cmd):
    """Send cmd through the RPC client and return the reply bottle."""
    reply = yarp.Bottle()
    # write() fills `reply` in place with the answer to `cmd`.
    self._rpc_client.write(cmd, reply)
    return reply
28
def _is_success(self, ans):
    """Check if an RPC answer Bottle indicates successful execution.

    Returns True only when the answer holds exactly one element and that
    element is the '[ok]' vocab.
    """
    ok_vocab = 27503  # Vocab for '[ok]'

    if ans.size() != 1:
        return False
    return ans.get(0).asVocab() == ok_vocab
32
def _prepare_del_all_command(self):
    """Prepare the "world del all" command bottle.

    Returns:
        yarp.Bottle with the command, ready to be sent to the rpc port of the simulator
    """
    result = yarp.Bottle()
    result.clear()

    # Explicit loop rather than map(): map() is lazy on Python 3, so calling
    # it only for its side effects would add nothing to the bottle there.
    for token in ("world", "del", "all"):
        result.addString(token)

    return result
39
def _prepare_create_command(self, obj, size, location, colour):
    """Prepare an RPC command for creating an object in the simulator environment.

    See Simulator Readme section 'Object Creation'.

    Parameters:
        obj      - object type string: 'sph', 'box', 'cyl', 'ssph', 'sbox' or 'scyl'.
        size     - list of values specifying the size of an object. Parameters depend on object type:
                     (s)box: [ x, y, z ]
                     (s)sph: [ radius ]
                     (s)cyl: [ radius, length ]
        location - coordinates of the object location, [ x, y, z ]
        colour   - object colour in RGB (normalised), [ r, g, b ]
    Returns:
        yarp.Bottle with the command, ready to be sent to the rpc port of the simulator
    """
    result = yarp.Bottle()
    result.clear()

    # Explicit loops rather than map(): map() is lazy on Python 3, so calling
    # it only for its side effects would add nothing to the bottle there.
    for token in ("world", "mk", obj):
        result.addString(token)

    # Numeric fields follow in the order: size, location, colour.
    for values in (size, location, colour):
        for value in values:
            result.addDouble(value)

    return result
66
def _prepare_move_command(self, obj, obj_id, location):
    """Prepare the "world set <obj> <id> <x> <y> <z>" command bottle.

    Parameters:
        obj      - object type string
        obj_id   - simulator id of the object, added as an integer
        location - coordinates to move the object to, [ x, y, z ]
    Returns:
        yarp.Bottle with the command, ready to be sent to the rpc port of the simulator
    """
    result = yarp.Bottle()
    result.clear()

    # Explicit loops rather than map(): map() is lazy on Python 3, so calling
    # it only for its side effects would add nothing to the bottle there.
    for token in ("world", "set", obj):
        result.addString(token)
    result.addInt(obj_id)
    for coordinate in location:
        result.addDouble(coordinate)

    return result
77
def _prepare_rot_command(self, obj, obj_id, rotation):
    """Prepare the "world rot <obj> <id> <rotx> <roty> <rotz>" command bottle.

    Parameters:
        obj      - object type string
        obj_id   - simulator id of the object, added as an integer
        rotation - rotation values, appended in the given order
    Returns:
        yarp.Bottle with the command, ready to be sent to the rpc port of the simulator
    """
    result = yarp.Bottle()
    result.clear()

    # Explicit loops rather than map(): map() is lazy on Python 3, so calling
    # it only for its side effects would add nothing to the bottle there.
    for token in ("world", "rot", obj):
        result.addString(token)
    result.addInt(obj_id)
    for angle in rotation:
        result.addDouble(angle)

    return result
88
def _prepare_get_command(self, obj, obj_id):
    """Prepare the "world get <obj> <id>" command bottle.

    Parameters:
        obj    - object type string
        obj_id - simulator id of the object, added as an integer
    Returns:
        yarp.Bottle with the command, ready to be sent to the rpc port of the simulator
    """
    result = yarp.Bottle()
    result.clear()

    # Explicit loop rather than map(): map() is lazy on Python 3, so calling
    # it only for its side effects would add nothing to the bottle there.
    for token in ("world", "get", obj):
        result.addString(token)
    result.addInt(obj_id)

    return result
98
def create_object(self, obj, size, location, colour):
    """Create an object in the simulator and register it locally.

    Parameters:
        obj      - object type string
        size     - list of size values for the object type
        location - coordinates of the object location, [ x, y, z ]
        colour   - object colour, [ r, g, b ]
    Returns:
        internal object ID (index into self._objects, starting from 0),
        or -1 when the simulator did not confirm the creation.
    """
    reply = self._execute(self._prepare_create_command(obj, size, location, colour))
    if not self._is_success(reply):
        return -1  # error

    # iCub simulator IDs are counted per object type and start from 1, so the
    # incremented counter is exactly the simulator id of the new object.
    self._sim_ids_counters[obj] += 1
    self._objects.append((obj, self._sim_ids_counters[obj]))

    # Internal IDs are shared among all object types: the position in self._objects.
    return len(self._objects) - 1
115
def move_object(self, obj_id, location):
    """Move an object specified by the internal id to another location.

    Parameters:
        obj_id   - internal object ID as returned by create_object
        location - target coordinates, [ x, y, z ]
    Returns:
        True when the simulator confirmed the move, False otherwise.
    Raises:
        IndexError if obj_id is negative or not a known internal ID.
    """
    if obj_id < 0:
        # -1 is create_object's error value; used as a list index it would
        # silently address the most recently created object.
        raise IndexError("invalid object id: %r" % (obj_id,))

    obj_type, sim_id = self._objects[obj_id]
    command = self._prepare_move_command(obj_type, sim_id, location)
    return self._is_success(self._execute(command))
120
def rotate_object(self, obj_id, rotation):
    """Rotate an object specified by the internal id giving its absolute rotation.

    Parameters:
        obj_id   - internal object ID as returned by create_object
        rotation - rotation values passed on to the "world rot" command
    Returns:
        True when the simulator confirmed the rotation, False otherwise.
    Raises:
        IndexError if obj_id is negative or not a known internal ID.
    """
    if obj_id < 0:
        # -1 is create_object's error value; used as a list index it would
        # silently address the most recently created object.
        raise IndexError("invalid object id: %r" % (obj_id,))

    obj_type, sim_id = self._objects[obj_id]
    command = self._prepare_rot_command(obj_type, sim_id, rotation)
    return self._is_success(self._execute(command))
125
def get_object_location(self, obj_id):
    """Obtain the object location from the simulator.

    Parameters:
        obj_id - internal object ID as returned by create_object
    Returns:
        3-element list with the xyz coordinates read from the reply, or None
        when the reply does not contain exactly three values.
    Raises:
        IndexError if obj_id is negative or not a known internal ID.
    """
    if obj_id < 0:
        # -1 is create_object's error value; used as a list index it would
        # silently address the most recently created object.
        raise IndexError("invalid object id: %r" % (obj_id,))

    obj_type, sim_id = self._objects[obj_id]
    reply = self._execute(self._prepare_get_command(obj_type, sim_id))

    if reply.size() != 3:
        return None  # An error occurred

    # range() instead of xrange(): the latter exists on Python 2 only.
    return [reply.get(i).asDouble() for i in range(3)]
134
def del_all(self):
    """Delete all objects from the simulator.

    Returns the success flag of the "world del all" call. The local
    bookkeeping is reset only when the call succeeded.
    """
    reply = self._execute(self._prepare_del_all_command())
    succeeded = self._is_success(reply)

    if not succeeded:
        return succeeded

    # Reset the per-type counters and empty the object list in place.
    self._sim_ids_counters.clear()
    self._objects[:] = []

    return succeeded
145
def __del__(self):
    """Best-effort cleanup: delete all created objects and close the RPC port."""
    # getattr with a default covers instances whose __init__ never got as far
    # as creating the client.
    client = getattr(self, "_rpc_client", None)
    if client is None:  # identity test replaces the Python-2-only `<>` operator
        return

    try:
        self.del_all()
        client.close()
        del self._rpc_client
    except AttributeError:
        # Deliberately ignored, as before: a finalizer should not raise if
        # part of the instance state is already gone.
        pass
_prepare_move_command(self, obj, obj_id, location)
_prepare_rot_command(self, obj, obj_id, rotation)
create_object(self, obj, size, location, colour)
_prepare_create_command(self, obj, size, location, colour)