icub-client
samSupervisor.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import matplotlib
3 matplotlib.use("TkAgg")
4 import matplotlib.pyplot as plt
5 import SAM
6 import sys
7 import subprocess
8 import os
9 from os import listdir
10 from os.path import isfile, join, isdir
11 import glob
12 import pkgutil
13 import time
14 import datetime
15 import signal
16 import pickle
17 import readline
18 import yarp
19 from ConfigParser import SafeConfigParser
20 import SAM.SAM_Core.SAM_utils as utils
21 import logging
22 import copy
23 
24 
25 class SamSupervisorModule(yarp.RFModule):
26  """Model management and supervisor
27 
28  Description:
29  Single point of entry for managing training and interaction with multiple models running in parallel. Parameters loaded from `default.ini` in `samSupervisor` context folder which contains the configuration details for samSupervisor as seen in the example below.
30 
31  Example:
32  root_path /usr/local/SAM_Data_Models
33  config_path sensory_level_conf.ini
34  persistence True
35  windowed True
36  verbose True
37  useOPC False
38  startModels False
39  acceptableDelay "5"
40  controllerIP 127.0.0.1
41 
42  Args:
43  root_path : Path to folder containing Data and Models folders.
44  config_path : Name of configuration file detailing parameters for different drivers.
45  persistence : `'True'` or `'False'` indicating terminals opened by samSupervisor stay open even after process termination.
46  windowed : `'True'` or `'False'` indicating separate terminals will be opened by samSupervisor for each training and interaction process.
47  verbose : `'True'` or `'False'` switching on or off logging to stdout.
48  useOPC : `'True'` or `'False'` indicating using or not using OPC for further information.
49  startModels : `'True'` or `'False'` indicating all models in the config_path file will be automatically loaded during samSupervisor startup.
50  acceptableDelay : String with an integer for the number of tries before a model is declared dead, aborted and restarted.
51  controllerIP : IP address for the ipyparallel controller. Currently not in use. Leave at 127.0.0.1.
52 
53  """
def __init__(self):
    """Create the supervisor with every attribute in its pre-configuration state.

    Only the Yarp base class is initialised and attributes are given
    placeholder values here; the settings, paths, ports and model lists are
    filled in later (see `configure` and `checkAvailabilities`).
    """
    yarp.RFModule.__init__(self)

    # Fixed names and option lists.
    self.terminal = 'xterm'
    self.baseLogFileName = 'samSupervisorErrorLog'
    self.modelPriority = ['backup', 'exp', 'best']
    self.attentionModes = ['continue', 'stop']

    # Settings and paths; unset until the module is configured.
    self.rootPath = None
    self.interactionConfPath = None
    self.interactionConfFile = None
    self.interactionParser = None
    self.startModels = None
    self.persistence = None
    self.windowed = None
    self.verbose = None
    self.useOPC = None
    self.modelPath = None
    self.dataPath = None

    # Handles of launched processes and connections to loaded models.
    self.trainingListHandles = {}
    self.loadedListHandles = {}
    self.rpcConnections = []
    self.modelConnections = {}
    self.nonResponsiveDict = {}
    self.iter = 0
    self.cluster = None
    self.devnull = None

    # Reusable Yarp bottles.
    self.inputBottle = yarp.Bottle()
    self.sendingBottle = yarp.Bottle()
    self.responseBottle = yarp.Bottle()
    self.outputBottle = yarp.Bottle()

    # Ports; unset until opened.
    self.supervisorPort = None
    self.opcPort = None
    self.opcPortName = None
    self.opcRPCName = None

    # Results of the availability scan.
    self.functionsList = None
    self.trainableModels = None
    self.modelsList = None
    self.updateModels = None
    self.updateModelsNames = None
    self.noModels = None
    self.noModelsNames = None
    self.uptodateModels = None
    self.uptodateModelsNames = None
def configure(self, rf):
    """
    Configure the samSupervisor Yarp module.

    Description:
        Sets up file and console logging, checks that the Yarp network is
        reachable and that `/sam/rpc:i` is not already present, reads the
        settings from the resource finder, scans the available data, models
        and drivers, opens the supervisor RPC port (plus the OPC client port
        when `useOPC` is `True`) and, when `startModels` is `True`, issues a
        `load` for every section of the interaction configuration file.

    Args:
        rf: Yarp RF context input.

    Returns:
        Boolean indicating success or no success in initialising the Yarp module.
    """
    self.rootPath = rf.find("root_path").asString()

    # Use the first log file name that is either unused or still empty.
    file_i = 0
    loggerFName = join(self.rootPath, self.baseLogFileName + '_' + str(file_i) + '.log')
    while os.path.isfile(loggerFName) and os.path.getsize(loggerFName) > 0:
        file_i += 1
        loggerFName = join(self.rootPath, self.baseLogFileName + '_' + str(file_i) + '.log')

    logFormatter = logging.Formatter("%(asctime)s [%(name)-33s] [%(levelname)8s] %(message)s")

    rootLogger = logging.getLogger('samSupervisor')
    rootLogger.setLevel(logging.DEBUG)

    fileHandler = logging.FileHandler(loggerFName)
    fileHandler.setFormatter(logFormatter)
    rootLogger.addHandler(fileHandler)

    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(logFormatter)
    rootLogger.addHandler(consoleHandler)
    # Module-level logging.info()/warning()/error() calls go through logging.root,
    # so rebinding it sends them to the handlers configured above.
    logging.root = rootLogger

    logging.info(loggerFName)

    if not yarp.Network.checkNetwork():
        return False
    yarp.Network.init()
    self.SIGNALS_TO_NAMES_DICT = dict(
        (getattr(signal, n), n) for n in dir(signal) if n.startswith('SIG') and '_' not in n)

    # check if sam is already running by checking presence of /sam/rpc:i
    proc = subprocess.Popen(['yarp', 'ping', '/sam/rpc:i'], stdout=subprocess.PIPE)
    output = proc.stdout.read()
    proc.wait()
    if output:
        logging.error('samSupervisor already running. /sam/rpc:i port present')
        return False

    # root_path is indispensable: the log, Models and Data locations all derive from it.
    # A missing config_path is tolerated, as before.
    if not rf.check("root_path"):
        logging.error("Cannot find .ini settings")
        return False

    self.interactionConfPath = rf.find("config_path").asString()
    persistence = rf.check("persistence", yarp.Value("False")).asString()
    useOPC = rf.check("useOPC", yarp.Value("False")).asString()
    acceptableDelay = rf.check("acceptableDelay", yarp.Value("5")).asString()
    windowed = rf.check("windowed", yarp.Value("True")).asString()
    verbose = rf.check("verbose", yarp.Value("True")).asString()
    startModels = rf.check("startModels", yarp.Value("True")).asString()
    controllerIP = rf.check("controllerIP", yarp.Value("None")).asString()

    # Parse the optional [nodes] group into {name: integer}.
    nodesDict = dict()
    nodesGroupString = rf.findGroup('nodes').toString()
    logging.info('nodesGroupString length: ' + str(len(nodesGroupString)))
    if len(nodesGroupString) > 7:
        nodesList = nodesGroupString.replace('"', '').replace(')', '').split(' (')[1:]
        for j in nodesList:
            t = j.split(' ')
            # Skip malformed entries instead of failing on a missing value.
            if len(t) > 1 and utils.RepresentsInt(t[1]):
                nodesDict[t[0]] = int(t[1])
    # Column width for the node listing below; 0 when there is nothing to list.
    nodeMaxLen = max(len(j) for j in nodesDict) if nodesDict else 0

    self.startModels = startModels == 'True'
    self.persistence = persistence == "True"
    self.windowed = windowed == "True"
    self.verbose = verbose == "True"
    self.useOPC = useOPC == "True"

    # The non responsive threshold is never allowed below 5.
    try:
        requestedDelay = int(acceptableDelay)
    except (TypeError, ValueError):
        requestedDelay = None
    if requestedDelay is not None and requestedDelay > 5:
        self.nonResponsiveThreshold = requestedDelay
    else:
        if requestedDelay is not None:
            logging.info('Requested responsive delay = ' + str(requestedDelay) + ' Minimum allowed = 5')
        self.nonResponsiveThreshold = 5
    logging.info('Responsive Delay = ' + str(self.nonResponsiveThreshold))

    logging.info('Root supervisor path: \t' + str(self.rootPath))
    logging.info('Model configuration file: \t' + str(self.interactionConfPath))
    logging.info('Bash Persistence set to: \t' + str(self.persistence))
    logging.info('Windowed set to: \t' + str(self.windowed))
    logging.info('Verbose set to: \t' + str(self.verbose))
    logging.info('Controller IP: \t' + str(controllerIP))
    logging.info('Nodes:')
    for j in nodesDict:
        # Build the whole line first: extra positional arguments to logging.info
        # are %-format arguments, not additional text.
        logging.info(' \t' + j.ljust(nodeMaxLen + 4) + str(nodesDict[j]))
    logging.info('')

    self.modelPath = self.rootPath + '/Models'
    self.dataPath = self.rootPath + '/Data'
    self.trainingFunctionsPath = SAM.SAM_Drivers.__path__
    self.trainingListHandles = dict()
    self.loadedListHandles = dict()
    self.iter = 0
    self.rpcConnections = []
    self.inputBottle = yarp.Bottle()
    self.sendingBottle = yarp.Bottle()
    self.responseBottle = yarp.Bottle()
    self.outputBottle = yarp.Bottle()

    if not self.windowed:
        self.devnull = open('/dev/null', 'w')

    out = yarp.Bottle()
    self.checkAvailabilities(out)
    if self.verbose:
        logging.info(out.toString())

    self.supervisorPort = yarp.Port()
    if not self.supervisorPort.open('/sam/rpc:i'):
        logging.error('Could not open /sam/rpc:i')
        return False
    self.attach(self.supervisorPort)

    if self.useOPC:
        self.opcPort = yarp.RpcClient()
        self.opcPortName = '/sam/opcRpc:o'
        self.opcRPCName = '/OPC/rpc'
        self.opcPort.open(self.opcPortName)
        yarp.Network.connect(self.opcPortName, self.opcRPCName)

    # NOTE: starting an ipyparallel cluster from the parsed nodes is disabled;
    # self.cluster therefore keeps its initial value.

    # The category lists may still be None if the availability scan bailed out early.
    loadableCount = len(self.uptodateModels or []) + len(self.updateModels or [])
    if loadableCount > 0:
        if self.verbose:
            logging.info("Loading models according to " + self.interactionConfPath)

        rfModel = yarp.ResourceFinder()
        rfModel.setVerbose(self.verbose)
        rfModel.setDefaultContext("samSupervisor")
        self.interactionConfFile = rfModel.findFile(self.interactionConfPath)

        # Every section of the interaction configuration file names a model to load.
        self.interactionParser = SafeConfigParser()
        self.interactionParser.read(self.interactionConfFile)
        self.interactionSectionList = self.interactionParser.sections()
        if self.verbose:
            logging.info('')
            logging.info(self.dataPath)
            logging.info(self.interactionSectionList)
            logging.info('')
        if self.startModels:
            for j in self.interactionSectionList:
                command = yarp.Bottle()
                command.addString("load")
                command.addString(j)
                if self.verbose:
                    logging.info(command.toString())
                reply = yarp.Bottle()

                self.loadModel(reply, command)
                if self.verbose:
                    logging.info(reply.toString())
                    logging.info("-----------------------------------------------")
                    logging.info('')
        else:
            logging.info('Config ready. Awaiting input ...')
    elif self.noModels:
        if self.verbose:
            logging.info("Models available for training.")
    else:
        if self.verbose:
            logging.info("No available models to load or train")

    return True
