19 #ifndef __VSURFACEHANDLER__ 20 #define __VSURFACEHANDLER__ 22 #include <yarp/os/all.h> 23 #include "event-driven/vBottle.h" 24 #include "event-driven/vCodec.h" 25 #include "event-driven/vWindow_basic.h" 26 #include "event-driven/vWindow_adv.h" 27 #include "event-driven/vFilters.h" 28 #include "event-driven/vPort.h" 40 std::deque<ev::vQueue *> qq;
41 std::deque<yarp::os::Stamp> sq;
43 yarp::os::Semaphore dataready;
46 unsigned int delay_nv;
47 long unsigned int delay_t;
70 for(std::deque<ev::vQueue *>::iterator i = qq.begin(); i != qq.end(); i++)
83 if(qlimit && qq.size() >= qlimit) {
87 qq.push_back(
new vQueue);
88 yarp::os::Stamp yarpstamp;
89 getEnvelope(yarpstamp);
90 sq.push_back(yarpstamp);
100 delay_nv += qq.back()->size();
101 int dt = qq.back()->back()->stamp - qq.back()->front()->stamp;
105 event_rate = qq.back()->size() / (double)dt;
113 ev::vQueue*
read(yarp::os::Stamp &yarpstamp)
115 static vQueue * working_queue =
nullptr;
119 delay_nv -= qq.front()->size();
120 int dt = qq.front()->back()->stamp - qq.front()->front()->stamp;
131 yarpstamp = sq.front();
132 working_queue = qq.front();
133 return working_queue;
146 delay_nv -= qq.front()->size();
147 int dt = qq.front()->back()->stamp - qq.front()->front()->stamp;
161 qlimit = number_of_qs;
195 std::string delayStatString()
197 std::ostringstream oss;
216 yarp::os::Stamp yarpstamp;
230 void configure(
int height,
int width)
236 bool open(std::string portname)
238 if(!allocatorCallback.open(portname))
246 allocatorCallback.close();
255 while(!q && !isStopping()) {
256 q = allocatorCallback.
read(yarpstamp);
258 if(isStopping())
break;
260 for(ev::vQueue::iterator qi = q->begin(); qi != q->end(); qi++) {
266 ctime = (*qi)->stamp;
268 if((*qi)->getChannel() == 0)
269 surfaceLeft.fastAddEvent(*qi);
270 else if((*qi)->getChannel() == 1)
271 surfaceRight.fastAddEvent(*qi);
273 std::cout <<
"Unknown channel" << std::endl;
285 yarp::os::Stamp queryROI(ev::vQueue &fillq,
int c,
unsigned int t,
int x,
int y,
int r)
292 fillq = surfaceLeft.getSurf_Tlim(t, x, y, r);
294 fillq = surfaceRight.getSurf_Tlim(t, x, y, r);
300 yarp::os::Stamp queryWindow(ev::vQueue &fillq,
int c,
unsigned int t)
305 fillq = surfaceLeft.getSurf_Tlim(t);
307 fillq = surfaceRight.getSurf_Tlim(t);
314 unsigned int queryVTime()
334 yarp::os::Stamp ystamp;
349 cpudelayL = cpudelayR = 0;
350 cputimeL = cputimeR = yarp::os::Time::now();
354 void configure(
int height,
int width,
double maxcpudelay)
357 surfaceleft.initialise(height, width);
358 surfaceright.initialise(height, width);
361 bool open(std::string portname)
363 if(!allocatorCallback.open(portname))
372 allocatorCallback.close();
379 static int maxqs = 4;
380 bool allowproc =
true;
385 while(!q && !isStopping()) {
386 q = allocatorCallback.
read(ystamp);
388 if(isStopping())
break;
401 int dt = q->back()->stamp - vstamp;
405 vstamp = q->back()->stamp;
407 for(ev::vQueue::iterator qi = q->begin(); qi != q->end(); qi++) {
409 if((*qi)->getChannel() == 0)
410 surfaceleft.addEvent(*qi);
411 else if((*qi)->getChannel() == 1)
412 surfaceright.addEvent(*qi);
425 vQueue queryROI(
int channel,
int numEvts,
int r)
431 double cpunow = yarp::os::Time::now();
438 if(cpudelayL < 0) cpudelayL = 0;
439 if(cpudelayL > maxcpudelay) {
440 yWarning() <<
"CPU delay hit maximum";
441 cpudelayL = maxcpudelay;
444 surfaceleft.getSurfaceN(q, cpudelayL, numEvts, r);
451 if(cpudelayR < 0) cpudelayR = 0;
452 if(cpudelayR > maxcpudelay) {
453 yWarning() <<
"CPU delay hit maximum";
454 cpudelayR = maxcpudelay;
457 surfaceright.getSurfaceN(q, cpudelayR, numEvts, r);
465 vQueue queryROI(
int channel,
unsigned int querySize,
int x,
int y,
int r)
473 double cpunow = yarp::os::Time::now();
480 if(cpudelayL < 0) cpudelayL = 0;
481 if(cpudelayL > maxcpudelay) {
482 yWarning() <<
"CPU delay hit maximum";
483 cpudelayL = maxcpudelay;
486 q = surfaceleft.getSurface(cpudelayL, querySize, r, x, y);
492 if(cpudelayR < 0) cpudelayR = 0;
493 if(cpudelayR > maxcpudelay) {
494 yWarning() <<
"CPU delay hit maximum";
495 cpudelayR = maxcpudelay;
498 q = surfaceright.getSurface(cpudelayR, querySize, r, x, y);
506 vQueue queryWindow(
int channel,
unsigned int querySize)
512 double cpunow = yarp::os::Time::now();
519 if(cpudelayL < 0) cpudelayL = 0;
520 if(cpudelayL > maxcpudelay) {
521 yWarning() <<
"CPU delay hit maximum";
522 cpudelayL = maxcpudelay;
525 q = surfaceleft.getSurface(cpudelayL, querySize);
532 if(cpudelayR < 0) cpudelayR = 0;
533 if(cpudelayR > maxcpudelay) {
534 yWarning() <<
"CPU delay hit maximum";
535 cpudelayR = maxcpudelay;
538 q = surfaceright.getSurface(cpudelayR, querySize);
546 double queryDelay(
int channel = 0)
555 yarp::os::Stamp queryYstamp()
560 int queryVstamp(
int channel = 0)
565 modvstamp = vstamp - cpudelayR;
567 modvstamp = vstamp - cpudelayL;
594 yarp::os::Mutex safety;
596 int strictUpdatePeriod;
598 yarp::os::Mutex waitforquery;
599 yarp::os::Stamp yarpstamp;
608 strictUpdatePeriod = 0;
613 bool open(std::string portname,
int period = 0)
615 strictUpdatePeriod = period;
616 if(strictUpdatePeriod) yInfo() <<
"Forced update every" << period *
vtsHelper::tsscaler <<
"s, or"<< period <<
"event timestamps";
617 if(!allocatorCallback.open(portname))
625 allocatorCallback.close();
627 waitforquery.unlock();
632 if(strictUpdatePeriod) {
637 while(!isStopping()) {
640 const ev::vQueue *q = allocatorCallback.
read(yarpstamp);
643 if(!strictUpdatePeriod) safety.lock();
645 if(!ctime) ctime = q->front()->stamp;
647 for(ev::vQueue::const_iterator qi = q->begin(); qi != q->end(); qi++) {
648 if((*qi)->getChannel() == 0)
649 windowleft.addEvent(*qi);
650 else if((*qi)->getChannel() == 1)
651 windowright.addEvent(*qi);
654 if(strictUpdatePeriod) {
655 int dt = q->back()->stamp - ctime;
658 if(currentPeriod > strictUpdatePeriod) {
667 ctime = q->back()->stamp;
671 if(!strictUpdatePeriod) safety.unlock();
674 if(strictUpdatePeriod)
678 vQueue queryWindow(
int channel)
685 q = windowleft.getWindow();
687 q = windowright.getWindow();
689 waitforquery.unlock();
694 void queryStamps(yarp::os::Stamp &yStamp,
int &vStamp)
705 unsigned int queryUnprocd()
710 std::string readDelayStats()
712 return allocatorCallback.delayStatString();
723 std::map<std::string, ev::tWinThread> iPorts;
725 yarp::os::Stamp yStamp;
727 int strictUpdatePeriod;
728 bool using_yarp_stamps;
735 strictUpdatePeriod = 0;
737 using_yarp_stamps =
false;
740 bool open(std::string moduleName, std::string eventType)
743 if(iPorts.count(eventType))
747 if(!iPorts[eventType].open(moduleName +
"/" + eventType +
":i", strictUpdatePeriod))
753 vQueue queryWindow(std::string vType,
int channel)
757 return iPorts[vType].queryWindow(channel);
763 yarp::os::Stamp ys;
int vs;
764 std::map<std::string, ev::tWinThread>::iterator i;
765 for(i = iPorts.begin(); i != iPorts.end(); i++) {
766 i->second.queryStamps(ys, vs);
769 if(!using_yarp_stamps) vStamp = vs;
770 if(!ys.isValid())
continue;
773 using_yarp_stamps =
true;
774 double pt = yStamp.getTime();
775 double ct = ys.getTime();
777 if(ct > pt || ct < pt - 5.0) {
786 std::map<std::string, ev::tWinThread>::iterator i;
787 for(i = iPorts.begin(); i != iPorts.end(); i++)
791 yarp::os::Stamp getystamp()
801 void setStrictUpdatePeriod(
int period)
803 strictUpdatePeriod = period;
808 if(strictUpdatePeriod)
return true;
809 std::map<std::string, ev::tWinThread>::iterator i;
810 for(i = iPorts.begin(); i != iPorts.end(); i++)
811 if(i->second.queryUpdated())
return true;
815 unsigned int queryMaxUnproced()
817 unsigned int unprocd = 0;
818 std::map<std::string, ev::tWinThread>::iterator i;
819 for(i = iPorts.begin(); i != iPorts.end(); i++)
820 unprocd = std::max(i->second.queryUnprocd(), unprocd);
824 std::string delayStats()
826 std::ostringstream oss;
827 std::map<std::string, ev::tWinThread>::iterator i;
828 for(i = iPorts.begin(); i != iPorts.end(); i++)
829 oss << i->first <<
": " << i->second.readDelayStats() <<
" ";
yarp::os::Bottle wrapper for sending events through the yarp system while ensuring compatibility with ...
Definition: vBottle.h:33
static double tsscaler
a multiplier to convert an event timestamp to seconds
Definition: vtsHelper.h:41
void scrapQ()
remove the most recently read vQueue from the list and deallocate the memory
Definition: vSurfaceHandlerTh.h:142
automatically accept multiple event types from different ports (e.g. as in the vFramer) ...
Definition: vSurfaceHandlerTh.h:719
an event with a pixel location, camera number and polarity
Definition: vCodec.h:103
queueAllocator()
constructor
Definition: vSurfaceHandlerTh.h:53
asynchronously read events and push them in a vSurface
Definition: vSurfaceHandlerTh.h:206
void onRead(ev::vBottle &inputbottle)
the callback decodes the incoming vBottle and adds it to the list of received vBottles. The yarp and event timestamps are updated.
Definition: vSurfaceHandlerTh.h:78
void addtoendof(vQueue &q)
add a specific event-type from the vBottle to the end of a vQueue
Definition: vBottle.h:124
unsigned int queryunprocessed()
ask for the number of vQueues currently allocated.
Definition: vPort.h:558
~queueAllocator()
destructor
Definition: vSurfaceHandlerTh.h:67
asynchronously read events and push them in a historicalSurface
Definition: vSurfaceHandlerTh.h:322
static unsigned int max_stamp
the maximum value of the timestamp before a wrap occurs
Definition: vtsHelper.h:39
a surface that can be queried at any time in the past.
Definition: vWindow_adv.h:224
void setQLimit(unsigned int number_of_qs)
set the maximum number of qs that can be stored in the buffer. A value of 0 keeps all qs...
Definition: vSurfaceHandlerTh.h:159
ev::vQueue * read(yarp::os::Stamp &yarpstamp)
ask for a pointer to the next vQueue. Blocks if no data is ready.
Definition: vSurfaceHandlerTh.h:113
double queryDelayT()
ask for the total time spanned by all vQueues.
Definition: vSurfaceHandlerTh.h:184
static double vtsscaler
a multiplier to convert seconds to an event timestamp
Definition: vtsHelper.h:43
unsigned int queryDelayN()
ask for the number of events in all vQueues.
Definition: vSurfaceHandlerTh.h:178
double queryRate()
ask for the high precision event rate
Definition: vSurfaceHandlerTh.h:190
void releaseDataLock()
unblocks the blocking call in getNextQ. Useful to ensure a graceful shutdown. No guarantee the return...
Definition: vSurfaceHandlerTh.h:166
a spatio-temporal surface storing events for a limited time
Definition: vWindow_adv.h:123
an asynchronous reading port that accepts vBottles and decodes them
Definition: vSurfaceHandlerTh.h:36
store events for a fixed amount of time in a vQueue
Definition: vWindow_basic.h:88
const T * read(yarp::os::Stamp &yarpstamp, bool wait=true)
ask for a pointer to the next vQueue. If wait is true, blocks if no data is ready. ...
Definition: vPort.h:498
int queryunprocessed()
ask for the number of vQueues currently allocated.
Definition: vSurfaceHandlerTh.h:172
automatically accept events from a port and push them into a vTempWindow
Definition: vSurfaceHandlerTh.h:585