4 matplotlib.use(
"TkAgg")
5 import matplotlib.pyplot
as plt
8 from ConfigParser
import SafeConfigParser
16 from os.path
import join
18 from operator
import itemgetter
20 warnings.simplefilter(
"ignore")
21 np.set_printoptions(precision=2)
25 """Generic interaction function 28 Generic interaction function that carries out live collection of data, classification of said data, interaction with other modules requesting the classification and generation of training outputs for recall. The parameters for the interaction function are loaded from the config file specified in the `config_path` parameter of the default context file for samSupervisor.py. An example of the configuration structure is shown below. 32 dataIn = <portName/ofInputData>:i <dataType of Port> 33 dataOut = <portName/ofOutputData>:o <dataType of Port> 34 rpcBase = <portName/ofRpcPort> 35 call_sign = ask_<X>_label,ask_<X>_instance 36 collectionMethod = collectionMethod lengthOfBuffer 39 dataIn = /sam/faces/imageData:i ImageRgb 40 dataOut = /sam/faces/imageData:o ImageMono 41 rpcBase = /sam/faces/rpc 42 callSign = ask_face_label,ask_face_instance 43 collectionMethod = future_buffered 3 46 dataIn : The port name for the port that will received the data to be classified and the dataType to be expected. 47 dataOut : The port name for the port that will output generated data and the dataType to be transmitted. 48 rpcBase : The rpc port that will receive external requests. This is usually controlled by samSupervisor.py. 49 call_sign : The commands that will trigger a classify from data event or a generate from label event. 50 collectionMethod : The collection method to be used. Either `buffered`, `future_buffered` or `continuous`. Followed by an integer indicating the length of the buffer to be used. In the case of `continuous` the buffer length is the maximum number of classification labels to be stored in a First In Last Out (FILO) configuration. Otherwise the buffer length indicates the number of data frames that are required for a classification to take place. 55 Initialisation of the interaction function 57 yarp.RFModule.__init__(self)
115 Configure interactionSAMModel yarp module 118 rf: Yarp RF context input 119 argv_1 : String containing data path. 120 argv_2 : String containing model path. 121 argv_3 : String containing config file path (from `config_path` parameter of samSupervisor config file). 122 argv_4 : String driver name corresponding to a valid driver present in SAM_Drivers folder. 123 argv_5 : String `'True'` or `'False'` to switch formatting of logging depending on whether interaction is logging to a separate window or to the stdout of another process. 125 Boolean indicating success or no success in initialising the yarp module 127 stringCommand =
'from SAM.SAM_Drivers import ' + sys.argv[4] +
' as Driver' 142 while os.path.isfile(loggerFName)
and os.path.getsize(loggerFName) > 0:
147 logFormatter = logging.Formatter(
"[%(levelname)s] %(message)s")
149 logFormatter = logging.Formatter(
"\033[31m%(asctime)s [%(name)-33s] [%(levelname)8s] %(message)s\033[0m")
151 rootLogger = logging.getLogger(
'interaction ' + self.
driverName)
152 rootLogger.setLevel(logging.DEBUG)
154 fileHandler = logging.FileHandler(loggerFName)
155 fileHandler.setFormatter(logFormatter)
156 rootLogger.addHandler(fileHandler)
158 consoleHandler = logging.StreamHandler()
159 consoleHandler.setFormatter(logFormatter)
160 rootLogger.addHandler(consoleHandler)
161 logging.root = rootLogger
164 logging.info(
'Arguments: ' + str(sys.argv))
165 logging.info(stringCommand)
166 logging.info(
'Using log' + str(loggerFName))
167 logging.info(
'-------------------')
168 logging.info(
'Interaction Settings:')
169 logging.info(
'Data Path: '.ljust(off) + str(self.
dataPath))
170 logging.info(
'Model Path: '.ljust(off) + str(self.
modelPath))
171 logging.info(
'Config Path: '.ljust(off) + str(self.
configPath))
172 logging.info(
'Driver: '.ljust(off) + str(self.
driverName))
173 logging.info(
'-------------------')
174 logging.info(
'Configuring Interaction...')
178 parser2 = SafeConfigParser()
197 proposedBuffer = int(self.
portNameList[j][1].split(
' ')[1])
199 logging.error(
'collectionMethod bufferSize is not an integer')
200 logging.error(
'Should be e.g: collectionMethod = buffered 3')
203 if self.
collectionMethod not in [
'buffered',
'continuous',
'future_buffered']:
204 logging.error(
'collectionMethod should be set to buffered / continuous / future_buffered')
210 logging.error(
'Recency value for ' + str(self.
driverName) +
' is not an integer')
216 if parts[1].lower() ==
'imagergb':
217 self.
portsList.append(yarp.BufferedPortImageRgb())
220 elif parts[1].lower() ==
'imagemono':
221 self.
portsList.append(yarp.BufferedPortImageMono())
224 elif parts[1].lower() ==
'bottle':
225 self.
portsList.append(yarp.BufferedPortBottle())
229 logging.error(
'Data type ' + str(parts[1]) +
' for ' +
235 if parts[0][-1] ==
'i':
238 elif parts[0][-1] ==
'o':
243 self.
portsList.append(yarp.BufferedPortBottle())
249 logging.warning(
'No recency value specified for ' + self.
driverName)
250 logging.warning(
'Setting value to default of 5 seconds')
254 logging.warning(
'Config file properties incorrect. Should look like this:')
255 logging.warning(
'[Actions]')
256 logging.warning(
'dataIn = /sam/actions/actionData:i Bottle')
257 logging.warning(
'dataOut = /sam/actions/actionData:o Bottle')
258 logging.warning(
'rpcBase = /sam/actions/rpc')
259 logging.warning(
'callSign = ask_action_label, ask_action_instance')
260 logging.warning(
'collectionMethod = buffered 3')
274 if self.
mm[0].model_mode !=
'temporal':
276 elif self.
mm[0].model_mode ==
'temporal':
296 Boolean indicating success or no success in closing the Yarp module 299 logging.info(
'Exiting ...')
307 Helper function to close ports with an enforced timeout of 3 seconds so the module doesn't hang. 319 def respond(self, command, reply):
321 Respond to external requests 324 Available requests \n 325 1) __heartbeat__ : Sanity check request to make sure module is still alive. \n 326 2) __information__ : Utility request to pass in contextual information. \n 327 3) __portNames__ : Request to return the name of the currently open ports for samSupervisor to keep a list of open ports. \n 328 4) __reload__ : Request to reload model from disk. \n 329 5) __toggleVerbose__ : Switch logging to stdout on or off. \n 330 6) __EXIT__ : Abort and close the module. \n 331 7) __ask_X_label__ : Request a classification from the module. \n 332 8) __ask_X_instance__ : Request a generative output from the module. \n 335 command : Incoming Yarp bottle containing external request. 336 reply : Outgoing Yarp bottle containing reply to processed request. 339 Boolean indicating success or no success in responding to external requests. 343 action = command.get(0).asString()
353 if action !=
'heartbeat' or action !=
'information':
354 logging.info(action +
' received')
355 logging.info(
'responding to ' + action +
' request')
357 if action ==
"portNames":
358 reply.addString(
'ack')
364 elif action ==
"reload":
368 logging.info(
"reloading model")
371 'update',
'interaction')
372 reply.addString(
'ack')
374 reply.addString(
'nack')
376 elif action ==
"heartbeat":
377 reply.addString(
'ack')
379 elif action ==
"toggleVerbose":
381 reply.addString(
'ack')
387 elif action ==
"information":
388 if command.size() < 3:
389 reply.addString(
'nack')
393 reply.addString(
'ack')
395 reply.addString(
'nack')
398 elif action ==
"EXIT":
399 reply.addString(
'ack')
403 logging.info(
'call sign command recognized')
404 if 'label' in action:
406 elif 'instance' in action:
410 reply.addString(
"nack")
411 reply.addString(
"Command not recognized")
413 reply.addString(
"nack")
414 reply.addString(
"Model not loaded")
420 Classify a live collected data instance 423 This method responds to an `ask_x_label` request sent via the rpc port of the module. \n 424 In the case of __collectionMethod__ = `buffered`, the data currently in the buffer is sent to processLiveData() method for the current driver which returns a classification label that is embedded in reply.\n 425 In the case of __collectionMethod__ = `future_buffered`, this method reads incoming frames from the `dataIn` port until the collection buffer is full at which point it calls processLiveData() to get a classification label.\n 426 In the case of __collectionMethod__ = `continuous`, this model returns the most recent label in the FILO buffer containing classification labels.\n 429 reply : Outgoing Yarp bottle containing classification label. 436 logging.info(
'-------------------------------------')
439 logging.debug(
'going in process live')
440 thisClass, probClass, dataList = self.
mm[0].processLiveData(self.
dataList, self.
mm,
443 logging.debug(
'exited process live')
446 logging.debug(thisClass)
447 logging.debug(
'object thisclass' + str(thisClass
is None))
448 logging.debug(
'object datalist' + str(dataList
is None))
449 logging.debug(
'string thisclass' + str(thisClass ==
'None'))
450 logging.debug(
'string datalist' + str(dataList ==
'None'))
451 if thisClass
is None or dataList
is None:
452 logging.debug(
'None reply')
453 reply.addString(
'nack')
455 logging.debug(
'correct reply')
456 reply.addString(
'ack')
457 reply.addString(thisClass)
458 logging.debug(
'finish reply')
471 minT = time.time() - self.
recency 473 logging.debug(
'minT ' + str(minT))
474 logging.debug(
'recency ' + str(self.
recency))
476 logging.debug(str(index) +
' ' + str(value) +
' ' + str(value > minT))
477 validList = [index
for index, value
in enumerate(self.
classTimestamps)
if value > minT]
478 logging.debug(
'validList ' + str(validList))
479 minIdx = min(validList)
480 logging.debug(
'minIdx ' + str(minIdx))
483 logging.debug(
'validClassList ' + str(validClassList))
485 logging.debug(
'classify probclassList' + str(self.
probClassList))
488 if len(validClassList) > 0:
491 if len(validClassList) == 1:
492 logging.debug(
'validClassList is of len 1')
493 decision = validClassList[0]
495 logging.debug(
'validClassList is of len ' + str(len(validClassList)))
496 setClass = list(set(validClassList))
497 logging.debug(
'setClass ' + str(setClass))
498 if len(setClass) == len(validClassList):
499 logging.debug(
'len setClass = len validClassList' + str(len(setClass)) +
' ' +
500 str(len(validClassList)))
501 decision = validClassList[validProbList.index(max(validProbList))]
505 logging.debug(
'currentM ' + str(m))
506 idxM = [idx
for idx, name
in enumerate(validClassList)
if name == m]
507 logging.debug(
'idx ' + str(idxM))
508 probVals = itemgetter(*idxM)(validProbList)
509 logging.debug(
'probs ' + str(probVals))
511 probSum = sum(probVals)
514 logging.debug(
'sum ' + str(probSum))
515 dictResults[m] = probSum
517 logging.debug(
'dictResults ' + str(dictResults))
518 maxDictProb = max(dictResults.values())
519 logging.debug(
'maxDictProb = ' + str(maxDictProb))
520 decisions = [key
for key
in dictResults.keys()
if dictResults[key] == maxDictProb]
521 logging.info(
'Decision: ' + str(decisions))
522 logging.info(
'We have resolution')
523 decision =
' and '.join(decisions)
524 logging.info(
'Decision: ' + decision)
526 reply.addString(
'ack')
527 reply.addString(decision)
537 logging.info(
'No valid classifications')
538 reply.addString(
'nack')
539 logging.debug(
'replying ' + reply.toString())
541 logging.debug(
'classify probclassList' + str(self.
probClassList))
544 logging.info(
'No classifications yet')
545 reply.addString(
'nack')
553 thisClass, probClass, dataList = self.
mm[0].processLiveData(self.
dataList, self.
mm,
559 if thisClass
is None or dataList
is None:
560 reply.addString(
'nack')
562 reply.addString(
'ack')
563 reply.addString(thisClass)
566 reply.addString(
'nack')
567 reply.addString(
'No input connections to ' + str(self.
portsList[self.
labelPort].getName()))
568 logging.info(
'--------------------------------------')
571 """Responds to an ask_X_instance request 574 Implements the logic for responding to an `ask_X_instance` rpc request for __instanceName__. This method responds with an `ack` or `nack` on the rpc port indicating success of memory generation and outputs the generated instance returned by recallFromLabel on the `dataOut` port. 577 reply : Yarp Bottle to embed the rpc response. 578 instanceName : Name of class to generate. 584 if instanceName
in self.
mm[0].textLabels:
587 formattedData = self.
mm[0].formatGeneratedData(instance)
590 in str(type(formattedData)):
593 img.copy(formattedData)
595 reply.addString(
'ack')
596 reply.addString(
'Generated instance of ' + instanceName +
' as ' +
597 str(type(formattedData)))
599 reply.addString(
'nack')
600 reply.addString(
'Failed to write ' + instanceName +
' as ' +
603 reply.addString(
'nack')
604 reply.addString(
'Output of ' + self.
driverName +
'.formatGeneratedData is of type: ' +
605 str(type(formattedData)) +
'. Should be type: ' +
608 reply.addString(
'nack')
609 reply.addString(
'Instance name not found. Available instance names are: ' + str(self.
mm[0].textLabels))
611 reply.addString(
'nack')
616 Generates instance based on label. 619 label : String containing the class label for the requested generated instance. 622 Serialised vector representing the generated instance. 624 ind = self.
mm[0].textLabels.index(label)
626 indsToChooseFrom = self.
mm[ind + 1].SAMObject.model.textLabelPts[ind]
627 chosenInd = np.random.choice(indsToChooseFrom, 1)
628 yrecall = self.
mm[ind + 1].SAMObject.recall(chosenInd)
630 indsToChooseFrom = self.
mm[0].SAMObject.model.textLabelPts[ind]
631 chosenInd = np.random.choice(indsToChooseFrom, 1)
632 yrecall = self.
mm[0].SAMObject.recall(chosenInd)
638 Module interrupt logic. 640 Returns : Boolean indicating success of logic or not. 648 Returns : The period of the module in seconds. 654 Logic to execute every getPeriod() seconds. 657 This function makes sure important ports are connected. Priority 1 is the rpc port. Priority 2 is the data in port. If both are connected this function triggers collectData(). 659 Returns: Boolean indicating success of logic or not. 664 logging.info(
"Connection received")
666 logging.info(
'-------------------------------------')
677 logging.info(
'No data in connection. Waiting for ' +
685 ' to receive a connection')
692 Logic to read an available data frame. 695 This function first checks the required data type of the frame to be received and instantiates the required yarp data object. This function then subsequently reads in the latest frame from the __dataIn__ port which it returns. Return is `None` if the data type is not recognised. This is currently a limitation because `ImageRgb`, `ImageMono` and `Bottle` are so far the only supported bottle types. This can be easily extended in this section by adding more cases. 697 Returns: Boolean indicating success of logic or not. 700 frame = yarp.ImageRgb()
702 frame = yarp.ImageMono()
704 frame = yarp.Bottle()
711 frame.fromString(frameRead.toString())
713 frame.copy(frameRead)
718 """Collect data function 721 This function implements three types of data collection procedures: \n 723 1) __buffered__ : Collects data in a fixed length FIFO buffer of past frames. This buffer is read by classifyInstance(). \n 724 2) __future_buffered__ : No operation. \n 725 3) __continuous__ : Collect data until a buffer of length windowSize is full and then perform a classification on this data. The classification is then stored in a buffer with is read by classifyInstance(). \n 754 thisClass, probClass, dataList = self.
mm[0].processLiveData(self.
dataList, self.
mm,
760 if thisClass
is not None:
768 if thisClass !=
'None':
772 eventBottle.addString(
'ack')
780 logging.debug(
'thisclass ' + str(thisClass))
790 logging.debug(
'minMemT ' + str(minMemT))
791 goodIdxs = [idx
for idx, timeVal
in enumerate(self.
classTimestamps)
if timeVal > minMemT]
792 logging.debug(
'goodIdxs ' + str(goodIdxs))
793 minIdx = min(goodIdxs)
821 Utility function to test data collection procedures for debugging purposes. 825 classifyBlock = self.
mm[0].paramsDict[
'windowSize']
835 if out
and dataInConnected:
837 reply = yarp.Bottle()
839 logging.info(reply.toString())
843 if count == classifyBlock:
845 reply = yarp.Bottle()
847 logging.info(
'CLASSIFICATION: ' + reply.toString())
862 """Callback function to record any errors that occur in the log files. 865 Substitutes the standard python exception_hook with one that records the error into a log file. Can only work if interactionSAMModel.py is called from python and not ipython because ipython overrides this substitution. 867 exc_type: Exception Type. 868 exc_value: Exception Value. 869 exc_traceback: Exception Traceback. 874 logging.error(
"Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
876 sys.excepthook = exception_hook
878 if __name__ ==
'__main__':
882 rf = yarp.ResourceFinder()
886 rf.configure(sys.argv)
def configure(self, rf)
Configure interactionSAMModel yarp module.
def respond(self, command, reply)
Respond to external requests.
def interruptModule(self)
Module interrupt logic.
def collectData(self)
Collect data function.
def close(self)
Close Yarp module.
def initialiseModels(argv, update, initMode='training')
Initialise SAM Model data structure, training parameters and user parameters.
def recallFromLabel(self, label)
Generates instance based on label.
def exception_hook(exc_type, exc_value, exc_traceback)
Callback function to record any errors that occur in the log files.
def classifyInstance(self, reply)
Classify a live collected data instance.
Generic interaction function.
def generateInstance(self, reply, instanceName)
Responds to an ask_X_instance request.
def closePort(self, j)
Helper function to close ports with an enforced timeout of 3 seconds so the module doesn't hang...
Class to terminate a function running inside of a separate thread.
def test(self)
Utility function to test data collection procedures for debugging purposes.
def getPeriod(self)
Module refresh rate.
def readFrame(self)
Logic to read an available data frame.
def updateModule(self)
Logic to execute every getPeriod() seconds.