313 
def close(self):
    """
    Close Yarp module.

    Description:
        Starts by issuing an `EXIT` command to all loaded interaction modules and closing their ports. Closes the samSupervisor ports that were opened, interrupts the training and interaction processes, terminates the cluster if one has been started and closes the `/dev/null` handle if one was opened.

    Args:
        None

    Returns:
        Boolean indicating success or no success in closing the Yarp module.
    """
    # close ports of loaded models
    for conn in self.rpcConnections:
        conn[1].write(yarp.Bottle('EXIT'), self.inputBottle)
        conn[1].interrupt()
        time.sleep(1)
        conn[1].close()

    # The ports stay None when configuration stopped before opening them.
    if self.supervisorPort is not None:
        self.supervisorPort.interrupt()
        self.supervisorPort.close()

    if self.opcPort is not None:
        self.opcPort.interrupt()
        self.opcPort.close()

    # Training processes first, then the processes behind the loaded models.
    processes = list(self.trainingListHandles.values())
    processes += [conn[4] for conn in self.rpcConnections]
    for proc in processes:
        try:
            proc.send_signal(signal.SIGINT)
        except OSError:
            # The process has already gone; there is nothing left to interrupt.
            pass
        proc.wait()

    if self.cluster is not None:
        self.cluster.terminateProcesses()

    if self.devnull is not None:
        self.devnull.close()
        self.devnull = None

    return True
351 
def checkAvailabilities(self, reply):
    """
    Check model availabilities

    Description:
        Compiles the list of `.pickle` models present in `root_path/Models`, the data folders present in `root_path/Data` and the `SAMDriver_*` modules present in the SAM_Drivers package. A data folder is trainable when its `config.ini` has a `model_options` section whose `driver` option names an available driver. Trainable data without a model goes to __noModels__. Data that is newer than its models goes to __updateModels__, otherwise to __uptodateModels__.

    Args:
        reply : Yarp Bottle containing a many string formatted response to indicate the state of all the available drivers and the corresponding models.

    Returns:
        `False` when no driver modules were found, `True` otherwise.
    """
    # models present on disk, identified by file name without the extension
    onlyfiles = [f for f in listdir(self.modelPath) if isfile(join(self.modelPath, f))]
    self.modelsList = [s.replace(".pickle", "") for s in onlyfiles
                       if ".pickle" in s and '~' not in s and '__L' not in s]
    if self.verbose:
        logging.info('Models available: ' + ', '.join(self.modelsList))

    # data folders present on disk
    dataList = [f for f in listdir(self.dataPath) if isdir(join(self.dataPath, f))]
    if self.verbose:
        logging.info("Data folders available: " + ', '.join(dataList))

    # driver modules shipped in the SAM_Drivers package
    self.functionsList = sorted(
        modname for _, modname, _ in pkgutil.iter_modules(SAM.SAM_Drivers.__path__)
        if 'SAMDriver_' in modname)
    if self.verbose:
        logging.info("Training functions available: " + ', '.join(self.functionsList))

    if self.verbose:
        logging.info('-------------------')
        logging.info('Finding trainable data ...')
        logging.info('')

    if not self.functionsList:
        if self.verbose:
            logging.error("No training functions found. Exiting ...")
        # Leave every category as an empty list so that callers which take
        # len() of, or concatenate, these attributes keep working.
        self.trainableModels = []
        self.updateModels, self.updateModelsNames = [], []
        self.noModels, self.noModelsNames = [], []
        self.uptodateModels, self.uptodateModelsNames = [], []
        reply.addString('nack')
        reply.addString('No training functions found')
        return False

    # format of each trainableModels entry is: dataFolder name, corresponding
    # driver, date data last modified, train boolean, models ('' when none)
    self.trainableModels = []
    for f in dataList:
        if self.verbose:
            logging.info("Checking " + join(self.dataPath, f) + " ...")
        entry = self._trainableEntry(f)
        if entry is not None:
            self.trainableModels.append(entry)
        if self.verbose:
            logging.info('')

    if self.verbose:
        logging.info('-------------------')
        logging.info('Checking corresponding models')
        logging.info('')
    # compare models and data folders. Assuming model names = folder names
    for entry in self.trainableModels:
        self._attachModels(entry)
        if self.verbose:
            logging.info('')
    if self.verbose:
        logging.info('-------------------')
        logging.info('')

    needTraining = [s for s in self.trainableModels if s[3]]
    self.updateModels = [s for s in needTraining if s[4] != '']
    self.updateModelsNames = [s[0] for s in self.updateModels]

    self.noModels = [s for s in needTraining if s[4] == '']
    self.noModelsNames = [s[0] for s in self.noModels]

    self.uptodateModels = [s for s in self.trainableModels if not s[3]]
    self.uptodateModelsNames = [s[0] for s in self.uptodateModels]

    reply.addVocab(yarp.Vocab_encode("many"))
    reply.addString(str(len(self.uptodateModels)) + " Models up-to-date " + str(self.uptodateModelsNames))
    reply.addString(str(len(self.updateModels)) + " Models require an update " + str(self.updateModelsNames))
    reply.addString(str(len(self.noModels)) + " new models to train " + str(self.noModelsNames))
    reply.addString('')

    # append the individual report of every model
    for j in self.updateModelsNames + self.uptodateModelsNames + self.noModelsNames:
        rep = yarp.Bottle()
        cmd = yarp.Bottle()
        cmd.addString('check')
        cmd.addString(j)
        self.checkModel(rep, cmd, allCheck=True)
        reply.addString(str(rep.toString()))

    return True


