icub-client
interactionSAMModel.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import matplotlib
4 matplotlib.use("TkAgg")
5 import matplotlib.pyplot as plt
6 import sys
7 import time
8 from ConfigParser import SafeConfigParser
9 from SAM.SAM_Core import SAMDriver as Driver
10 from SAM.SAM_Core.SAM_utils import initialiseModels, timeout
11 import readline
12 import warnings
13 import numpy as np
14 import yarp
15 import logging
16 from os.path import join
17 import os
18 from operator import itemgetter
19 import thread
20 warnings.simplefilter("ignore")
21 np.set_printoptions(precision=2)
22 
23 
24 class interactionSAMModel(yarp.RFModule):
25  """Generic interaction function
26 
27  Description:
28  Generic interaction function that carries out live collection of data, classification of said data, interaction with other modules requesting the classification and generation of training outputs for recall. The parameters for the interaction function are loaded from the config file specified in the `config_path` parameter of the default context file for samSupervisor.py. An example of the configuration structure is shown below.
29 
30  Example:
31  [Model Name]
32  dataIn = <portName/ofInputData>:i <dataType of Port>
33  dataOut = <portName/ofOutputData>:o <dataType of Port>
34  rpcBase = <portName/ofRpcPort>
35  callSign = ask_<X>_label,ask_<X>_instance
36  collectionMethod = collectionMethod lengthOfBuffer
37 
38  [Faces]
39  dataIn = /sam/faces/imageData:i ImageRgb
40  dataOut = /sam/faces/imageData:o ImageMono
41  rpcBase = /sam/faces/rpc
42  callSign = ask_face_label,ask_face_instance
43  collectionMethod = future_buffered 3
44 
45  Args:
46  dataIn : The port name for the port that will receive the data to be classified and the dataType to be expected.
47  dataOut : The port name for the port that will output generated data and the dataType to be transmitted.
48  rpcBase : The rpc port that will receive external requests. This is usually controlled by samSupervisor.py.
49  callSign : The commands that will trigger a classify from data event or a generate from label event.
50  collectionMethod : The collection method to be used. Either `buffered`, `future_buffered` or `continuous`. Followed by an integer indicating the length of the buffer to be used. In the case of `continuous` the buffer length is the maximum number of classification labels to be stored in a First In Last Out (FILO) configuration. Otherwise the buffer length indicates the number of data frames that are required for a classification to take place.
51 
52  """
53  def __init__(self):
54  """
55  Initialisation of the interaction function
56  """
57  yarp.RFModule.__init__(self)
58  self.mm = None
59  self.dataPath = None
60  self.configPath = None
61  self.modelPath = None
62  self.driverName = ''
63  self.model_type = None
64  self.model_mode = None
65  self.textLabels = None
66  self.classifiers = None
67  self.classif_thresh = None
68  self.verbose = None
69  self.Quser = None
70  self.listOfModels = None
71  self.portsList = []
72  self.svPort = None
73  self.labelPort = None
74  self.instancePort = None
75  self.callSignList = []
76  self.inputBottle = yarp.Bottle()
77  self.outputBottle = yarp.Bottle()
78  self.portNameList = []
79 
80  self.rpcConnected = False
81  self.dataInConnected = False
82  self.dataOutConnected = False
83  self.collectionMethod = ''
84  self.bufferSize = None
85 
86  self.falseCount = 0
87  self.noDataCount = 0
88  self.inputType = None
89  self.outputType = None
90  self.errorRate = 50
91  self.dataList = []
93  self.closeFlag = False
94  self.instancePortName = ''
95  self.labelPortName = ''
96  self.verboseSetting = False
97  self.exitFlag = False
98  self.recordingFile = ''
99  self.additionalInfoDict = dict()
100  self.modelLoaded = False
101  self.attentionMode = 'continue'
102  self.baseLogFileName = 'interactionErrorLog'
103  self.windowedMode = True
104  self.modelRoot = None
105  self.eventPort = None
106  self.eventPortName = None
107  self.classTimestamps = None
108  self.probClassList = None
109  self.recency = None
110  self.useRecentClassTime = True
111  self.my_mutex = thread.allocate_lock()
113  def configure(self, rf):
114  """
115  Configure interactionSAMModel yarp module
116 
117  Args:
118  rf: Yarp RF context input
119  argv_1 : String containing data path.
120  argv_2 : String containing model path.
121  argv_3 : String containing config file path (from `config_path` parameter of samSupervisor config file).
122  argv_4 : String driver name corresponding to a valid driver present in SAM_Drivers folder.
123  argv_5 : String `'True'` or `'False'` to switch formatting of logging depending on whether interaction is logging to a separate window or to the stdout of another process.
124  Returns:
125  Boolean indicating success or no success in initialising the yarp module
126  """
127  stringCommand = 'from SAM.SAM_Drivers import ' + sys.argv[4] + ' as Driver'
128  exec stringCommand
129 
130  self.mm = [Driver()]
131  self.dataPath = sys.argv[1]
132  self.modelPath = sys.argv[2]
133  self.driverName = sys.argv[4]
134  self.configPath = sys.argv[3]
135  self.windowedMode = sys.argv[5] == 'True'
136  self.modelRoot = self.dataPath.split('/')[-1]
137 
138  file_i = 0
139  loggerFName = join(self.dataPath, self.baseLogFileName + '_' + str(file_i) + '.log')
140 
141  # check if file exists
142  while os.path.isfile(loggerFName) and os.path.getsize(loggerFName) > 0:
143  loggerFName = join(self.dataPath, self.baseLogFileName + '_' + str(file_i) + '.log')
144  file_i += 1
145 
146  if self.windowedMode:
147  logFormatter = logging.Formatter("[%(levelname)s] %(message)s")
148  else:
149  logFormatter = logging.Formatter("\033[31m%(asctime)s [%(name)-33s] [%(levelname)8s] %(message)s\033[0m")
150 
151  rootLogger = logging.getLogger('interaction ' + self.driverName)
152  rootLogger.setLevel(logging.DEBUG)
153 
154  fileHandler = logging.FileHandler(loggerFName)
155  fileHandler.setFormatter(logFormatter)
156  rootLogger.addHandler(fileHandler)
157 
158  consoleHandler = logging.StreamHandler()
159  consoleHandler.setFormatter(logFormatter)
160  rootLogger.addHandler(consoleHandler)
161  logging.root = rootLogger
162 
163  off = 17
164  logging.info('Arguments: ' + str(sys.argv))
165  logging.info(stringCommand)
166  logging.info('Using log' + str(loggerFName))
167  logging.info('-------------------')
168  logging.info('Interaction Settings:')
169  logging.info('Data Path: '.ljust(off) + str(self.dataPath))
170  logging.info('Model Path: '.ljust(off) + str(self.modelPath))
171  logging.info('Config Path: '.ljust(off) + str(self.configPath))
172  logging.info('Driver: '.ljust(off) + str(self.driverName))
173  logging.info('-------------------')
174  logging.info('Configuring Interaction...')
175  logging.info('')
176 
177  # parse settings from config file
178  parser2 = SafeConfigParser()
179  parser2.read(self.configPath)
180  proposedBuffer = 5
181  if self.modelRoot in parser2.sections():
182  self.portNameList = parser2.items(self.dataPath.split('/')[-1])
183  logging.info(str(self.portNameList))
184  self.portsList = []
185  for j in range(len(self.portNameList)):
186  if self.portNameList[j][0] == 'rpcbase':
187  self.portsList.append(yarp.Port())
188  self.portsList[j].open(self.portNameList[j][1]+":i")
189  self.svPort = j
190  self.attach(self.portsList[j])
191  elif self.portNameList[j][0] == 'callsign':
192  # should check for repeated call signs by getting list from samSupervisor
193  self.callSignList = self.portNameList[j][1].split(',')
194  elif self.portNameList[j][0] == 'collectionmethod':
195  self.collectionMethod = self.portNameList[j][1].split(' ')[0]
196  try:
197  proposedBuffer = int(self.portNameList[j][1].split(' ')[1])
198  except ValueError:
199  logging.error('collectionMethod bufferSize is not an integer')
200  logging.error('Should be e.g: collectionMethod = buffered 3')
201  return False
202 
203  if self.collectionMethod not in ['buffered', 'continuous', 'future_buffered']:
204  logging.error('collectionMethod should be set to buffered / continuous / future_buffered')
205  return False
206  elif self.portNameList[j][0] == 'recency':
207  try:
208  self.recency = int(self.portNameList[j][1])
209  except ValueError:
210  logging.error('Recency value for ' + str(self.driverName) + ' is not an integer')
211  self.recency = 5
212  else:
213  parts = self.portNameList[j][1].split(' ')
214  logging.info(parts)
215 
216  if parts[1].lower() == 'imagergb':
217  self.portsList.append(yarp.BufferedPortImageRgb())
218  # self.portsList[j].open(parts[0])
219 
220  elif parts[1].lower() == 'imagemono':
221  self.portsList.append(yarp.BufferedPortImageMono())
222  # self.portsList[j].open(parts[0])
223 
224  elif parts[1].lower() == 'bottle':
225  self.portsList.append(yarp.BufferedPortBottle())
226  # self.portsList[j].open(parts[0])
227 
228  else:
229  logging.error('Data type ' + str(parts[1]) + ' for ' +
230  str(self.portNameList[j][0]) + ' unsupported')
231  return False
232  # mrd models with label/instance training will always have:
233  # 1 an input data line which is used when a label is requested
234  # 2 an output data line which is used when a generated instance is required
235  if parts[0][-1] == 'i':
236  self.labelPort = j
237  self.labelPortName = parts[0]
238  elif parts[0][-1] == 'o':
239  self.instancePort = j
240  self.instancePortName = parts[0]
241 
242  if self.collectionMethod == 'continuous':
243  self.portsList.append(yarp.BufferedPortBottle())
244  self.eventPort = len(self.portsList) - 1
245  self.eventPortName = '/'.join(self.labelPortName.split('/')[:3])+'/event'
246  self.portsList[self.eventPort].open(self.eventPortName)
247  self.classTimestamps = []
248  if self.recency is None:
249  logging.warning('No recency value specified for ' + self.driverName)
250  logging.warning('Setting value to default of 5 seconds')
251  self.recency = 5
252 
253  if self.svPort is None or self.labelPort is None or self.instancePort is None:
254  logging.warning('Config file properties incorrect. Should look like this:')
255  logging.warning('[Actions]')
256  logging.warning('dataIn = /sam/actions/actionData:i Bottle')
257  logging.warning('dataOut = /sam/actions/actionData:o Bottle')
258  logging.warning('rpcBase = /sam/actions/rpc')
259  logging.warning('callSign = ask_action_label, ask_action_instance')
260  logging.warning('collectionMethod = buffered 3')
261 
262  # self.mm[0].configInteraction(self)
263  self.inputType = self.portNameList[self.labelPort][1].split(' ')[1].lower()
264  self.outputType = self.portNameList[self.labelPort][1].split(' ')[1].lower()
265  self.dataList = []
266  self.classificationList = []
267  self.probClassList = []
268  self.classTimestamps = []
269  yarp.Network.init()
270 
271  self.mm = initialiseModels([self.dataPath, self.modelPath, self.driverName], 'update', 'interaction')
272  self.modelLoaded = True
273 
274  if self.mm[0].model_mode != 'temporal':
275  self.bufferSize = proposedBuffer
276  elif self.mm[0].model_mode == 'temporal':
277  self.bufferSize = self.mm[0].temporalModelWindowSize
278 
279  self.portsList[self.labelPort].open(self.labelPortName)
280  self.portsList[self.instancePort].open(self.instancePortName)
281  # self.test()
282 
283  return True
284  else:
285  logging.error('Section ' + str(self.modelRoot) + ' not found in ' + str(self.configPath))
286  return False
287 
288  def close(self):
289  """
290  Close Yarp module
291 
292  Args:
293  None
294 
295  Returns:
296  Boolean indicating success or no success in closing the Yarp module
297  """
298  # close ports of loaded models
299  logging.info('Exiting ...')
300  for j in self.portsList:
301  self.closePort(j)
302  return False
303 
304  @timeout(3)
305  def closePort(self, j):
306  """
307  Helper function to close ports with an enforced timeout of 3 seconds so the module doesn't hang.
308 
309  Args:
310  j: Yarp Port
311 
312  Returns:
313  None
314  """
315  j.interrupt()
316  time.sleep(1)
317  j.close()
318 
319  def respond(self, command, reply):
320  """
321  Respond to external requests
322 
323  Description:
324  Available requests \n
325  1) __heartbeat__ : Sanity check request to make sure module is still alive. \n
326  2) __information__ : Utility request to pass in contextual information. \n
327  3) __portNames__ : Request to return the name of the currently open ports for samSupervisor to keep a list of open ports. \n
328  4) __reload__ : Request to reload model from disk. \n
329  5) __toggleVerbose__ : Switch logging to stdout on or off. \n
330  6) __EXIT__ : Abort and close the module. \n
331  7) __ask_X_label__ : Request a classification from the module. \n
332  8) __ask_X_instance__ : Request a generative output from the module. \n
333 
334  Args:
335  command : Incoming Yarp bottle containing external request.
336  reply : Outgoing Yarp bottle containing reply to processed request.
337 
338  Returns:
339  Boolean indicating success or no success in responding to external requests.
340  """
341  # this method responds to samSupervisor commands
342  reply.clear()
343  action = command.get(0).asString()
344 
345  count = 0
346  while not self.modelLoaded:
347  count += 1
348  time.sleep(0.5)
349  if count == 10:
350  break
351 
352  if self.modelLoaded:
353  if action != 'heartbeat' or action != 'information':
354  logging.info(action + ' received')
355  logging.info('responding to ' + action + ' request')
356 
357  if action == "portNames":
358  reply.addString('ack')
359  reply.addString(self.labelPortName)
360  reply.addString(self.instancePortName)
361  if self.collectionMethod == 'continuous':
362  reply.addString(self.eventPortName)
363  # -------------------------------------------------
364  elif action == "reload":
365  # send a message to the interaction model to check version of currently loaded model
366  # and compare it with that stored on disk. If model on disk is more recent reload model
367  # interaction model to return "model reloaded correctly" or "loaded model already up to date"
368  logging.info("reloading model")
369  try:
370  self.mm = initialiseModels([self.dataPath, self.modelPath, self.driverName],
371  'update', 'interaction')
372  reply.addString('ack')
373  except:
374  reply.addString('nack')
375  # -------------------------------------------------
376  elif action == "heartbeat":
377  reply.addString('ack')
378  # -------------------------------------------------
379  elif action == "toggleVerbose":
380  self.verboseSetting = not self.verboseSetting
381  reply.addString('ack')
382  # -------------------------------------------------
383  # elif action == "attention":
384  # self.attentionMode = command.get(1).asString()
385  # reply.addString('ack')
386  # -------------------------------------------------
387  elif action == "information":
388  if command.size() < 3:
389  reply.addString('nack')
390  else:
391  try:
392  self.additionalInfoDict[command.get(1).asString()] = command.get(2).asString()
393  reply.addString('ack')
394  except:
395  reply.addString('nack')
396  logging.info(self.additionalInfoDict)
397  # -------------------------------------------------
398  elif action == "EXIT":
399  reply.addString('ack')
400  self.close()
401  # -------------------------------------------------
402  elif action in self.callSignList:
403  logging.info('call sign command recognized')
404  if 'label' in action:
405  self.classifyInstance(reply)
406  elif 'instance' in action:
407  self.generateInstance(reply, command.get(1).asString())
408  # -------------------------------------------------
409  else:
410  reply.addString("nack")
411  reply.addString("Command not recognized")
412  else:
413  reply.addString("nack")
414  reply.addString("Model not loaded")
415 
416  return True
417 
418  def classifyInstance(self, reply):
419  """
420  Classify a live collected data instance
421 
422  Description:
423  This method responds to an `ask_x_label` request sent via the rpc port of the module. \n
424  In the case of __collectionMethod__ = `buffered`, the data currently in the buffer is sent to processLiveData() method for the current driver which returns a classification label that is embedded in reply.\n
425  In the case of __collectionMethod__ = `future_buffered`, this method reads incoming frames from the `dataIn` port until the collection buffer is full at which point it calls processLiveData() to get a classification label.\n
426  In the case of __collectionMethod__ = `continuous`, this model returns the most recent label in the FILO buffer containing classification labels.\n
427 
428  Args:
429  reply : Outgoing Yarp bottle containing classification label.
430 
431  Returns:
432  None
433  """
434  if self.portsList[self.labelPort].getInputCount() > 0:
435  if self.verboseSetting:
436  logging.info('-------------------------------------')
437  if self.collectionMethod == 'buffered':
438  if self.modelLoaded:
439  logging.debug('going in process live')
440  thisClass, probClass, dataList = self.mm[0].processLiveData(self.dataList, self.mm,
441  verbose=self.verboseSetting,
442  additionalData=self.additionalInfoDict)
443  logging.debug('exited process live')
444  else:
445  thisClass = None
446  logging.debug(thisClass)
447  logging.debug('object thisclass' + str(thisClass is None))
448  logging.debug('object datalist' + str(dataList is None))
449  logging.debug('string thisclass' + str(thisClass == 'None'))
450  logging.debug('string datalist' + str(dataList == 'None'))
451  if thisClass is None or dataList is None:
452  logging.debug('None reply')
453  reply.addString('nack')
454  else:
455  logging.debug('correct reply')
456  reply.addString('ack')
457  reply.addString(thisClass)
458  logging.debug('finish reply')
459  # reply.addDouble(probClass)
460  # -------------------------------------------------
461  elif self.collectionMethod == 'continuous':
462  # mutex lock classificationList
463  self.my_mutex.acquire()
464  logging.debug(self.classificationList)
465  logging.debug(self.classTimestamps)
466  # check last n seconds
467  if len(self.classificationList) > 0:
468  if self.useRecentClassTime:
469  minT = self.classTimestamps[-1] - self.recency
470  else:
471  minT = time.time() - self.recency
472 
473  logging.debug('minT ' + str(minT))
474  logging.debug('recency ' + str(self.recency))
475  for index, value in enumerate(self.classTimestamps):
476  logging.debug(str(index) + ' ' + str(value) + ' ' + str(value > minT))
477  validList = [index for index, value in enumerate(self.classTimestamps) if value > minT]
478  logging.debug('validList ' + str(validList))
479  minIdx = min(validList)
480  logging.debug('minIdx ' + str(minIdx))
481  validClassList = self.classificationList[minIdx:]
482  validProbList = self.probClassList[minIdx:]
483  logging.debug('validClassList ' + str(validClassList))
484  logging.debug('classify classList' + str(self.classificationList))
485  logging.debug('classify probclassList' + str(self.probClassList))
486  logging.debug('classify classTimeStamps' + str(self.classTimestamps))
487 
488  if len(validClassList) > 0:
489 
490  # combine all classifications
491  if len(validClassList) == 1:
492  logging.debug('validClassList is of len 1')
493  decision = validClassList[0]
494  else:
495  logging.debug('validClassList is of len ' + str(len(validClassList)))
496  setClass = list(set(validClassList))
497  logging.debug('setClass ' + str(setClass))
498  if len(setClass) == len(validClassList):
499  logging.debug('len setClass = len validClassList' + str(len(setClass)) + ' ' +
500  str(len(validClassList)))
501  decision = validClassList[validProbList.index(max(validProbList))]
502  else:
503  dictResults = dict()
504  for m in setClass:
505  logging.debug('currentM ' + str(m))
506  idxM = [idx for idx, name in enumerate(validClassList) if name == m]
507  logging.debug('idx ' + str(idxM))
508  probVals = itemgetter(*idxM)(validProbList)
509  logging.debug('probs ' + str(probVals))
510  try:
511  probSum = sum(probVals)
512  except TypeError:
513  probSum = probVals
514  logging.debug('sum ' + str(probSum))
515  dictResults[m] = probSum
516  logging.debug('')
517  logging.debug('dictResults ' + str(dictResults))
518  maxDictProb = max(dictResults.values())
519  logging.debug('maxDictProb = ' + str(maxDictProb))
520  decisions = [key for key in dictResults.keys() if dictResults[key] == maxDictProb]
521  logging.info('Decision: ' + str(decisions))
522  logging.info('We have resolution')
523  decision = ' and '.join(decisions)
524  logging.info('Decision: ' + decision)
525 
526  reply.addString('ack')
527  reply.addString(decision)
528  # reply.addString(validClassList[-1])
529  # self.classificationList.pop(-1)
530 
531  # remove validclassList from self.classificationList / probClassList / classTimeStamps
532  self.classificationList = self.classificationList[:minIdx]
533  self.probClassList = self.probClassList[:minIdx]
534  self.classTimestamps = self.classTimestamps[:minIdx]
535 
536  else:
537  logging.info('No valid classifications')
538  reply.addString('nack')
539  logging.debug('replying ' + reply.toString())
540  logging.debug('classify classList' + str(self.classificationList))
541  logging.debug('classify probclassList' + str(self.probClassList))
542  logging.debug('classify classTimeStamps' + str(self.classTimestamps))
543  else:
544  logging.info('No classifications yet')
545  reply.addString('nack')
546  self.my_mutex.release()
547  # -------------------------------------------------
548  elif self.collectionMethod == 'future_buffered':
549  self.dataList = []
550  for j in range(self.bufferSize):
551  self.dataList.append(self.readFrame())
552  if self.modelLoaded:
553  thisClass, probClass, dataList = self.mm[0].processLiveData(self.dataList, self.mm,
554  verbose=self.verboseSetting,
555  additionalData=self.additionalInfoDict)
556  else:
557  thisClass = None
558 
559  if thisClass is None or dataList is None:
560  reply.addString('nack')
561  else:
562  reply.addString('ack')
563  reply.addString(thisClass)
564  # reply.addDouble(probClass)
565  else:
566  reply.addString('nack')
567  reply.addString('No input connections to ' + str(self.portsList[self.labelPort].getName()))
568  logging.info('--------------------------------------')
569 
570  def generateInstance(self, reply, instanceName):
571  """Responds to an ask_X_instance request
572 
573  Description:
574  Implements the logic for responding to an `ask_X_instance` rpc request for __instanceName__. This method responds with an `ack` or `nack` on the rpc port indicating success of memory generation and outputs the generated instance returned by recallFromLabel on the `dataOut` port.
575 
576  Args:
577  reply : Yarp Bottle to embed the rpc response.
578  instanceName : Name of class to generate.
579 
580  Returns:
581  None
582  """
583  if self.portsList[self.instancePort].getOutputCount() != 0:
584  if instanceName in self.mm[0].textLabels:
585  instance = self.recallFromLabel(instanceName)
586  # send generated instance to driver where it is converted into the proper format
587  formattedData = self.mm[0].formatGeneratedData(instance)
588  # check formattedData is of correct data type
589  if str(type(self.portsList[self.instancePort])).split('\'')[1].split('Port')[1] \
590  in str(type(formattedData)):
591  try:
592  img = self.portsList[self.instancePort].prepare()
593  img.copy(formattedData)
594  self.portsList[self.instancePort].write()
595  reply.addString('ack')
596  reply.addString('Generated instance of ' + instanceName + ' as ' +
597  str(type(formattedData)))
598  except:
599  reply.addString('nack')
600  reply.addString('Failed to write ' + instanceName + ' as ' +
601  str(type(self.portsList[self.instancePort])))
602  else:
603  reply.addString('nack')
604  reply.addString('Output of ' + self.driverName + '.formatGeneratedData is of type: ' +
605  str(type(formattedData)) + '. Should be type: ' +
606  str(type(self.portsList[self.instancePort])))
607  else:
608  reply.addString('nack')
609  reply.addString('Instance name not found. Available instance names are: ' + str(self.mm[0].textLabels))
610  else:
611  reply.addString('nack')
612  reply.addString('No outgoing connections on ' + str(self.portsList[self.instancePort].getName()))
613 
614  def recallFromLabel(self, label):
615  """
616  Generates instance based on label.
617 
618  Args:
619  label : String containing the class label for the requested generated instance.
620 
621  Returns:
622  Serialised vector representing the generated instance.
623  """
624  ind = self.mm[0].textLabels.index(label)
625  if len(self.mm) > 1:
626  indsToChooseFrom = self.mm[ind + 1].SAMObject.model.textLabelPts[ind]
627  chosenInd = np.random.choice(indsToChooseFrom, 1)
628  yrecall = self.mm[ind + 1].SAMObject.recall(chosenInd)
629  else:
630  indsToChooseFrom = self.mm[0].SAMObject.model.textLabelPts[ind]
631  chosenInd = np.random.choice(indsToChooseFrom, 1)
632  yrecall = self.mm[0].SAMObject.recall(chosenInd)
633 
634  return yrecall
635 
636  def interruptModule(self):
637  """
638  Module interrupt logic.
639 
640  Returns : Boolean indicating success of logic or not.
641  """
642  return True
644  def getPeriod(self):
645  """
646  Module refresh rate.
647 
648  Returns : The period of the module in seconds.
649  """
650  return 0.1
652  def updateModule(self):
653  """
654  Logic to execute every getPeriod() seconds.
655 
656  Description:
657  This function makes sure important ports are connected. Priority 1 is the rpc port. Priority 2 is the data in port. If both are connected this function triggers collectData().
658 
659  Returns: Boolean indicating success of logic or not.
660  """
661  out = self.portsList[self.svPort].getOutputCount() + self.portsList[self.svPort].getInputCount()
662  if out != 0:
663  if not self.rpcConnected:
664  logging.info("Connection received")
665  logging.info('\n')
666  logging.info('-------------------------------------')
667  self.rpcConnected = True
668  self.falseCount = 0
669  else:
670  self.dataInConnected = self.portsList[self.labelPort].getInputCount() > 0
671  if self.dataInConnected:
672  self.collectData()
673  else:
674  self.noDataCount += 1
675  if self.noDataCount == self.errorRate:
676  self.noDataCount = 0
677  logging.info('No data in connection. Waiting for ' +
678  self.portNameList[self.labelPort][1] + ' to receive a connection')
679  else:
680  self.rpcConnected = False
681  self.falseCount += 1
682  if self.falseCount == self.errorRate:
683  self.falseCount = 0
684  logging.info('Waiting for ' + self.portNameList[self.svPort][1] +
685  ' to receive a connection')
686 
687  time.sleep(0.05)
688  return True
689 
690  def readFrame(self):
691  """
692  Logic to read an available data frame.
693 
694  Description:
695  This function first checks the required data type of the frame to be received and instantiates the required yarp data object. This function then subsequently reads in the latest frame from the __dataIn__ port which it returns. Return is `None` if the data type is not recognised. This is currently a limitation because `ImageRgb`, `ImageMono` and `Bottle` are so far the only supported bottle types. This can be easily extended in this section by adding more cases.
696 
697  Returns: Boolean indicating success of logic or not.
698  """
699  if self.inputType == 'imagergb':
700  frame = yarp.ImageRgb()
701  elif self.inputType == 'imagemono':
702  frame = yarp.ImageMono()
703  elif self.inputType == 'bottle':
704  frame = yarp.Bottle()
705  else:
706  return None
707 
708  frameRead = self.portsList[self.labelPort].read(True)
709 
710  if self.inputType == 'bottle':
711  frame.fromString(frameRead.toString())
712  elif 'image' in self.inputType:
713  frame.copy(frameRead)
714 
715  return frame
716 
717  def collectData(self):
718  """Collect data function
719 
720  Description:
721  This function implements three types of data collection procedures: \n
722 
723  1) __buffered__ : Collects data in a fixed length FIFO buffer of past frames. This buffer is read by classifyInstance(). \n
724  2) __future_buffered__ : No operation. \n
725  3) __continuous__ : Collect data until a buffer of length windowSize is full and then perform a classification on this data. The classification is then stored in a buffer with is read by classifyInstance(). \n
726 
727  Returns:
728  None
729  """
730  self.noDataCount = 0
732  if self.collectionMethod == 'buffered':
733  frame = self.readFrame()
734  # append frame to buffer
735  if len(self.dataList) == self.bufferSize:
736  # FIFO buffer first item in list most recent
737  self.dataList.pop(0)
738  self.dataList.append(frame)
739  else:
740  self.dataList.append(frame)
741  # -------------------------------------------------
742  elif self.collectionMethod == 'continuous' and self.attentionMode == 'continue':
743  # read frame of data
744  frame = self.readFrame()
745  # append frame to dataList
746 
747  if self.dataList is None:
748  self.dataList = []
749 
750  self.dataList.append(frame)
751  # process list of frames for a classification
752  dataList = []
753  if self.modelLoaded:
754  thisClass, probClass, dataList = self.mm[0].processLiveData(self.dataList, self.mm,
755  verbose=self.verboseSetting,
756  additionalData=self.additionalInfoDict)
757  else:
758  thisClass = None
759  # if proper classification
760  if thisClass is not None:
761  # empty dataList
762 
763  # mutex dataList lock
764  self.my_mutex.acquire()
765  self.dataList = dataList
766  # mutex dataList release
767 
768  if thisClass != 'None':
769  tStamp = time.time()
770  eventBottle = self.portsList[self.eventPort].prepare()
771  eventBottle.clear()
772  eventBottle.addString('ack')
773  self.portsList[self.eventPort].write()
774  # add classification to classificationList to be retrieved during respond method
775 
776  # mutex classificationList lock
777 
778  # Time based method
779  logging.info('classList len: ' + str(len(self.classificationList)))
780  logging.debug('thisclass ' + str(thisClass))
781  self.classificationList = self.classificationList + thisClass
782  logging.debug('classificationList ' + str(self.classificationList))
783  self.probClassList = self.probClassList + probClass
784  logging.debug('probClass ' + str(self.probClassList))
785  self.classTimestamps = self.classTimestamps + [tStamp]*len(thisClass)
786  logging.debug('self.classTimestamps ' + str(self.classTimestamps))
787  # remove timestamps older than memory duration (self.bufferSize in seconds)
788  logging.debug('last time stamp: ' + str(self.classTimestamps[-1]))
789  minMemT = self.classTimestamps[-1] - self.bufferSize
790  logging.debug('minMemT ' + str(minMemT))
791  goodIdxs = [idx for idx, timeVal in enumerate(self.classTimestamps) if timeVal > minMemT]
792  logging.debug('goodIdxs ' + str(goodIdxs))
793  minIdx = min(goodIdxs)
794  self.classificationList = self.classificationList[minIdx:]
795  self.probClassList = self.probClassList[minIdx:]
796  self.classTimestamps = self.classTimestamps[minIdx:]
797 
798  logging.debug('classificationList ' + str(self.classificationList))
799  logging.debug('probClass ' + str(self.probClassList))
800  logging.debug('self.classTimestamps ' + str(self.classTimestamps))
801 
802  # Old method
803  # if len(self.classificationList) == self.bufferSize:
804  # # FIFO buffer first item in list is oldest
805  # self.classificationList.pop(0)
806  # self.classTimestamps.pop(0)
807  # self.classificationList.append(thisClass)
808  # self.classTimestamps.append(tStamp)
809  # else:
810  # self.classificationList.append(thisClass)
811  # self.classTimestamps.append(tStamp)
812 
813  # mutex release
814  self.my_mutex.release()
815  # -------------------------------------------------
816  elif self.collectionMethod == 'future_buffered':
817  pass
818 
819  def test(self):
820  """
821  Utility function to test data collection procedures for debugging purposes.
822  """
823  count = 0
824  if self.collectionMethod == 'continuous':
825  classifyBlock = self.mm[0].paramsDict['windowSize']
826  elif self.collectionMethod == 'buffered':
827  classifyBlock = self.bufferSize
828  else:
829  classifyBlock = 10
830 
831  while True:
832  out = (self.portsList[self.svPort].getOutputCount() + self.portsList[self.svPort].getInputCount()) > 0
833  dataInConnected = self.portsList[self.labelPort].getInputCount() > 0
834 
835  if out and dataInConnected:
836  if self.collectionMethod == 'future_buffered':
837  reply = yarp.Bottle()
838  self.classifyInstance(reply)
839  logging.info(reply.toString())
840  elif self.collectionMethod == 'continuous':
841  self.collectData()
842  count += 1
843  if count == classifyBlock:
844  count = 0
845  reply = yarp.Bottle()
846  self.classifyInstance(reply)
847  logging.info('CLASSIFICATION: ' + reply.toString())
848 
849  # self.dataList = []
850  # for j in range(self.bufferSize):
851  # self.dataList.append(self.readFrame())
852 
853  # if thisClass is None:
854  # logging.info('None')
855  # else:
856  # logging.info(thisClass, ' ', likelihood)
857 
858  time.sleep(0.05)
859 
860 
def exception_hook(exc_type, exc_value, exc_traceback):
    """Callback function to record any errors that occur in the log files.

    Documentation:
        Substitutes the standard python exception_hook with one that records the error into a log file. Can only work if interactionSAMModel.py is called from python and not ipython because ipython overrides this substitution.
    Args:
        exc_type: Exception Type.
        exc_value: Exception Value.
        exc_traceback: Exception Traceback.

    Returns:
        None
    """
    # handing the triple to logging makes it append the formatted traceback
    errorInfo = (exc_type, exc_value, exc_traceback)
    logging.error("Uncaught exception", exc_info=errorInfo)
