iCub-main
ethReceiver.cpp
Go to the documentation of this file.
1 // -*- Mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2 
3 
4 /*
5  * Copyright (C) 2017 iCub Facility - Istituto Italiano di Tecnologia
6  * Author: Alberto Cardellino, Marco Accame
7  * email: alberto.cardellino@iit.it, marco.accame@iit.it
8  * website: www.robotcub.org
9  * Permission is granted to copy, distribute, and/or modify this program
10  * under the terms of the GNU General Public License, version 2 or any
11  * later version published by the Free Software Foundation.
12  *
13  * A copy of the license can be found at
14  * http://www.robotcub.org/icub/license/gpl.txt
15  *
16  * This program is distributed in the hope that it will be useful, but
17  * WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
19  * Public License for more details
20 */
21 
22 
23 // --------------------------------------------------------------------------------------------------------------------
24 // - public interface
25 // --------------------------------------------------------------------------------------------------------------------
26 
27 #include "ethReceiver.h"
28 
29 
30 
31 // --------------------------------------------------------------------------------------------------------------------
32 // - external dependencies
33 // --------------------------------------------------------------------------------------------------------------------
34 
35 #include <yarp/os/Network.h>
36 #include <yarp/os/NetType.h>
37 #include <yarp/conf/environment.h>
38 
39 //#include <yarp/os/SystemClock.h>
40 #include <yarp/os/Log.h>
41 #include <yarp/os/LogStream.h>
42 using yarp::os::Log;
43 
44 #include "ethManager.h"
45 #include "ethResource.h"
46 
47 
48 // --------------------------------------------------------------------------------------------------------------------
49 // - pimpl: private implementation (see scott meyers: item 22 of effective modern c++, item 31 of effective c++
50 // --------------------------------------------------------------------------------------------------------------------
51 
52 
53 
54 // --------------------------------------------------------------------------------------------------------------------
55 // - the class
56 // --------------------------------------------------------------------------------------------------------------------
57 
58 
59 // - class eth::EthReceiver
60 
61 using namespace eth;
62 
63 
64 
65 EthReceiver::EthReceiver(int raterx): PeriodicThread((double)raterx/1000.0)
66 {
67  rateofthread = raterx;
68  yDebug() << "EthReceiver is a PeriodicThread with rxrate =" << rateofthread << "ms";
69  // ok, and now i get it from xml file ... if i find it.
70 
71 // std::string tmp = yarp::conf::environment::get_string("ETHSTAT_PRINT_INTERVAL");
72 // if (tmp != "")
73 // {
74 // statPrintInterval = (double)yarp::conf::numeric::from_string(tmp, 0.);
75 // }
76 // else
77 // {
78 // statPrintInterval = 0.0;
79 // }
80 #ifdef NETWORK_PERFORMANCE_BENCHMARK
81  /* We would like to verify if the receiver thread is ticked(running) every 5 millisecond, with a tollerance of 0.05 millisec.
82  the m_perEvtVerifier object after 1 second, prints an istogram with values from 4 to 6 millisec with a step of 0.1 millisec
83  */
84  double raterx_sec = (double)raterx/1000;//raterx is in milliseconds
85  m_perEvtVerifier.init(raterx_sec, (raterx_sec/100), raterx_sec-0.001, raterx_sec+0.001, 0.0001, 1, "Receiver");
86 #endif
87 }
88 
90 {
91  // in here i send a small packet to ... myself ?
92  uint8_t tmp = 0;
93  ethManager->sendPacket( &tmp, 1, ethManager->getLocalIPV4addressing());
94 }
95 
97 {
98 
99 }
100 
101 bool EthReceiver::config(ACE_SOCK_Dgram *pSocket, TheEthManager* _ethManager)
102 {
103  yTrace();
104  recv_socket = pSocket;
105  ethManager = _ethManager;
106 
107  ACE_HANDLE sockfd = pSocket->get_handle();
108  int retval;
109  int32_t mysize = 1024*1024; // 1Mb note:actually kernel uses memory with size doblem of mysize
110  // with this size i'm sure ems pkts are not lost
111  int len = sizeof(mysize);
112 
113  // the user can change buffer size by environment variable ETHRECEIVER_BUFFER_SIZE
114  std::string _dgram_buffer_size = yarp::conf::environment::get_string("ETHRECEIVER_BUFFER_SIZE");
115  if (_dgram_buffer_size!="")
116  mysize = yarp::conf::numeric::from_string(_dgram_buffer_size, 0);
117 
118  retval = ACE_OS::setsockopt (sockfd, SOL_SOCKET, SO_RCVBUF, (char *)&mysize, sizeof(mysize));
119  if (retval != 0)
120  {
121  int myerr = errno;
122  yError()<< "ERROR in SetSockOpt SO_RCVBUF";
123  }
124 
125  int32_t sock_input_buf_size = 0;
126  retval = ACE_OS::getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, (char *)&sock_input_buf_size, &len);
127  if (retval != 0)
128  {
129  int myerr = errno;
130  yError() << "ERROR inGetSockOpt SO_RCVBUF";
131  }
132 
133  yWarning() << "in EthReceiver::config() the config socket has queue size = "<< sock_input_buf_size<< "; you request ETHRECEIVER_BUFFER_SIZE=" << _dgram_buffer_size;
134 
135  // On Windows the MSG_DONTWAIT flag set in EthReceiver::run(), so the recv is actually blocking, and this prevents
136  // the clean close. So, we instead use ACE_NONBLOCK that works also on Windows
137 #ifdef WIN32
138  recv_socket->enable(ACE_NONBLOCK);
139 #endif
140 
141  return true;
142 }
143 
144 
146 {
147  yTrace() << "Do some initialization here if needed";
148 
149 #if defined(__unix__)
155  struct sched_param thread_param;
156  thread_param.sched_priority = sched_get_priority_max(SCHED_FIFO)/2; // = 49
157  pthread_setschedparam(pthread_self(), SCHED_FIFO, &thread_param);
158 #endif
159 
160  return true;
161 }
162 
163 
164 uint64_t getRopFrameAge(char *pck)
165 {
166  return(eo_ropframedata_age_Get((EOropframeData*)pck));
167 }
168 
169 
170 
172 {
173  ssize_t incoming_msg_size = 0;
174  ACE_INET_Addr sender_addr;
175  uint64_t incoming_msg_data[TheEthManager::maxRXpacketsize/8]; // 8-byte aligned local buffer for incoming packet: it must be able to accomodate max size of packet
176  const ssize_t incoming_msg_capacity = TheEthManager::maxRXpacketsize;
177 
178 #ifdef NETWORK_PERFORMANCE_BENCHMARK
179  m_perEvtVerifier.tick(yarp::os::Time::now());
180 #endif
181 
182 
183  int flags = 0;
184 #ifndef WIN32
185  flags |= MSG_DONTWAIT;
186 #endif
187 
188  static uint8_t earlyexit_prev = 0;
189  static uint8_t earlyexit_prevprev = 0;
190 
191  // marco.accame: set maxUDPpackets as a fixed minimum number (2) + the number of boards. all is multipled by rateofthread and by a gain which depends on past activity
192  // the gain on past activity is usually 1. if maxUDPpackets is not enough the gain becomes (1+f1). if again it is not enough, the gain becomes 1+f1+f2+f3 and stays as
193  // such until it is enough. at this time it becomes 1+f2 and then 1 again
194  const double f1 = 0.5;
195  const double f2 = 0.5;
196  const double f3 = 8.0;
197  double gain = 1.0 + f1*(1-earlyexit_prev) + f2*(1-earlyexit_prevprev) + f3*(1-earlyexit_prev)*(1-earlyexit_prevprev);
198  int maxUDPpackets = (2 + ethManager->getNumberOfResources()) * EthReceiver::rateofthread * gain;
199 
200 
201  earlyexit_prevprev = earlyexit_prev; // save previous early exit
202  earlyexit_prev = 0; // consider no early exit this time
203 
204  for(int i=0; i<maxUDPpackets; i++)
205  {
206  incoming_msg_size = recv_socket->recv((void *) incoming_msg_data, incoming_msg_capacity, sender_addr, flags);
207  if(incoming_msg_size <= 0)
208  { // marco.accame: i prefer using <= 0.
209  earlyexit_prev = 1; // yes, we have an early exit
210  break; // we break and do not return because we want to be sure to execute what is after the for() loop
211  }
212 
213  // we have a packet ... we give it to the ethmanager for it parsing
214  //bool collectStatistics = (statPrintInterval > 0) ? true : false;
215  ethManager->Reception(ethManager->toipv4addr(sender_addr), incoming_msg_data, incoming_msg_size);
216  }
217 
218  // execute the check on presence of all eth boards.
219  ethManager->CheckPresence();
220 }
221 
222 
223 
224 // - end-of-file (leave a blank line after)----------------------------------------------------------------------------
225 
226 
227 
228 
229 
bool config(ACE_SOCK_Dgram *pSocket, eth::TheEthManager *_ethManager)
EthReceiver(int rxrate)
Definition: ethReceiver.cpp:65
int sendPacket(const void *udpframe, size_t len, const eOipv4addressing_t &toaddressing)
Definition: ethManager.cpp:636
eOipv4addr_t toipv4addr(const ACE_INET_Addr &aceinetaddr)
Definition: ethManager.cpp:645
bool CheckPresence(void)
Definition: ethManager.cpp:275
bool Reception(eOipv4addr_t from, uint64_t *data, ssize_t size)
Definition: ethManager.cpp:666
int getNumberOfResources(void)
Definition: ethManager.cpp:696
const eOipv4addressing_t & getLocalIPV4addressing(void)
Definition: ethManager.cpp:508
uint64_t getRopFrameAge(char *pck)