def _trainableEntry(self, f):
    """Return `[f, driver, lastModified, True]` if data folder `f` is trainable, else `None`."""
    loc = join(self.dataPath, f)
    section = "model_options"

    parser = SafeConfigParser()
    try:
        found = parser.read(join(loc, "config.ini"))
    except IOError:
        return None

    if not found:
        if self.verbose:
            logging.warning("config.ini not found for " + str(f))
        return None

    if not parser.has_section(section):
        if self.verbose:
            logging.warning("Training parameters for data " + f + " not found. Will not train "
                            + f + "\nCheck config.ini is formatted correctly")
        return None

    if not parser.has_option(section, 'driver'):
        logging.warning("No option 'driver' in section: 'model_options' for " + f)
        return None

    # the option may list several drivers separated by commas
    trainOptions = [s.strip() for s in parser.get(section, 'driver').split(',')]
    availableFuncs = [s for s in trainOptions if s in self.functionsList]
    if not availableFuncs:
        if self.verbose:
            logging.warning("Training functions for data " + f +
                            " not found. Will not train " + f)
        return None

    if self.verbose:
        logging.info("Training functions for data " + f + " are " + ','.join(trainOptions))
        logging.info("Corresponding functions available: " + ','.join(availableFuncs))
        if len(availableFuncs) > 1:
            logging.info("The first function will be chosen: " + availableFuncs[0])

    # find latest modified date of directory and subdirectories
    # thus checking for addition of new data
    try:
        lastMod = max(os.path.getmtime(dirName) for dirName, _, _ in os.walk(loc))
    except (OSError, ValueError):
        # The folder disappeared or became unreadable while it was being scanned.
        logging.warning("Could not read modification times of " + loc + ". Will not train " + f)
        return None
    if self.verbose:
        logging.info("Data folder last modified: %s" % time.ctime(lastMod))

    return [f, availableFuncs[0], lastMod, True]


def _attachModels(self, entry):
    """Append the models found for `entry` (or `''`) and clear its train flag when they are current."""
    name = str(entry[0])
    currModels = [g for g in self.modelsList if name + '_' in g and '~' not in g]

    if not currModels:
        entry.append('')
        if self.verbose:
            logging.info(name + ' Model not found. Training Required')
        return

    lastMod = max(os.path.getmtime(join(self.modelPath, g + ".pickle")) for g in currModels)

    # Walk the model names themselves: looking them up through their
    # modification time would skip a file sharing its time with another.
    currModelsDict = {'exp': '', 'best': '', 'backup': ''}
    for g in currModels:
        for kind in ('exp', 'best', 'backup'):
            if kind in g:
                currModelsDict[kind] = g
                break
    entry.append(currModelsDict)

    if self.verbose:
        logging.info(name + " Model last modified: %s" % time.ctime(lastMod))
    if lastMod < entry[2]:
        tdiff = datetime.datetime.fromtimestamp(entry[2]).replace(microsecond=0) - \
            datetime.datetime.fromtimestamp(lastMod).replace(microsecond=0)
        if self.verbose:
            logging.info(name + ' Model outdated by ' + str(tdiff) + '. Will be trained')
    else:
        if self.verbose:
            logging.info(name + ' Model up-to-date')
        entry[3] = False
547 
def respond(self, command, reply):
    """
    Respond to external requests

    Description:
        The availability lists are refreshed with checkAvailabilities() before every request is processed. Available requests:
        1) __askOPC__ : Calls askOPC()
        2) __attentionModulation <mode>__ : Calls attentionModulation()
        3) __check_all__ : Calls checkAvailabilities()
        4) __check <modelName>__ : Calls checkModel()
        5) __config <modelName>__ : Calls configModel()
        6) __dataDir <modelName>__ : Calls dataDirModel()
        7) __delete <modelName>__ : Calls deleteModel()
        8) __help__ : Returns a many formatted list of available commands
        9) __listModels <mode>__ : Calls listModels()
        10) __load <modelName>__ : Calls loadModel()
        11) __close <modelName>__ : Calls closeModel() with `external` set to `True`
        12) __optimise <modelName>__ : Calls optimise()
        13) __quit__ : Replies `quitting` and returns `False`
        14) __report <modelName>__ : Calls reportModel()
        15) __train <modelName>__ : Calls train()
        16) __list_callSigns__ : Returns the call signs of every connected model
        17) __<call sign>__ : Any call sign of a connected model is forwarded to that model through forwardCommand(). A call sign containing `instance` requires exactly one argument. The reply is `nack` when the model does not answer or the forwarding times out.

    Args:
        command : Incoming Yarp bottle containing external request.
        reply : Outgoing Yarp bottle containing reply to processed request.

    Returns:
        `False` for the `quit` request, `True` for every other request.
    """
    helpMessage = ["Commands are: ", "\taskOPC", "\tattentionModulation mode", "\tcheck_all", "\tcheck modelName",
                   "\tclose modelName", "\tconfig modelName", "\tdataDir modelName", "\tdelete modelName", "\thelp",
                   "\tlistModels mode", "\tload modelName", "\toptimise modelName", "\tquit", "\treport modelName",
                   "\ttrain modelName", "\tlist_callSigns"]

    # Requests served by a method taking (reply, command); the method is looked
    # up by name only once its request actually arrives.
    handlerNames = {
        "attentionModulation": "attentionModulation",
        "check": "checkModel",
        "listModels": "listModels",
        "delete": "deleteModel",
        "report": "reportModel",
        "dataDir": "dataDirModel",
        "config": "configModel",
        "load": "loadModel",
        "train": "train",
        "optimise": "optimise",
    }

    refreshed = yarp.Bottle()
    self.checkAvailabilities(refreshed)
    reply.clear()

    request = command.get(0).asString()

    if request == "quit":
        reply.addString("quitting")
        return False

    if request == "askOPC":
        self.askOPC(reply)
    elif request == "check_all":
        self.checkAvailabilities(reply)
    elif request == "close":
        self.closeModel(reply, command, True)
    elif request in handlerNames:
        getattr(self, handlerNames[request])(reply, command)
    elif request == "help":
        reply.addVocab(yarp.Vocab_encode("many"))
        for helpLine in helpMessage:
            reply.addString(helpLine)
    elif request == "list_callSigns":
        reply.addVocab(yarp.Vocab_encode("many"))
        for connection in self.rpcConnections:
            callSigns = "".join(str(sign) + "\t" for sign in connection[3])
            reply.addString(str(connection[0]) + " Model: \t" + callSigns)
    elif any(request in connection[3] for connection in self.rpcConnections):
        if 'instance' in request and command.size() != 2:
            reply.addString("Instance name required. e.g. ask_face_instance Daniel")
        else:
            try:
                self.forwardCommand(command, reply)
                if reply.size() == 0:
                    reply.addString('nack')
                    reply.addString('No response')
            except utils.TimeoutError:
                reply.addString('nack')
                reply.addString('Failed to respond within timeout')
    else:
        reply.addString("nack")
        reply.addString("Wrong command")

    return True