# Route uncaught exceptions through exception_hook so that they are logged.
sys.excepthook = exception_hook

# Script entry point: create the module and run it until yarp stops it.
if __name__ == '__main__':
    plt.ion()
    yarp.Network.init()
    mod = interactionSAMModel()
    rf = yarp.ResourceFinder()
    rf.setVerbose(True)
    # rf.setDefaultContext("samSupervisor")
    # rf.setDefaultConfigFile("default.ini")
    rf.configure(sys.argv)

    # configure() reads its settings directly from sys.argv
    mod.runModule(rf)
def configure(self, rf)
Configure interactionSAMModel yarp module.
def respond(self, command, reply)
Respond to external requests.
def initialiseModels(argv, update, initMode='training')
Initialise SAM Model data structure, training parameters and user parameters.
Definition: SAM_utils.py:59
def recallFromLabel(self, label)
Generates instance based on label.
def exception_hook(exc_type, exc_value, exc_traceback)
Callback function to record any errors that occur in the log files.
def classifyInstance(self, reply)
Classify a live collected data instance.
def generateInstance(self, reply, instanceName)
Responds to an ask_X_instance request.
def closePort(self, j)
Helper function to close ports with an enforced timeout of 3 seconds so the module doesn't hang.
Class to terminate a function running inside of a separate thread.
Definition: SAM_utils.py:449
def test(self)
Utility function to test data collection procedures for debugging purposes.
def readFrame(self)
Logic to read an available data frame.
def updateModule(self)
Logic to execute every getPeriod() seconds.