35 #include <yarp/os/Network.h>
36 #include <yarp/os/NetType.h>
37 #include <yarp/conf/environment.h>
40 #include <yarp/os/Log.h>
41 #include <yarp/os/LogStream.h>
67 rateofthread = raterx;
68 yDebug() <<
"EthReceiver is a PeriodicThread with rxrate =" << rateofthread <<
"ms";
80 #ifdef NETWORK_PERFORMANCE_BENCHMARK
84 double raterx_sec = (double)raterx/1000;
85 m_perEvtVerifier.init(raterx_sec, (raterx_sec/100), raterx_sec-0.001, raterx_sec+0.001, 0.0001, 1,
"Receiver");
104 recv_socket = pSocket;
105 ethManager = _ethManager;
107 ACE_HANDLE sockfd = pSocket->get_handle();
109 int32_t mysize = 1024*1024;
111 int len =
sizeof(mysize);
114 std::string _dgram_buffer_size = yarp::conf::environment::get_string(
"ETHRECEIVER_BUFFER_SIZE");
115 if (_dgram_buffer_size!=
"")
116 mysize = yarp::conf::numeric::from_string(_dgram_buffer_size, 0);
118 retval = ACE_OS::setsockopt (sockfd, SOL_SOCKET, SO_RCVBUF, (
char *)&mysize,
sizeof(mysize));
122 yError()<<
"ERROR in SetSockOpt SO_RCVBUF";
125 int32_t sock_input_buf_size = 0;
126 retval = ACE_OS::getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, (
char *)&sock_input_buf_size, &len);
130 yError() <<
"ERROR inGetSockOpt SO_RCVBUF";
133 yWarning() <<
"in EthReceiver::config() the config socket has queue size = "<< sock_input_buf_size<<
"; you request ETHRECEIVER_BUFFER_SIZE=" << _dgram_buffer_size;
138 recv_socket->enable(ACE_NONBLOCK);
147 yTrace() <<
"Do some initialization here if needed";
149 #if defined(__unix__)
155 struct sched_param thread_param;
156 thread_param.sched_priority = sched_get_priority_max(SCHED_FIFO)/2;
157 pthread_setschedparam(pthread_self(), SCHED_FIFO, &thread_param);
166 return(eo_ropframedata_age_Get((EOropframeData*)pck));
173 ssize_t incoming_msg_size = 0;
174 ACE_INET_Addr sender_addr;
178 #ifdef NETWORK_PERFORMANCE_BENCHMARK
179 m_perEvtVerifier.tick(yarp::os::Time::now());
185 flags |= MSG_DONTWAIT;
188 static uint8_t earlyexit_prev = 0;
189 static uint8_t earlyexit_prevprev = 0;
194 const double f1 = 0.5;
195 const double f2 = 0.5;
196 const double f3 = 8.0;
197 double gain = 1.0 + f1*(1-earlyexit_prev) + f2*(1-earlyexit_prevprev) + f3*(1-earlyexit_prev)*(1-earlyexit_prevprev);
198 int maxUDPpackets = (2 + ethManager->
getNumberOfResources()) * EthReceiver::rateofthread * gain;
201 earlyexit_prevprev = earlyexit_prev;
204 for(
int i=0; i<maxUDPpackets; i++)
206 incoming_msg_size = recv_socket->recv((
void *) incoming_msg_data, incoming_msg_capacity, sender_addr, flags);
207 if(incoming_msg_size <= 0)
215 ethManager->
Reception(ethManager->
toipv4addr(sender_addr), incoming_msg_data, incoming_msg_size);
bool config(ACE_SOCK_Dgram *pSocket, eth::TheEthManager *_ethManager)
int sendPacket(const void *udpframe, size_t len, const eOipv4addressing_t &toaddressing)
eOipv4addr_t toipv4addr(const ACE_INET_Addr &aceinetaddr)
bool Reception(eOipv4addr_t from, uint64_t *data, ssize_t size)
int getNumberOfResources(void)
const eOipv4addressing_t & getLocalIPV4addressing(void)
uint64_t getRopFrameAge(char *pck)