655 
@utils.timeout(10)
def forwardCommand(self, command, reply):
    """
    Forward a call sign request to every connected model that lists it.

    The method is wrapped with `utils.timeout(10)`; the caller handles the
    resulting `utils.TimeoutError` so the module does not hang on a silent model.

    Args:
        command : Yarp bottle with the call sign.
        reply : Yarp bottle for the reply from the model.

    Returns:
        None
    """
    callSign = command.get(0).asString()
    for connection in self.rpcConnections:
        # connection[3] holds the call signs, connection[1] the port to write to
        if callSign in connection[3]:
            connection[1].write(command, reply)
671 
def interruptModule(self):
    """
    Module interrupt hook.

    No work is done on interruption.

    Returns : Always `True`.
    """
    return True
def listModels(self, reply, command):
    """ Returns lists of models for train or interaction categories.

    Args:
        reply : Yarp bottle for the reply from the model.
        command : Yarp bottle with command. Example valid commands below.

    Returns :
        Boolean indicating success or not.

    Example :
        listModels train - returns list of models that require training
        listModels interaction - returns list of interaction ready models
        listModels all - returns a list of all models

    """
    logging.info(command.toString())
    b = yarp.Bottle()
    self.checkAvailabilities(b)

    reply.clear()

    # The name lists can still be None when the availability scan found no
    # drivers; treat that as "no models" instead of failing on None + None.
    outdated = self.updateModelsNames or []
    untrained = self.noModelsNames or []
    current = self.uptodateModelsNames or []
    categories = {
        "train": outdated + untrained,
        "interaction": outdated + current,
        "all": outdated + untrained + current,
    }

    mode = command.get(1).asString() if command.size() >= 2 else None
    if mode not in categories:
        reply.addString("nack")
        reply.addString("'train', 'interaction' or `all` required. eg listModels train")
    else:
        reply.addString("ack")
        for j in categories[mode]:
            reply.addString(j)

    return True
720 
def attentionModulation(self, reply, command):
    """ Send an attention mode to every connected model.

    Args:
        reply : Yarp bottle for the reply of this function.
        command : Yarp bottle with command. Example valid commands below.

    Returns :
        Boolean indicating success or not.

    Example :
        attentionModulation stop
        attentionModulation continue

    """
    reply.clear()
    logging.info(command.toString())

    # The mode is only read once the command is known to carry one.
    validRequest = command.size() >= 2 and command.get(1).asString() in self.attentionModes
    if not validRequest:
        reply.addString("nack")
        reply.addString("'stop' or 'continue' required. eg attentionModulation stop")
        return True

    for connection in self.rpcConnections:
        modelReply = yarp.Bottle()
        request = yarp.Bottle()
        request.addString('attention')
        request.addString(command.get(1).asString())
        connection[1].write(request, modelReply)
    reply.addString('ack')
    return True
753 
def askOPC(self, reply):
    """ Query the OPC for additional contextual data.

    Description:
        Queries the OPC for the entities of type `agent`, takes the first agent whose name is not `icub` and forwards it as `partnerName` information to every connected model whose name contains `Actions`.

    Args:
        reply : Yarp bottle for the reply from OPC.

    Returns :
        Boolean indicating success or not.
    """
    reply.clear()
    actionsLoadedList = [t for t in self.rpcConnections if 'Actions' in t[0]]

    # The OPC client port is only created when OPC usage is enabled, so a
    # request can arrive while it is still None.
    if self.opcPort is None:
        reply.addString('nack')
        reply.addString('OPC port not present')
        logging.warning('OPC not found!')
        return True

    # check network connection with OPC is present if not make it
    if self.opcPort.getOutputCount() == 0:
        yarp.Network.connect(self.opcPortName, self.opcRPCName)

    if self.opcPort.getOutputCount() == 0:
        reply.addString('nack')
        reply.addString('OPC port not present')
        logging.warning('OPC not found!')
        return True

    if not actionsLoadedList:
        reply.addString('ack')
        reply.addString('No actions loaded')
        return True

    # ask for the ids of all entities of type agent
    cmd = yarp.Bottle()
    cmd.fromString('[ask] (("entity" "==" "agent"))')
    rep = yarp.Bottle()
    self.opcPort.write(cmd, rep)
    # split items in the returned string
    lID = rep.get(1).toString().split('(')[-1].replace(')', '').split(' ')

    # iterate over items to get agent name
    agentList = []
    for j in lID:
        cmd = yarp.Bottle()
        cmd.fromString('[get] (("id" ' + str(j) + ') (propSet ("name")))')
        rep = yarp.Bottle()
        self.opcPort.write(cmd, rep)
        agentList.append(rep.toString().split('name')[-1].split(')')[0].replace(' ', ''))

    # Ignore empty names so a blank entry cannot hide a real agent listed after it.
    candidates = [t for t in agentList if t and t != 'icub']
    if not candidates:
        reply.addString('nack')
        reply.addString('No agent apart from icub present')
        return True

    currAgent = candidates[0]
    for j in actionsLoadedList:
        cmd = yarp.Bottle()
        cmd.addString('information')
        cmd.addString('partnerName')
        cmd.addString(currAgent)
        rep = yarp.Bottle()
        j[1].write(cmd, rep)

    reply.addString('ack')
    reply.addString('Agent = ' + str(currAgent))
    return True
823 
def closeModel(self, reply, command, external=False):
    """ Shut down a loaded model.

    Description:
        Looks the requested model up in `self.rpcConnections`. When it is found the model is sent `EXIT` over its rpc port, the port is interrupted and closed, the model process receives `SIGINT` and is waited for, and the entry is removed from `self.rpcConnections`.

    Args:
        reply : Yarp bottle that receives `ack`/`nack` followed by a status message.
        command : Yarp bottle with the command and the name of the model to close.
        external : When `True`, the entry stored for the model in `self.modelConnections` is replaced by an empty dict before closing.

    Returns :
        Always `True`.
    """
    if command.size() != 2:
        reply.addString('nack')
        reply.addString("Model name required. e.g. close Actions")
        return True

    modelName = command.get(1).asString()

    # Remember the index of the last connection carrying this name (-1 when absent).
    matchIdx = -1
    for idx, entry in enumerate(self.rpcConnections):
        if entry[0] == modelName:
            matchIdx = idx
    isOpen = matchIdx != -1

    logging.info("Already open = " + str(isOpen))
    if self.verbose:
        logging.info(modelName)

    if not isOpen:
        reply.addString('ack')
        reply.addString(modelName + " model is not running.")
        return True

    if external and modelName in self.modelConnections.keys():
        self.modelConnections[modelName] = dict()

    connection = self.rpcConnections[matchIdx]
    connection[-1] = 'loading'  # this will make it ignored by the non responsive countdown
    rpcPort = connection[1]
    rpcPort.write(yarp.Bottle('EXIT'), self.inputBottle)
    rpcPort.interrupt()
    time.sleep(1)
    rpcPort.close()
    time.sleep(1)
    process = connection[4]
    process.send_signal(signal.SIGINT)
    process.wait()
    del self.rpcConnections[matchIdx]

    reply.addString('ack')
    reply.addString(modelName + " model closed.")
    return True
