266 #include <yarp/os/all.h>
269 using namespace yarp::os;
271 #define CMD_ADD createVocab32('a','d','d')
272 #define CMD_DEL createVocab32('d','e','l')
273 #define CMD_GET createVocab32('g','e','t')
274 #define CMD_SET createVocab32('s','e','t')
275 #define CMD_LOCK createVocab32('l','o','c','k')
276 #define CMD_UNLOCK createVocab32('u','n','l','o')
277 #define CMD_OWNER createVocab32('o','w','n','e')
278 #define CMD_TIME createVocab32('t','i','m','e')
279 #define CMD_DUMP createVocab32('d','u','m','p')
280 #define CMD_ASK createVocab32('a','s','k')
281 #define CMD_SYNC createVocab32('s','y','n','c')
282 #define CMD_ASYNC createVocab32('a','s','y','n')
283 #define CMD_QUIT createVocab32('q','u','i','t')
284 #define CMD_BYE createVocab32('b','y','e')
286 #define REP_ACK createVocab32('a','c','k')
287 #define REP_NACK createVocab32('n','a','c','k')
288 #define REP_UNKNOWN createVocab32('u','n','k','n')
290 #define OPT_ALL createVocab32('a','l','l')
291 #define OPT_DISABLED (-1.0)
292 #define OPT_OWNERSHIP_ALL ("all")
294 #define PROP_ID ("id")
295 #define PROP_LIFETIMER ("lifeTimer")
296 #define PROP_SET ("propSet")
297 #define BCTAG_EMPTY ("empty")
298 #define BCTAG_SYNC ("sync")
299 #define BCTAG_ASYNC ("async")
315 if (a.isFloat64() && b.isFloat64())
316 return (a.asFloat64()>b.asFloat64());
317 else if (a.isInt32() && b.isInt32())
318 return (a.asInt32()>b.asInt32());
327 if (a.isFloat64() && b.isFloat64())
328 return (a.asFloat64()>=b.asFloat64());
329 else if (a.isInt32() && b.isInt32())
330 return (a.asInt32()>=b.asInt32());
339 if (a.isFloat64() && b.isFloat64())
340 return (a.asFloat64()<b.asFloat64());
341 else if (a.isInt32() && b.isInt32())
342 return (a.asInt32()<b.asInt32());
351 if (a.isFloat64() && b.isFloat64())
352 return (a.asFloat64()<=b.asFloat64());
353 else if (a.isInt32() && b.isInt32())
354 return (a.asInt32()<=b.asInt32());
363 if (a.isFloat64() && b.isFloat64())
364 return (a.asFloat64()==b.asFloat64());
365 else if (a.isInt32() && b.isInt32())
366 return (a.asInt32()==b.asInt32());
367 else if (a.isString() && b.isString())
369 string aStr=a.asString();
370 string bStr=b.asString();
381 if (a.isFloat64() && b.isFloat64())
382 return (a.asFloat64()!=b.asFloat64());
383 else if (a.isInt32() && b.isInt32())
384 return (a.asInt32()!=b.asInt32());
385 else if (a.isString() && b.isString())
387 string aStr=a.asString();
388 string bStr=b.asString();
418 bool (*compare)(Value&,Value&);
436 for (map<int,Item>::iterator it=itemsMap.begin(); it!=itemsMap.end(); it++)
437 delete it->second.prop;
445 delete it->second.prop;
453 for (map<int,Item>::iterator it=itemsMap.begin(); it!=itemsMap.end(); it++)
454 fprintf(stream,
"item_%d (%s %d) (%s)\n",
455 i++,
PROP_ID,it->first,it->second.prop->toString().c_str());
460 deque<string> &opList,
const unsigned int i=0)
463 if (item->check(condList[i].prop))
466 Value &val=item->find(condList[i].prop);
469 result=(*condList[i].compare)(val,condList[i].val);
474 if ((i+1)>=condList.size())
476 else if (opList[i]==
"||")
477 return (result||recursiveCheck(item,condList,opList,i+1));
481 for (j=i+1; j<condList.size(); j++)
485 if (item->check(condList[j].prop))
487 Value &val=item->find(condList[j].prop);
488 result=result&&(*condList[j].compare)(val,condList[j].val);
499 if (j>=condList.size())
502 return (result||recursiveCheck(item,condList,opList,j+1));
517 asyncBroadcast=
false;
540 yWarning(
"database already initialized ...");
544 nosavedb=rf.check(
"no-save-db");
545 if (!rf.check(
"no-load-db"))
550 yInfo(
"database ready ...");
552 if (rf.check(
"sync-bc"))
554 setPeriod(rf.check(
"sync-bc",Value(1.0)).asFloat64());
558 asyncBroadcast=rf.check(
"async-bc");
564 pBroadcastPort=&broadcastPort;
570 string dbFileName=rf->findFile(
"db");
571 if (dbFileName.empty())
573 yWarning(
"requested database to be loaded not found!");
577 yInfo(
"loading database from %s ...",dbFileName.c_str());
579 lock_guard<mutex> lck(mtx);
583 Property finProperty;
584 finProperty.fromConfigFile(dbFileName);
586 Bottle finBottle; finBottle.read(finProperty);
587 for (
int i=0; i<finBottle.size(); i++)
591 Bottle &b1=finBottle.findGroup(tag.str());
598 yWarning(
"error while loading %s!",tag.str().c_str());
602 Bottle *b2=b1.get(1).asList();
603 Bottle *b3=b1.get(2).asList();
604 if ((b2==NULL) || (b3==NULL))
606 yWarning(
"error while loading %s!",tag.str().c_str());
612 yWarning(
"error while loading %s!",tag.str().c_str());
616 int id=b2->get(1).asInt32();
617 itemsMap[id].prop=
new Property(b3->toString().c_str());
623 yInfo(
"database loaded");
632 lock_guard<mutex> lck(mtx);
633 string dbFileName=rf->getHomeContextPath();
635 dbFileName+=rf->find(
"db").asString();
636 yInfo(
"saving database in %s ...",dbFileName.c_str());
638 FILE *fout=fopen(dbFileName.c_str(),
"w");
642 yInfo(
"database stored");
648 lock_guard<mutex> lck(mtx);
649 yInfo(
"dumping database content ...");
651 if (itemsMap.size()==0)
660 if (pBroadcastPort!=NULL)
662 if (pBroadcastPort->getOutputCount()>0)
664 lock_guard<mutex> lck(mtx);
665 Bottle &bottle=pBroadcastPort->prepare();
668 bottle.addString(type);
669 if (itemsMap.empty())
671 else for (map<int,Item>::iterator it=itemsMap.begin(); it!=itemsMap.end(); it++)
673 Bottle &item=bottle.addList();
674 item.read(*it->second.prop);
676 Bottle &idList=item.addList();
678 idList.addInt32(it->first);
681 pBroadcastPort->writeStrict();
694 yWarning(
"%s field cannot be specified as a property!",
PROP_ID);
698 lock_guard<mutex> lck(mtx);
699 itemsMap[idCnt].prop=
new Property(content->toString().c_str());
700 itemsMap[idCnt].lastUpdate=Time::now();
711 if (content->size()==1)
713 if (content->get(0).isVocab32() || content->get(0).isString())
715 if (content->get(0).asVocab32()==
OPT_ALL)
717 lock_guard<mutex> lck(mtx);
719 yInfo(
"database cleared");
727 yWarning(
"%s field not present within the request!",
PROP_ID);
731 int id=content->find(
PROP_ID).asInt32();
733 lock_guard<mutex> lck(mtx);
734 map<int,Item>::iterator it=itemsMap.find(
id);
735 if (it!=itemsMap.end())
737 Bottle *propSet=content->find(
PROP_SET).asList();
740 for (
int i=0; i<propSet->size(); i++)
741 it->second.prop->unput(propSet->get(i).asString());
743 it->second.lastUpdate=Time::now();
755 bool get(Bottle *content, Bottle &response)
762 yWarning(
"%s field not present within the request!",
PROP_ID);
766 int id=content->find(
PROP_ID).asInt32();
768 lock_guard<mutex> lck(mtx);
769 map<int,Item>::iterator it=itemsMap.find(
id);
770 if (it!=itemsMap.end())
772 Property *pProp=it->second.prop;
775 Bottle *propSet=content->find(
PROP_SET).asList();
779 for (
int i=0; i<propSet->size(); i++)
781 string propName=propSet->get(i).asString();
782 if (pProp->check(propName))
783 prop.put(propName,pProp->find(propName));
789 response.read(*pProp);
798 bool set(Bottle *content,
const string &agent)
805 yWarning(
"%s field not present within the request!",
PROP_ID);
809 int id=content->find(
PROP_ID).asInt32();
811 lock_guard<mutex> lck(mtx);
812 map<int,Item>::iterator it=itemsMap.find(
id);
813 if (it!=itemsMap.end())
815 string owner=it->second.owner;
818 Property *pProp=it->second.prop;
819 for (
int i=0; i<content->size(); i++)
821 if (Bottle *option=content->get(i).asList())
823 if (option->size()<2)
826 string prop=option->get(0).asString();
827 Value val=option->get(1);
833 pProp->put(prop,val);
839 it->second.lastUpdate=Time::now();
848 bool lock(Bottle *content,
const string &agent)
855 yWarning(
"%s field not present within the request!",
PROP_ID);
859 int id=content->find(
PROP_ID).asInt32();
861 lock_guard<mutex> lck(mtx);
862 map<int,Item>::iterator it=itemsMap.find(
id);
863 if (it!=itemsMap.end())
865 string owner=it->second.owner;
868 it->second.owner=agent;
877 bool unlock(Bottle *content,
const string &agent)
884 yWarning(
"%s field not present within the request!",
PROP_ID);
888 int id=content->find(
PROP_ID).asInt32();
890 lock_guard<mutex> lck(mtx);
891 map<int,Item>::iterator it=itemsMap.find(
id);
892 if (it!=itemsMap.end())
894 string owner=it->second.owner;
906 bool owner(Bottle *content, Bottle &response)
913 yWarning(
"%s field not present within the request!",
PROP_ID);
917 int id=content->find(
PROP_ID).asInt32();
919 lock_guard<mutex> lck(mtx);
920 map<int,Item>::iterator it=itemsMap.find(
id);
921 if (it!=itemsMap.end())
924 string &owner=it->second.owner;
925 response.addString(owner);
933 bool time(Bottle *content, Bottle &response)
940 yWarning(
"%s field not present within the request!",
PROP_ID);
944 int id=content->find(
PROP_ID).asInt32();
946 lock_guard<mutex> lck(mtx);
947 map<int,Item>::iterator it=itemsMap.find(
id);
948 if (it!=itemsMap.end())
951 if (it->second.lastUpdate<0.0)
952 response.addFloat64(it->second.lastUpdate);
955 double dt=Time::now()-it->second.lastUpdate;
956 response.addFloat64(dt);
965 bool ask(Bottle *content, Bottle &response)
970 lock_guard<mutex> lck(mtx);
971 if (content->size()==1)
973 if (content->get(0).isVocab32() || content->get(0).isString())
975 if (content->get(0).asVocab32()==
OPT_ALL)
979 for (map<int,Item>::iterator it=itemsMap.begin(); it!=itemsMap.end(); it++)
980 response.addInt32(it->first);
987 deque<Condition> condList;
988 deque<string> opList;
992 if (!(content->size()&0x01))
994 yWarning(
"uncorrect conditions received!");
999 for (
int i=0; i<content->size(); i+=2)
1001 if (Bottle *b=content->get(i).asList())
1008 condition.
prop=b->get(0).asString();
1011 else if (b->size()>2)
1013 condition.
prop=b->get(0).asString();
1014 operation=b->get(1).asString();
1015 condition.
val=b->get(2);
1019 else if (operation==
">=")
1021 else if (operation==
"<")
1023 else if (operation==
"<=")
1025 else if (operation==
"==")
1027 else if (operation==
"!=")
1031 yWarning(
"unknown relational operator '%s'!",operation.c_str());
1037 yWarning(
"wrong condition given!");
1041 condList.push_back(condition);
1043 if ((i+1)<content->size())
1045 operation=content->get(i+1).asString();
1046 if ((operation!=
"||") && (operation!=
"&&"))
1048 yWarning(
"unknown boolean operator '%s'!",operation.c_str());
1052 opList.push_back(operation);
1057 yWarning(
"wrong condition given!");
1065 for (map<int,Item>::iterator it=itemsMap.begin(); it!=itemsMap.end(); it++)
1069 if (recursiveCheck(it->second.prop,condList,opList))
1070 response.addInt32(it->first);
1081 for (map<int,Item>::iterator it=itemsMap.begin(); it!=itemsMap.end(); it++)
1083 Property *pProp=it->second.prop;
1102 if (asyncBroadcast && erased)
1113 void respond(ConnectionReader &connection,
const Bottle &command, Bottle &reply)
1115 string agent=connection.getRemoteContact().getName();
1116 if (command.size()<1)
1123 int cmd=command.get(0).asVocab32();
1129 if (command.size()<2)
1135 Bottle *content=command.get(1).asList();
1139 Bottle &b=reply.addList();
1156 if (command.size()<2)
1162 Bottle *content=command.get(1).asList();
1163 if (remove(content))
1178 if (command.size()<2)
1185 Bottle *content=command.get(1).asList();
1186 if (get(content,response))
1189 reply.addList()=response;
1200 if (command.size()<2)
1206 Bottle *content=command.get(1).asList();
1207 if (set(content,agent))
1222 if (command.size()<2)
1228 Bottle *content=command.get(1).asList();
1229 if (lock(content,agent))
1240 if (command.size()<2)
1246 Bottle *content=command.get(1).asList();
1247 if (unlock(content,agent))
1258 if (command.size()<2)
1265 Bottle *content=command.get(1).asList();
1266 if (owner(content,response))
1269 reply.addList()=response;
1280 if (command.size()<2)
1287 Bottle *content=command.get(1).asList();
1288 if (
time(content,response))
1291 reply.addList()=response;
1310 if (command.size()<2)
1316 int opt=command.get(1).asVocab32();
1317 if (opt==Vocab32::encode(
"start"))
1319 if (command.size()>=3)
1320 setPeriod(command.get(2).asFloat64());
1324 else if (isSuspended())
1329 else if (opt==Vocab32::encode(
"stop"))
1331 if (isRunning() && !isSuspended())
1345 if (command.size()<2)
1351 int opt=command.get(1).asVocab32();
1352 if (opt==Vocab32::encode(
"on"))
1354 asyncBroadcast=
true;
1357 else if (opt==Vocab32::encode(
"off"))
1359 asyncBroadcast=
false;
1371 if (command.size()<2)
1378 Bottle *content=command.get(1).asList();
1379 if (ask(content,response))
1382 Bottle &b=reply.addList();
1384 b.addList()=response;
1404 yWarning(
"received unknown command!");
1413 if (content.size()==0)
1416 string type=content.get(0).asString();
1426 for (
int i=1; i<content.size(); i++)
1428 if (Bottle *item=content.get(i).asList())
1430 if (Bottle *idList=item->get(0).asList())
1432 if (idList->size()==2)
1434 if (idList->get(0).asString()==
PROP_ID)
1436 int id=idList->get(1).asInt32();
1437 itemsMap[id].prop=
new Property(item->tail().toString().c_str());
1467 bool read(ConnectionReader &connection)
1470 if (!command.read(connection) || (pDataBase==NULL))
1474 double t0=Time::now();
1475 pDataBase->
respond(connection,command,reply);
1476 cumTime+=Time::now()-t0;
1479 if (ConnectionWriter *writer=connection.getWriter())
1480 reply.write(*writer);
1497 pDataBase=&dataBase;
1501 void getStats(
unsigned int &nCalls,
double &cumTime)
const
1503 nCalls=this->nCalls;
1504 cumTime=this->cumTime;
1518 if (pDataBase!=NULL)
1519 pDataBase->
modify(content);
1529 pDataBase=&dataBase;
1542 BufferedPort<Bottle> bcPort;
1546 unsigned int nCallsOld;
1554 stats=rf.check(
"stats");
1556 string name=rf.check(
"name",Value(
"objectsPropertiesCollector")).asString();
1559 rpcPort.setReader(rpcProcessor);
1561 modifyPort.useCallback();
1562 rpcPort.open(
"/"+name+
"/rpc");
1563 bcPort.open(
"/"+name+
"/broadcast:o");
1564 modifyPort.open(
"/"+name+
"/modify:i");
1576 modifyPort.disableCallback();
1579 rpcPort.interrupt();
1581 modifyPort.interrupt();
1596 if ((++cnt)*getPeriod()>(15.0*60.0))
1604 unsigned int nCalls;
double cumTime;
1605 rpcProcessor.
getStats(nCalls,cumTime);
1607 unsigned int calls=nCalls-nCallsOld;
1608 double timeSpent=cumTime-cumTimeOld;
1609 yInfo(
"*** Statistics: received %d/%g [requests/s]; %g [ms/request] spent on average",
1610 calls,getPeriod(),timeSpent==0.0?0.0:(1e3*timeSpent)/(
double)calls);
1633 rf.setDefaultContext(
"objectsPropertiesCollector");
1634 rf.setDefault(
"db",
"dataBase.ini");
1635 rf.configure(
argc,argv);
1637 if (rf.check(
"help"))
1639 printf(
"Options\n");
1640 printf(
"\t--name <name>: collector name (default: objectsPropertiesCollector)\n");
1641 printf(
"\t--db <fileName>: database file name to load at startup/save at shutdown (default: dataBase.ini)\n");
1642 printf(
"\t--context <context>: context to search for database file (default: objectsPropertiesCollector)\n");
1643 printf(
"\t--no-load-db : start an empty database\n");
1644 printf(
"\t--no-save-db : prevent from saving the content of database at shutdown\n");
1645 printf(
"\t--sync-bc <T>: broadcast the database content each T seconds\n");
1646 printf(
"\t--async-bc : broadcast the database content whenever a change occurs\n");
1647 printf(
"\t--stats : enable statistics printouts\n");
1652 if (!
yarp.checkNetwork())
1654 yError(
"YARP server not available!");
1659 return collector.runModule(rf);
void setDataBase(DataBase &dataBase)
void onRead(Bottle &content)
BufferedPort< Bottle > * pBroadcastPort
map< int, Item > itemsMap
bool owner(Bottle *content, Bottle &response)
void broadcast(const string &type)
bool add(Bottle *content)
bool modify(const Bottle &content)
bool remove(Bottle *content)
void respond(ConnectionReader &connection, const Bottle &command, Bottle &reply)
bool time(Bottle *content, Bottle &response)
bool get(Bottle *content, Bottle &response)
bool recursiveCheck(Property *item, deque< Condition > &condList, deque< string > &opList, const unsigned int i=0)
bool unlock(Bottle *content, const string &agent)
void setBroadcastPort(BufferedPort< Bottle > &broadcastPort)
bool set(Bottle *content, const string &agent)
void periodicHandler(const double dt)
void configure(ResourceFinder &rf)
void eraseItem(map< int, Item >::iterator &it)
bool ask(Bottle *content, Bottle &response)
bool lock(Bottle *content, const string &agent)
void getStats(unsigned int &nCalls, double &cumTime) const
bool read(ConnectionReader &connection)
void setDataBase(DataBase &dataBase)
bool configure(ResourceFinder &rf)
int main(int argc, char *argv[])
#define OPT_OWNERSHIP_ALL
bool equal(Value &a, Value &b)
bool lower(Value &a, Value &b)
bool greater(Value &a, Value &b)
bool alwaysTrue(Value &a, Value &b)
bool greaterEqual(Value &a, Value &b)
bool notEqual(Value &a, Value &b)
bool lowerEqual(Value &a, Value &b)
Q15 add(const Q15 a, const Q15 b)
bool write(const std::string filename, const FullRegulation ®)
Copyright (C) 2008 RobotCub Consortium.
bool(* compare)(Value &, Value &)