himrep
main.cpp
1 /*
2  * Copyright (C) 2016 iCub Facility - Istituto Italiano di Tecnologia
3  * Author: Giulia Pasquale
4  * email: giulia.pasquale@iit.it
5  * Permission is granted to copy, distribute, and/or modify this program
6  * under the terms of the GNU General Public License, version 2 or any
7  * later version published by the Free Software Foundation.
8  *
9  * A copy of the license can be found at
10  * http://www.robotcub.org/icub/license/gpl.txt
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
15  * Public License for more details
16  */
17 
18 // General includes
19 #include <cstdio>
20 #include <cstdlib> // getenv
21 #include <string>
22 #include <vector>
23 #include <iostream>
24 #include <fstream>
25 #include <numeric>
26 #include <utility>
27 #include <mutex>
28 
29 #include <yarp/os/Network.h>
30 #include <yarp/os/RFModule.h>
31 #include <yarp/os/Time.h>
32 #include <yarp/os/BufferedPort.h>
33 #include <yarp/os/RpcClient.h>
34 #include <yarp/os/PortReport.h>
35 #include <yarp/os/Stamp.h>
36 
37 #include <yarp/sig/Vector.h>
38 #include <yarp/sig/Image.h>
39 
40 #include <yarp/cv/Cv.h>
41 
42 #include <yarp/math/Math.h>
43 
44 #include "CaffeFeatExtractor.hpp"
45 
46 using namespace std;
47 using namespace yarp;
48 using namespace yarp::os;
49 using namespace yarp::sig;
50 using namespace yarp::cv;
51 using namespace yarp::math;
52 
53 #define CMD_HELP yarp::os::createVocab32('h','e','l','p')
54 #define DUMP_CODE yarp::os::createVocab32('d','u','m','p')
55 #define DUMP_STOP yarp::os::createVocab32('s','t','o','p')
56 
57 class CaffeCoderPort: public BufferedPort<Image>
58 {
59 private:
60 
61  // Resource Finder and module options
62 
63  ResourceFinder &rf;
64 
65  string contextPath;
66 
67  bool dump_code;
68 
69  double rate;
70  double last_read;
71 
72  // Data (common to all methods)
73 
74  ::cv::Mat matImg;
75 
76  Port port_out_img;
77  Port port_out_code;
78 
79  FILE *fout_code;
80 
81  mutex mtx;
82 
83  list<double> time_measurements_prep;
84  list<double> time_measurements_net;
85  int time_avg_window;
86 
87  // Data (specific for each method - instantiate only those are needed)
88 
89  CaffeFeatExtractor<float> *caffe_extractor;
90 
91  void onRead(Image &img)
92  {
93  // Read at specified rate
94  if (Time::now() - last_read < rate)
95  return;
96 
97  lock_guard<mutex> lg(mtx);
98 
99  // If something arrived...
100  if (img.width()>0 && img.height()>0)
101  {
102  // Convert the image
103  ImageOf<PixelRgb> tmp_img;
104  tmp_img.copy(img);
105  matImg = toCvMat(tmp_img);
106 
107  // Extract the feature vector
108 
109  std::vector<float> codingVecFloat;
110  float times[2];
111  if (!caffe_extractor->extract_singleFeat_1D(matImg, codingVecFloat, times))
112  {
113  std::cout << "CaffeFeatExtractor::extract_singleFeat_1D(): failed..." << std::endl;
114  return;
115  }
116  std::vector<double> codingVec(codingVecFloat.begin(), codingVecFloat.end());
117 
118  if (caffe_extractor->timing)
119  {
120  time_measurements_prep.push_back(times[0]);
121  time_measurements_net.push_back(times[1]);
122 
123  while (time_measurements_prep.size()>time_avg_window)
124  {
125  time_measurements_prep.pop_front();
126  time_measurements_net.pop_front();
127  }
128 
129  double prep_sum = std::accumulate(time_measurements_prep.begin(), time_measurements_prep.end(), 0.0);
130  double prep_mean = prep_sum / time_measurements_prep.size();
131  double prep_sq_sum = std::inner_product(time_measurements_prep.begin(), time_measurements_prep.end(), time_measurements_prep.begin(), 0.0);
132  double prep_stdev = std::sqrt(prep_sq_sum / time_measurements_prep.size() - prep_mean * prep_mean);
133 
134  double net_sum = std::accumulate(time_measurements_net.begin(), time_measurements_net.end(), 0.0);
135  double net_mean = net_sum / time_measurements_net.size();
136  double net_sq_sum = std::inner_product(time_measurements_net.begin(), time_measurements_net.end(), time_measurements_net.begin(), 0.0);
137  double net_stdev = std::sqrt(net_sq_sum / time_measurements_net.size() - net_mean * net_mean);
138 
139  std::cout << "PREP: " << prep_mean << " - " << prep_stdev << endl;
140  std::cout << "NET: " << net_mean << " - " << net_stdev << endl;
141 
142  std::cout << "time_avg_window = " << time_measurements_prep.size() << endl;
143  }
144 
145  // Dump if required
146  if (dump_code)
147  {
148  fwrite (&codingVec[0], sizeof(double), codingVec.size(), fout_code);
149  }
150 
151  Stamp stamp;
152  this->getEnvelope(stamp);
153 
154  if(port_out_code.getOutputCount())
155  {
156  port_out_code.setEnvelope(stamp);
157  Vector codingYarpVec(codingVec.size(), &codingVec[0]);
158  port_out_code.write(codingYarpVec);
159  }
160 
161  if(port_out_img.getOutputCount())
162  {
163  port_out_img.write(img);
164  }
165  }
166  }
167 
168 public:
169 
170  CaffeCoderPort(ResourceFinder &_rf) :BufferedPort<Image>(),rf(_rf)
171  {
172  // Resource Finder and module options
173  contextPath = rf.getHomeContextPath().c_str();
174 
175  // Data initialization (specific for Caffe method)
176 
177  // Binary file (.caffemodel) containing the network's weights
178  string caffemodel_file = rf.check("caffemodel_file", Value("bvlc_googlenet.caffemodel")).asString().c_str();
179  cout << "Setting .caffemodel file to " << caffemodel_file << endl;
180 
181  // Text file (.prototxt) defining the network structure
182  string prototxt_file = rf.check("prototxt_file", Value(contextPath + "/bvlc_googlenet_val_cutpool5.prototxt")).asString().c_str();
183  cout << "Setting .prototxt file to " << prototxt_file << endl;
184 
185  // Name of blobs to be extracted
186  string blob_name = rf.check("blob_name", Value("pool5/7x7_s1")).asString().c_str();
187  cout << "Setting blob_name to " << blob_name << endl;
188 
189  bool timing = rf.check("timing");
190  if (timing)
191  {
192  time_avg_window = rf.check("timing",Value("1000")).asInt32();
193  }
194  else
195  {
196  time_avg_window = -1;
197  }
198 
199  // Compute mode and eventually GPU ID to be used
200  int device_id;
201  string compute_mode;
202 
203  #ifdef HAS_CUDA
204  compute_mode = rf.check("compute_mode", Value("GPU")).asString();
205  device_id = rf.check("device_id", Value(0)).asInt32();
206  #else
207  compute_mode = "CPU";
208  device_id = -1;
209  #endif
210 
211  int resizeWidth = rf.check("resizeWidth", Value(256)).asFloat64();
212  int resizeHeight = rf.check("resizeHeight", Value(256)).asFloat64();
213 
214  caffe_extractor = NULL;
215  caffe_extractor = new CaffeFeatExtractor<float>(caffemodel_file,
216  prototxt_file, resizeWidth, resizeHeight,
217  blob_name,
218  compute_mode,
219  device_id,
220  timing);
221 
222  // Data (common to all methods)
223  string name = rf.find("name").asString().c_str();
224 
225  port_out_img.open(("/"+name+"/img:o").c_str());
226  port_out_code.open(("/"+name+"/code:o").c_str());
227 
228  BufferedPort<Image>::useCallback();
229 
230  rate = rf.check("rate",Value(0.0)).asFloat64();
231  last_read = 0.0;
232 
233  dump_code = rf.check("dump_code");
234  if(dump_code)
235  {
236  string code_path = rf.check("dump_code",Value("codes.bin")).asString().c_str();
237  code_path = contextPath + "/" + code_path;
238  string code_write_mode = rf.check("append")?"wb+":"wb";
239 
240  fout_code = fopen(code_path.c_str(),code_write_mode.c_str());
241  }
242  }
243 
244  void interrupt()
245  {
246  lock_guard<mutex> lg(mtx);
247 
248  port_out_code.interrupt();
249  port_out_img.interrupt();
250 
251  BufferedPort<Image>::interrupt();
252  }
253 
254  void resume()
255  {
256  lock_guard<mutex> lg(mtx);
257 
258  port_out_code.resume();
259  port_out_img.resume();
260 
261  BufferedPort<Image>::resume();
262  }
263 
264  void close()
265  {
266  lock_guard<mutex> lg(mtx);
267 
268  if (dump_code)
269  {
270  fclose(fout_code);
271  }
272 
273  port_out_code.close();
274  port_out_img.close();
275 
276  BufferedPort<Image>::close();
277  }
278 
279  bool execReq(const Bottle &command, Bottle &reply)
280  {
281  switch(command.get(0).asVocab32())
282  {
283  case(CMD_HELP):
284  {
285  reply.clear();
286  reply.add(Value::makeVocab32("many"));
287  reply.addString("[dump] [path-to-file] [a] to start dumping the codes in the context directory. Use 'a' for appending.");
288  reply.addString("[stop] to stop dumping.");
289  return true;
290  }
291 
292  case(DUMP_CODE):
293  {
294  lock_guard<mutex> lg(mtx);
295 
296  dump_code = true;
297  string code_path;
298  string code_write_mode;
299 
300  if (command.size()==1)
301  {
302  code_path = contextPath + "/codes.bin";
303  code_write_mode="wb";
304  }
305  else if (command.size()==2)
306  {
307  if (strcmp(command.get(1).asString().c_str(),"a")==0)
308  {
309  code_write_mode="wb+";
310  code_path = contextPath + "/codes.bin";
311  } else
312  {
313  code_write_mode="wb";
314  code_path = command.get(1).asString().c_str();
315  }
316  } else if (command.size()==3)
317  {
318  code_write_mode="wb+";
319  code_path = command.get(2).asString().c_str();
320  }
321 
322  fout_code = fopen(code_path.c_str(),code_write_mode.c_str());
323  reply.addString("Start dumping codes...");
324 
325  return true;
326  }
327 
328  case(DUMP_STOP):
329  {
330  lock_guard<mutex> lg(mtx);
331 
332  dump_code = false;
333  fclose(fout_code);
334  reply.addString("Stopped code dump.");
335 
336  return true;
337  }
338 
339  default:
340  return false;
341  }
342  }
343 };
344 
345 
346 class CaffeCoderModule: public RFModule
347 {
348 protected:
349  CaffeCoderPort *caffePort;
350  Port rpcPort;
351 
352 public:
353 
354  CaffeCoderModule()
355  {
356  caffePort=NULL;
357  }
358 
359  bool configure(ResourceFinder &rf)
360  {
361  string name = rf.find("name").asString().c_str();
362 
363  caffePort = new CaffeCoderPort(rf);
364 
365  caffePort->open(("/"+name+"/img:i").c_str());
366 
367  rpcPort.open(("/"+name+"/rpc").c_str());
368  attach(rpcPort);
369 
370  return true;
371  }
372 
373  bool interruptModule()
374  {
375  if (caffePort!=NULL)
376  caffePort->interrupt();
377 
378  rpcPort.interrupt();
379 
380  return true;
381  }
382 
383  bool close()
384  {
385  if(caffePort!=NULL)
386  {
387  caffePort->close();
388  delete caffePort;
389  }
390 
391  rpcPort.close();
392 
393  return true;
394  }
395 
396  bool respond(const Bottle &command, Bottle &reply)
397  {
398  if (caffePort->execReq(command,reply))
399  return true;
400  else
401  return RFModule::respond(command,reply);
402  }
403 
404  double getPeriod() { return 1.0; }
405 
406  bool updateModule()
407  {
408  //caffePort->update();
409 
410  return true;
411  }
412 };
413 
414 
415 int main(int argc, char *argv[])
416 {
417  Network yarp;
418  if (!yarp.checkNetwork())
419  return 1;
420 
421  ResourceFinder rf;
422  rf.setDefaultContext("himrep");
423  rf.setDefaultConfigFile("caffeCoder.ini");
424  rf.configure(argc,argv);
425  rf.setDefault("name","caffeCoder");
426 
427  CaffeCoderModule mod;
428  return mod.runModule(rf);
429 }