871 
def loadModel(self, reply, command):
    """ Load a trained model.

    Description:
        Launches interactionSAMModel.py for the model named in the command and connects an rpc client to it. A model that is already loaded and answers its heartbeat is asked to `reload` instead. A model that is loaded but not operating correctly is closed and launched again. After a successful launch the model is asked for its `portNames`; unknown ports start being monitored and connections recorded earlier for known ports are re-established.

    Args:
        reply : Yarp bottle for the reply from the function.
        command : Yarp bottle with the command, the model name to load and, optionally, the model type (a key of the model's file dictionary) to load.

    Returns :
        None
    """
    if command.size() < 2:
        reply.addString('nack')
        reply.addString("Model name required. e.g. load Actions")
        return

    modelName = command.get(1).asString()
    if modelName in self.noModelsNames:
        reply.addString('nack')
        reply.addString("Cannot load model. Model training available but not yet trained.")
        return
    if modelName not in self.uptodateModelsNames + self.updateModelsNames:
        reply.addString('nack')
        reply.addString(modelName + " model does not exist")
        return

    parser = SafeConfigParser()
    if len(parser.read(join(self.dataPath, modelName, "config.ini"))) == 0:
        reply.addString('nack')
        reply.addString("Failed to retrieve " + modelName + " model. Model not trained")
        return

    if not (parser.has_option('model_options', 'driver') and
            parser.has_option('model_options', 'modelNameBase')):
        reply.addString('nack')
        # The option checked above is `modelNameBase`; name it correctly in the message.
        reply.addString('Parameters "driver" and "modelNameBase" not found in config.ini')
        return

    # Keep only the configured drivers that appear in self.functionsList.
    interactionFunction = [s for s in parser.get('model_options', 'driver').split(',')
                           for g in self.functionsList if s == g]
    if len(interactionFunction) == 0:
        reply.addString('nack')
        reply.addString('No interaction function found in ' + modelName +
                        ' model path. Skipping model')
        return

    j = [s for s in self.trainableModels if s[0] == modelName][0]

    interfacePortName = self.interactionParser.get(j[0], 'rpcBase') + ':o'
    callSignList = self.interactionParser.get(j[0], 'callSign').replace(' ', '').split(',')

    # check if the model is already loaded
    alreadyOpen = False
    correctOperation = False
    conn = -1
    for k, entry in enumerate(self.rpcConnections):
        if entry[0] == j[0]:
            alreadyOpen = True
            conn = k

    logging.info("Loading " + str(interfacePortName) + " with " + str(callSignList))
    if alreadyOpen:
        if self.verbose:
            logging.info("Model already open")
        # check it is functioning correctly
        correctOp_check1, correctOp_check2 = self.checkOperation(self.rpcConnections[conn])
        correctOperation = correctOp_check1 and correctOp_check2
        if not correctOp_check2 and correctOp_check1:
            alreadyOpen = False

        if self.verbose:
            logging.info("correct operation = " + str(correctOperation))
            logging.info('')
    else:
        if self.verbose:
            logging.info("Model not open")

    if alreadyOpen and correctOperation:
        rep = yarp.Bottle()
        cmd = yarp.Bottle()
        cmd.addString("reload")
        self.rpcConnections[conn][1].write(cmd, rep)
        if rep.get(0).asString() == 'ack':
            reply.addString('ack')
            reply.addString(modelName + " model re-loaded correctly")
        else:
            reply.addString('nack')
            reply.addString(modelName + " model did not re-loaded correctly")
    elif alreadyOpen and not correctOperation:
        # terminate the misbehaving model so that it can be launched again below
        alreadyOpen = False
        rep = yarp.Bottle()
        cmd = yarp.Bottle()
        cmd.addString("close")
        cmd.addString(modelName)
        self.closeModel(rep, cmd)
        reply.addString('ack')
        reply.addString(modelName + " model terminated ")

    if alreadyOpen:
        return

    # Choose the model file BEFORE opening any port: previously an empty or missing
    # entry left `modToLoad` unbound and raised NameError after the rpc client was open.
    modType = command.get(2).asString()
    modToLoad = ''
    if modType != '' and modType in j[4].keys():
        if j[4][modType] != '':
            modToLoad = j[4][modType]
    else:
        # The last non-empty entry in self.modelPriority wins.
        for modType in self.modelPriority:
            if j[4][modType] != '':
                modToLoad = j[4][modType]
    logging.info(str(modType) + ' ' + str(modToLoad))
    if modToLoad == '':
        reply.addString('nack')
        reply.addString('Could not find model type ' + str(modType) + ' for ' + modelName)
        return

    interfacePort = yarp.RpcClient()
    successfulOpen = interfacePort.open(interfacePortName)

    if not successfulOpen:
        logging.error('CRAP')
        for jk in self.rpcConnections:
            logging.info(jk[0])

    args = ' '.join([join(self.dataPath, j[0]), join(self.modelPath, modToLoad),
                     self.interactionConfFile, interactionFunction[0], str(self.windowed)])
    cmd = 'interactionSAMModel.py ' + args

    if self.verbose:
        logging.info('')
        logging.info("cmd = " + str(cmd))
        logging.info('')

    # Separate name for the shell line so the `command` bottle is not shadowed.
    if self.persistence:
        shellCommand = "bash -c \"" + cmd + "; exec bash\""
    else:
        shellCommand = "bash -c \"" + cmd + "\""

    if self.windowed:
        c = subprocess.Popen([self.terminal, '-e', shellCommand], shell=False)
    else:
        c = subprocess.Popen([cmd], shell=True)

    newConn = [j[0], interfacePort, interfacePortName[:-1], callSignList, c, 'loading']
    self.rpcConnections.append(newConn)

    outName = newConn[2] + 'o'
    inName = newConn[2] + 'i'
    if self.verbose:
        logging.info('connecting ' + outName + ' with ' + inName)

    # Up to 10 attempts, one second apart, to reach the freshly launched model.
    connected = False
    for _ in range(10):
        try:
            connected = yarp.Network.connect(outName, inName)
        except Exception:
            logging.debug('connect attempt failed', exc_info=True)
            connected = False
        time.sleep(1)
        if connected:
            break

    if not connected:
        reply.addString('nack')
        reply.addString("Failure to load " + str(interactionFunction[0]) + " model")
        rep = yarp.Bottle()
        cmd = yarp.Bottle()
        cmd.addString("close")
        cmd.addString(newConn[0])
        self.closeModel(rep, cmd)
        return

    # then execute an interaction model check to verify correct startup
    logging.info('pinging portNames to ' + str(newConn[0]))
    rep = yarp.Bottle()
    cmd = yarp.Bottle()
    cmd.addString("portNames")
    newConn[1].write(cmd, rep)
    newConn[-1] = 'ready'
    logging.info('ping received ' + str(rep.toString()))

    if newConn[0] not in self.modelConnections.keys():
        self.modelConnections[newConn[0]] = dict()
    knownPorts = self.modelConnections[newConn[0]]
    if rep.size() > 1 and rep.get(0).asString() == 'ack':
        for p in range(rep.size()):
            portName = rep.get(p).asString()
            if portName == 'ack':
                continue
            if portName not in knownPorts.keys():
                knownPorts[portName] = []
                logging.info('Monitoring ' + portName)
            else:
                # reinstate previously present connections
                for op in knownPorts[portName]:
                    if op[1] == 'in':
                        logging.info('Connect ' + str(op[0]) + ' to ' + portName)
                        yarp.Network.connect(op[0], portName)
                    else:
                        logging.info('Connect ' + portName + ' to ' + str(op[0]))
                        yarp.Network.connect(portName, op[0])
    reply.addString('ack')
    reply.addString(str(interactionFunction[0]) + " model loaded at " +
                    interfacePortName + " with call signs " + str(callSignList))
1076 
def checkModel(self, reply, command, allCheck=False):
    """ Report the availability and load status of a model.

    Description:
        Builds a one-line status for the requested model from the training handles, the up-to-date / needs-update / no-model name lists and the list of open rpc connections.

    Args:
        reply : Yarp bottle for the reply from the function. It is cleared first.
        command : Yarp bottle with the command and the model name to check.
        allCheck : When `True` the leading `ack` is omitted and only the status string is added to the reply.

    Returns :
        Always `True`.
    """
    reply.clear()

    # update to show which models are loaded or not and which are currently training
    if command.size() != 2:
        status = "Model name required. e.g. check Actions"
    else:
        requested = command.get(1).asString()
        # First matching collection decides the wording.
        states = ((self.trainingListHandles, " in training"),
                  (self.uptodateModelsNames, " is up-to-date"),
                  (self.updateModelsNames, " requires update"),
                  (self.noModelsNames, " has no model"))
        status = requested + " model not present"
        for collection, wording in states:
            if requested in collection:
                status = requested + wording
                break

    requested = command.get(1).asString()
    isLoaded = False
    for connection in self.rpcConnections:
        if connection[0] == requested:
            isLoaded = True
            break

    if isLoaded:
        status += " and is loaded"
    elif requested in self.uptodateModelsNames + self.updateModelsNames:
        status += " and is not loaded"

    if not allCheck:
        reply.addString('ack')
    reply.addString(status)
    return True
1115 
def train(self, reply, command):
    """ Validate a training request and start the training.

    Description:
        Rejects the request when no model name is given, when the model is already up to date or when it is already being trained. A model listed as needing an update or as having no trained model is handed to `train_model`.

    Args:
        reply : Yarp bottle for the reply from the function. It is cleared first.
        command : Yarp bottle with the command and the model name to train.

    Returns :
        Always `True`.
    """
    reply.clear()

    def answer(status, message):
        reply.addString(status)
        reply.addString(message)

    if command.size() != 2:
        answer('nack', "Model name required. e.g. train Actions")
        return True

    requested = command.get(1).asString()
    if str(requested) in self.uptodateModelsNames:
        answer('nack', requested + " is already up to date.")
    elif str(requested) in self.trainingListHandles:
        answer('nack', requested + " is already being trained.")
    elif requested in self.updateModelsNames or requested in self.noModelsNames:
        # The acknowledgement is written before the training process is spawned.
        answer('ack', "Training " + requested + " model ...")
        candidates = self.updateModels + self.noModels
        modelToTrain = [entry for entry in candidates if entry[0] == requested][0]
        if self.verbose:
            logging.info(modelToTrain)
        self.train_model(modelToTrain)
    else:
        answer('nack', requested + " model not available to train")

    return True
1152 
def optimise(self, reply, command):
    """ Validate an optimisation request and start the optimisation.

    Description:
        Rejects the request when no model name is given, when the model is currently being trained or when it has never been trained. A model listed as up to date or as needing an update is handed to `optimise_model` together with the optional third element of the command.

    Args:
        reply : Yarp bottle for the reply from the function. It is cleared first.
        command : Yarp bottle with the command, the model name to optimise and optionally the model type to start from.

    Returns :
        Always `True`.
    """
    reply.clear()

    def answer(status, message):
        reply.addString(status)
        reply.addString(message)

    if command.size() < 2:
        answer('nack', "Model name required. e.g. optimise Actions")
        return True

    requested = command.get(1).asString()
    if str(requested) in self.trainingListHandles:
        answer('nack', requested + " is already being trained.")
    elif requested in self.noModelsNames:
        answer('nack', "Train " + requested + " model before optimising")
    elif requested in self.updateModelsNames or requested in self.uptodateModelsNames:
        # The acknowledgement is written before the optimiser process is spawned.
        answer('ack', "Optimising " + requested + " model ...")
        candidates = self.updateModels + self.uptodateModels
        modelToTrain = [entry for entry in candidates if entry[0] == requested][0]
        if self.verbose:
            logging.info(modelToTrain)
        self.optimise_model(modelToTrain, modName=command.get(2).asString())
    else:
        answer('nack', requested + " model not available to optimise")

    return True
1190 
def deleteModel(self, reply, command):
    """ Deletes model from hard disk.

    Description:
        Refreshes the model availability lists, then removes every file in `self.modelPath` whose name starts with the stored file name of the requested model. Models that are in training or currently loaded are not deleted. With a third command element only the file name stored under that key is considered.

    Args:
        reply : Yarp bottle for the reply from the function. It is cleared first.
        command : Yarp bottle with the command, the model name to delete and optionally the model type to delete.

    Returns :
        Always `True`.
    """
    reply.clear()
    b = yarp.Bottle()
    self.checkAvailabilities(b)

    if command.size() < 2:
        reply.addString('nack')
        reply.addString("Model name required. e.g. delete Actions")
        return True

    modelName = command.get(1).asString()
    if modelName in self.trainingListHandles:
        reply.addString('nack')
        reply.addString("Cannot delete model. Model in training")
        return True
    if any(k[0] == modelName for k in self.rpcConnections):
        reply.addString('nack')
        reply.addString("Cannot delete model. Model currently loaded")
        return True
    if modelName not in self.updateModelsNames and modelName not in self.uptodateModelsNames:
        reply.addString('nack')
        reply.addString(str(modelName) + " model not present")
        return True

    storedNames = [s for s in self.updateModels + self.uptodateModels
                   if s[0] == modelName][0][4]

    # Strip a trailing `__...L...` segment from each stored file name. This works on a
    # copy: the original called .split() on the dict itself (AttributeError) and wrote
    # the result back into the shared model description.
    baseNames = {}
    for key in storedNames.keys():
        parts = storedNames[key].split('__')
        if len(parts) > 1 and 'L' in parts[-1]:
            baseNames[key] = '__'.join(parts[:-1])
        else:
            baseNames[key] = storedNames[key]

    modName = command.get(2).asString()
    if modName != '':
        selected = [baseNames[modName]] if modName in baseNames else []
    else:
        selected = [baseNames[key] for key in storedNames.keys()]

    filesToDelete = []
    for baseName in selected:
        if baseName != '':
            filesToDelete += glob.glob(join(self.modelPath, baseName + '*'))
    logging.info(filesToDelete)

    failFlag = False
    for filePath in filesToDelete:
        try:
            os.remove(filePath)
        except OSError:
            logging.warning('Could not delete ' + str(filePath), exc_info=True)
            failFlag = True

    if len(filesToDelete) > 0 and not failFlag:
        reply.addString('ack')
        reply.addString(str(modelName) + " model deleted.")
    elif len(filesToDelete) == 0:
        reply.addString('nack')
        reply.addString('Model name with ' + command.get(2).asString() + ' not found')
    elif failFlag:
        reply.addString('nack')
        reply.addString('Error when deleting file')

    # Refresh the availability lists now that files may be gone.
    b = yarp.Bottle()
    self.checkAvailabilities(b)
    return True
1262 
def configModel(self, reply, command):
    """ Open the configuration file of a model in gedit.

    Description:
        Looks the model up in the needs-update, up-to-date and no-model lists and runs `gedit` on `config.ini` inside the model's data folder through `os.system`.

    Args:
        reply : Yarp bottle for the reply from the function. It is cleared first.
        command : Yarp bottle with the command and the model name whose configuration is displayed.

    Returns :
        Always `True`.
    """
    reply.clear()

    if command.size() != 2:
        reply.addString('nack')
        reply.addString("Model name required. e.g. report Actions")
        return True

    requested = command.get(1).asString()
    isKnown = (requested in self.updateModelsNames or
               requested in self.uptodateModelsNames or
               requested in self.noModelsNames)
    if not isKnown:
        reply.addString('nack')
        return True

    candidates = self.updateModels + self.uptodateModels + self.noModels
    modelToCheck = [entry for entry in candidates if entry[0] == requested][0][0]
    logging.info(modelToCheck)

    modelConfFile = join(self.dataPath, modelToCheck, 'config.ini')
    os.system("gedit " + modelConfFile)

    reply.addString('ack')
    reply.addString(modelToCheck)
    return True
1292 
def dataDirModel(self, reply, command):
    """ Return the path of the pickle file chosen for a model.

    Description:
        samOptimiser creates multiple models with different suffixes. `best` indicates the best performing model. `exp` indicates the last trained model. `backup` indicates the model present before optimisation started. With an explicit model type in the command only that entry is considered. Otherwise `self.modelPriority` is walked and the last non-empty entry is used.

    Args:
        reply : Yarp bottle for the reply from the function. It is cleared first.
        command : Yarp bottle with the command, the model name and optionally the model type to choose.

    Returns :
        Always `True`.
    """
    reply.clear()

    if command.size() < 2:
        reply.addString('nack')
        reply.addString("Model name required. e.g. dataDir Actions")
        return True

    requested = command.get(1).asString()
    if requested not in self.updateModelsNames and requested not in self.uptodateModelsNames:
        reply.addString('nack')
        reply.addString('Could not find ' + requested)
        return True

    candidates = self.updateModels + self.uptodateModels
    modelToCheck = [entry for entry in candidates if entry[0] == requested][0][4]

    modType = command.get(2).asString()
    modToLoad = ''
    if modType == '':
        # `modType` doubles as the loop variable, so the failure message below
        # names the last priority entry that was inspected.
        for modType in self.modelPriority:
            if modelToCheck[modType] != '':
                modToLoad = modelToCheck[modType]
    elif modType in modelToCheck.keys():
        if modelToCheck[modType] != '':
            modToLoad = modelToCheck[modType]

    if modToLoad == '':
        reply.addString('nack')
        reply.addString('Could not find model type ' + modType)
    else:
        modToLoad = join(self.modelPath, modToLoad) + '.pickle'
        logging.info(modToLoad)
        reply.addString('ack')
        reply.addString(modToLoad)

    return True
1340 
def reportModel(self, reply, command):
    """ Returns the performance of the trained model.

    Description:
        Loads every `.pickle` file in `self.modelPath` that belongs to the requested model and adds its `overallPerformance` entry (and `overallPerformanceLabels` when present) to the reply. With `plot` as third command element the confusion matrix is also plotted.

    Args:
        reply : Yarp bottle for the reply from the function. It is cleared first.
        command : Yarp bottle with the command, the model name to report and a plot parameter to switch plot on or off.

    Examples:
        report <modelName> \n
        report <modelName> plot

    Returns :
        Always `True`.
    """
    reply.clear()

    if command.size() < 2:
        reply.addString('nack')
        reply.addString("Model name required. e.g. report Actions")
        return True

    modelName = command.get(1).asString()
    if modelName not in self.updateModelsNames and modelName not in self.uptodateModelsNames:
        reply.addString('nack')
        reply.addString(str(modelName) + " model not present")
        return True

    plotFlag = command.size() == 3 and command.get(2).asString() == 'plot'

    storedNames = [s for s in self.updateModels + self.uptodateModels
                   if s[0] == modelName][0][4]

    reply.addVocab(yarp.Vocab_encode("many"))
    for key in storedNames.keys():
        # Strip a trailing `__...L...` segment into a local name. The original called
        # .split() on the dict itself (AttributeError) and overwrote the shared entry.
        parts = storedNames[key].split('__')
        if len(parts) > 1 and 'L' in parts[-1]:
            baseName = '__'.join(parts[:-1])
        else:
            baseName = storedNames[key]
        if baseName == '':
            continue

        for filePath in glob.glob(join(self.modelPath, baseName + '*')):
            if '.pickle' not in filePath:
                continue
            # NOTE: pickle.load executes arbitrary code from the file; only model
            # files written by the local training scripts should live in modelPath.
            with open(filePath, 'rb') as pickleFile:
                modelPickle = pickle.load(pickleFile)

            reply.addString('ack')
            reply.addString(baseName + ":")
            try:
                perfLabels = modelPickle['overallPerformanceLabels']
            except (LookupError, TypeError):
                # Labels are optional; the matrix is reported without them.
                perfLabels = None
            perfCM = modelPickle['overallPerformance']

            if plotFlag:
                utils.plot_confusion_matrix(perfCM, perfLabels, title=baseName)

            if perfLabels is not None:
                reply.addString(str(perfLabels))
            reply.addString(str(perfCM))
            reply.addString("\t" + " ")
    return True
1404 
def train_model(self, mod):
    """ Launch the training process for a model.

    Description:
        Builds the command line for trainSAMModel.py and starts it, either inside a new terminal (`self.windowed`) or through the shell. The process handle is stored in `self.trainingListHandles` under the model name.

    Args:
        mod : Model description. `mod[0]` is the model name, `mod[1]` is passed through as the third argument of the training script and `mod[4]` is either `''` or a dictionary whose `'exp'` entry names the model file.

    Returns :
        Always `True`.
    """
    trainPath = 'trainSAMModel.py'
    modelName = mod[0]

    if self.verbose:
        logging.info("Training Models:")
        logging.info('')
        logging.info('Training ' + modelName + ' ...')
        logging.info('Opening ' + trainPath)
        logging.info('')

    dPath = join(self.dataPath, modelName)

    modToTrain = mod[4]['exp'] if mod[4] != '' else ''
    mPath = join(self.modelPath, modToTrain)
    if modToTrain != '':
        mPath += '.pickle'

    if self.verbose:
        logging.info(mPath)

    # A separate process is used for training so that it could run elsewhere in future.
    trainingMode = 'update' if modelName in self.updateModelsNames else 'new'
    args = ' '.join([dPath, mPath, mod[1], trainingMode, modelName, str(self.windowed)])

    if self.verbose:
        logging.info('args: ' + str(args))

    cmd = trainPath + ' ' + args
    # With persistence the terminal drops into a shell once the script has finished.
    tail = "; exec bash\"" if self.persistence else "\""
    command = "bash -c \"" + cmd + tail

    if self.verbose:
        logging.info('cmd: ' + str(cmd))

    if self.windowed:
        handle = subprocess.Popen([self.terminal, '-e', command], shell=False)
    else:
        handle = subprocess.Popen([cmd], shell=True)

    self.trainingListHandles[modelName] = handle
    return True
1483 
def optimise_model(self, mod, modName):
    """ Launch the optimisation process for a model.

    Description:
        Builds the command line for samOptimiser.py (which is given trainSAMModel.py and its arguments) and starts it, either inside a new terminal (`self.windowed`) or through the shell. The process handle is stored in `self.trainingListHandles` under the model name.

    Args:
        mod : Model description. `mod[0]` is the model name, `mod[1]` is passed through as the third training argument and `mod[4]` is a dictionary of model file names.
        modName : Key of `mod[4]` to start from. When it is empty or not a key, `self.modelPriority` is walked and the last non-empty entry is used.

    Returns :
        Always `True`.
    """
    trainPath = 'trainSAMModel.py'
    modelName = mod[0]

    if self.verbose:
        logging.info("Training Models:")
        logging.info('')
        logging.info('Optimising ' + modelName + ' ...')
        logging.info('')

    dPath = join(self.dataPath, modelName)

    modToUse = ''
    requestedKnown = modName != '' and modName in mod[4].keys()
    if requestedKnown:
        if mod[4][modName] != '':
            modToUse = mod[4][modName]
    else:
        for candidate in self.modelPriority:
            if mod[4][candidate] != '':
                modToUse = mod[4][candidate]

    mPath = join(self.modelPath, modToUse)
    if modToUse != '':
        mPath += '.pickle'

    if self.verbose:
        logging.info(mPath)

    # A separate process is used so that optimisation could run elsewhere in future.
    args = ' '.join([dPath, mPath, mod[1], 'new', modelName, 'False', 'False', 'True'])

    if self.verbose:
        logging.info('args: ' + str(args))

    cmd = 'samOptimiser.py ' + trainPath + ' ' + args
    # With persistence the terminal drops into a shell once the script has finished.
    tail = "; exec bash\"" if self.persistence else "\""
    command = "bash -c \"" + cmd + tail

    if self.verbose:
        logging.info('cmd: ' + str(cmd))

    if self.windowed:
        handle = subprocess.Popen([self.terminal, '-e', command], shell=False)
    else:
        handle = subprocess.Popen([cmd], shell=True)

    self.trainingListHandles[modelName] = handle
    return True
1556 
def getPeriod(self):
    """
    Module refresh rate.

    Returns : The period of the module in seconds.
    """
    refreshPeriod = 0.1
    return refreshPeriod
1564 
def onlineModelCheck(self):
    """ Check status of training processes and loaded models.

    Description:
        First polls every training/optimisation process and forgets those that have terminated. Then sends a heartbeat to every loaded model. A model that fails the check `self.nonResponsiveThreshold` times in a row is restarted. On every tenth healthy check the ports of the loaded models are queried (`portNames`, then `yarp ping`) and their current connections are recorded in `self.modelConnections`.

    Args:
        None
    Returns :
        None
    """
    # --- training / optimisation processes ---
    finished = []
    for name, handle in list(self.trainingListHandles.items()):
        if name == 'Cluster':
            continue
        ret = handle.poll()
        if ret is None:
            # still running
            continue
        finished.append(name)
        if ret == 0:
            logging.info(str(name) + ' terminated successfully')
            b = yarp.Bottle()
            self.checkAvailabilities(b)
        else:
            # Fall back to the raw code: not every exit status is a signal number,
            # and a plain dictionary lookup raised KeyError for those.
            reason = self.SIGNALS_TO_NAMES_DICT.get(abs(ret), 'exit code ' + str(ret))
            logging.error(str(name) + ' terminated with ' + str(reason))

    for name in finished:
        del self.trainingListHandles[name]

    # --- loaded models ---
    # Iterate over a snapshot: loadModel/closeModel below add and remove entries of
    # self.rpcConnections, which made positional indexing hit the wrong entry or
    # raise IndexError.
    for conn in list(self.rpcConnections):
        if not any(conn is live for live in self.rpcConnections):
            continue
        currModelName = conn[0]
        correctOp_check1, correctOp_check2 = self.checkOperation(conn)
        correctOperation = correctOp_check1 and correctOp_check2

        if not correctOperation and conn[-1] != 'loading':
            self.connectionCheckCount = 0
            if currModelName not in self.nonResponsiveDict.keys():
                self.nonResponsiveDict[currModelName] = 1
            else:
                self.nonResponsiveDict[currModelName] += 1
            logging.warning(str(currModelName) + ' not responding ' +
                            str(self.nonResponsiveDict[currModelName]) +
                            '/' + str(self.nonResponsiveThreshold))

            if self.nonResponsiveDict[currModelName] >= self.nonResponsiveThreshold:
                logging.error('Restarting ' + str(currModelName) + ' model')
                # NOTE(review): currModelName was stored in nonResponsiveDict just above,
                # so this condition looks always true and the close branch unreachable.
                if not correctOp_check1 or currModelName in self.nonResponsiveDict.keys():
                    rep = yarp.Bottle()
                    cmd = yarp.Bottle()
                    cmd.addString("load")
                    cmd.addString(currModelName)
                    self.loadModel(rep, cmd)
                else:
                    rep = yarp.Bottle()
                    cmd = yarp.Bottle()
                    cmd.addString("close")
                    cmd.addString(currModelName)
                    self.closeModel(rep, cmd)
                logging.info(currModelName)
                logging.info('')
                # Reset by name; the entry may no longer exist in self.rpcConnections.
                self.nonResponsiveDict[currModelName] = 0
            continue

        self.nonResponsiveDict[currModelName] = 0
        self.connectionCheckCount += 1
        if self.connectionCheckCount != 10:
            continue

        # Every tenth healthy check: refresh the record of port connections.
        for entry in self.rpcConnections:
            logging.info('ALL KEYS ' + str(self.modelConnections.keys()))
            if entry[0] not in self.modelConnections.keys():
                continue
            logging.info(entry[0])
            monitored = self.modelConnections[entry[0]]

            if len(monitored.keys()) == 0:
                # No ports known yet: ask the model which ports it owns.
                logging.info('pinging portNames to ' + str(entry[0]))
                rep = yarp.Bottle()
                cmd = yarp.Bottle()
                cmd.addString("portNames")
                entry[1].write(cmd, rep)
                entry[-1] = 'ready'
                logging.info('ping received ' + str(rep.toString()))

                if rep.size() > 1 and rep.get(0).asString() == 'ack':
                    for p in range(rep.size()):
                        portName = rep.get(p).asString()
                        if portName != 'ack' and portName not in monitored.keys():
                            monitored[portName] = []
                            logging.info('Monitoring ' + str(portName))
            else:
                for k in list(monitored.keys()):
                    logging.info('ping ' + str(k))
                    proc = subprocess.Popen(['yarp', 'ping', k], stdout=subprocess.PIPE)
                    output = proc.communicate()[0]

                    # Parse the `yarp ping` report, skipping its first line.
                    connList = []
                    for g in output.split('\n')[1:]:
                        if 'output conn' in g:
                            dirConnect = 'out'
                        elif 'input conn' in g:
                            dirConnect = 'in'
                        else:
                            dirConnect = None

                        if 'from' in g and dirConnect is not None:
                            parts = g.split(' ')
                            if dirConnect == 'out':
                                connList.append([parts[8], dirConnect])
                            elif dirConnect == 'in' and '<ping>' not in g:
                                connList.append([parts[6], dirConnect])

                    monitored[k] = connList

        logging.info(self.modelConnections)
        logging.info('')
        self.connectionCheckCount = 0
1682 
def checkOperation(self, j):
    """
    Check heartbeat of model.

    Args:
        j : Connection entry of a loaded model; `j[1]` is the rpc port used to reach it.

    Returns:
        correctOp_check1 : `True` when the output count of the model rpc port is > 0.
        correctOp_check2 : `True` when the model answers `ack` to a `heartbeat` request.

    """
    rpcPort = j[1]
    hasOutput = rpcPort.getOutputCount() > 0

    request = yarp.Bottle()
    request.addString("heartbeat")
    response = yarp.Bottle()
    rpcPort.write(request, response)
    acknowledged = response.get(0).asString() == 'ack'

    return hasOutput, acknowledged
1703 
def updateModule(self):
    """
    Logic to execute every getPeriod() seconds.

    Description:
        Every time the call counter `self.iter` reaches 10 the loaded models are checked and, if OPC querying is enabled, the OPC is queried. The counter is then reset. Each call ends with a 50 ms sleep.

    Returns: Always `True`.
    """
    checkDue = self.iter == 10
    if checkDue:
        self.onlineModelCheck()
        if self.useOPC:
            opcReply = yarp.Bottle()
            self.askOPC(opcReply)
        self.iter = 0

    self.iter += 1
    time.sleep(0.05)
    return True
1722 
1723 
def exception_hook(exc_type, exc_value, exc_traceback):
    """Callback function to record any errors that occur in the log files.

    Documentation:
        Replacement for the standard python exception hook that sends the error to the logging module. Can only work if the script is called from python and not ipython because ipython overrides this substitution.
    Args:
        exc_type: Exception Type.
        exc_value: Exception Value.
        exc_traceback: Exception Traceback.

    Returns:
        None
    """
    details = (exc_type, exc_value, exc_traceback)
    logging.error("Uncaught exception", exc_info=details)


# Route uncaught exceptions through the logging module.
sys.excepthook = exception_hook
1740 
if __name__ == '__main__':
    # Switch matplotlib to interactive mode and bring up the YARP network
    # before the module is created.
    plt.ion()
    yarp.Network.init()
    samMod = SamSupervisorModule()

    # Resource finder: `default.ini` from the `samSupervisor` context,
    # overridable from the command line.
    yrf = yarp.ResourceFinder()
    yrf.setVerbose(True)
    yrf.setDefaultContext("samSupervisor")
    yrf.setDefaultConfigFile("default.ini")
    yrf.configure(sys.argv)

    samMod.runModule(yrf)
def train(self, reply, command)
Logic for training a model.
def reportModel(self, reply, command)
Returns the performance of the trained model.
def listModels(self, reply, command)
Returns lists of models for train or interaction categories.
def interruptModule(self)
Module interrupt logic.
def train_model(self, mod)
Train a model.
def configModel(self, reply, command)
Displays the model configuration file using a system call to gedit.
def forwardCommand(self, command, reply)
Helper function to forward a call sign to the respective model with an enforced timeout of 10 seconds...
def configure(self, rf)
Configure interactionSAMModel Yarp module.
def checkAvailabilities(self, reply)
Check model availabilities.
def optimise(self, reply, command)
Logic for optimising a model.
Model management and supervisor.
def askOPC(self, reply)
Query the OPC for additional contextual data.
def dataDirModel(self, reply, command)
Returns the chosen model.
def checkModel(self, reply, command, allCheck=False)
Check availability and status of a model.
def checkOperation(self, j)
Check heartbeat of model.
def deleteModel(self, reply, command)
Deletes model from hard disk.
def exception_hook(exc_type, exc_value, exc_traceback)
Callback function to record any errors that occur in the log files.
def updateModule(self)
Logic to execute every getPeriod() seconds.
def optimise_model(self, mod, modName)
Optimise a model.
def onlineModelCheck(self)
Check status of loaded models.
def closeModel(self, reply, command, external=False)
Close a loaded model.
def respond(self, command, reply)
Respond to external requests.
def loadModel(self, reply, command)
Load a trained model.
def getPeriod(self)
Module refresh rate.
def attentionModulation(self, reply, command)
Modulate attention of all models.