iCub-main
Loading...
Searching...
No Matches
ethReceiver.cpp
Go to the documentation of this file.
1// -*- Mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2
3
4/*
5 * Copyright (C) 2017 iCub Facility - Istituto Italiano di Tecnologia
6 * Author: Alberto Cardellino, Marco Accame
7 * email: alberto.cardellino@iit.it, marco.accame@iit.it
8 * website: www.robotcub.org
9 * Permission is granted to copy, distribute, and/or modify this program
10 * under the terms of the GNU General Public License, version 2 or any
11 * later version published by the Free Software Foundation.
12 *
13 * A copy of the license can be found at
14 * http://www.robotcub.org/icub/license/gpl.txt
15 *
16 * This program is distributed in the hope that it will be useful, but
17 * WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
19 * Public License for more details
20*/
21
22
23// --------------------------------------------------------------------------------------------------------------------
24// - public interface
25// --------------------------------------------------------------------------------------------------------------------
26
27#include "ethReceiver.h"
28
29
30
31// --------------------------------------------------------------------------------------------------------------------
32// - external dependencies
33// --------------------------------------------------------------------------------------------------------------------
34
35#include <yarp/os/Network.h>
36#include <yarp/os/NetType.h>
37#include <yarp/conf/environment.h>
38
39//#include <yarp/os/SystemClock.h>
40#include <yarp/os/Log.h>
41#include <yarp/os/LogStream.h>
42using yarp::os::Log;
43
44#include "ethManager.h"
45#include "ethResource.h"
46
47
48// --------------------------------------------------------------------------------------------------------------------
49// - pimpl: private implementation (see scott meyers: item 22 of effective modern c++, item 31 of effective c++
50// --------------------------------------------------------------------------------------------------------------------
51
52
53
54// --------------------------------------------------------------------------------------------------------------------
55// - the class
56// --------------------------------------------------------------------------------------------------------------------
57
58
59// - class eth::EthReceiver
60
61using namespace eth;
62
63
64
65EthReceiver::EthReceiver(int raterx): PeriodicThread((double)raterx/1000.0)
66{
67 rateofthread = raterx;
68 yDebug() << "EthReceiver is a PeriodicThread with rxrate =" << rateofthread << "ms";
69 // ok, and now i get it from xml file ... if i find it.
70
71// std::string tmp = yarp::conf::environment::get_string("ETHSTAT_PRINT_INTERVAL");
72// if (tmp != "")
73// {
74// statPrintInterval = (double)yarp::conf::numeric::from_string(tmp, 0.);
75// }
76// else
77// {
78// statPrintInterval = 0.0;
79// }
80#ifdef NETWORK_PERFORMANCE_BENCHMARK
81 /* We would like to verify if the receiver thread is ticked(running) every 5 millisecond, with a tollerance of 0.05 millisec.
82 the m_perEvtVerifier object after 1 second, prints an istogram with values from 4 to 6 millisec with a step of 0.1 millisec
83 */
84 double raterx_sec = (double)raterx/1000;//raterx is in milliseconds
85 m_perEvtVerifier.init(raterx_sec, (raterx_sec/100), raterx_sec-0.001, raterx_sec+0.001, 0.0001, 1, "Receiver");
86#endif
87}
88
90{
91 // in here i send a small packet to ... myself ?
92 uint8_t tmp = 0;
93 ethManager->sendPacket( &tmp, 1, ethManager->getLocalIPV4addressing());
94}
95
100
101bool EthReceiver::config(ACE_SOCK_Dgram *pSocket, TheEthManager* _ethManager)
102{
103 yTrace();
104 recv_socket = pSocket;
105 ethManager = _ethManager;
106
107 ACE_HANDLE sockfd = pSocket->get_handle();
108 int retval;
109 int32_t mysize = 1024*1024; // 1Mb note:actually kernel uses memory with size doblem of mysize
110 // with this size i'm sure ems pkts are not lost
111 int len = sizeof(mysize);
112
113 // the user can change buffer size by environment variable ETHRECEIVER_BUFFER_SIZE
114 std::string _dgram_buffer_size = yarp::conf::environment::get_string("ETHRECEIVER_BUFFER_SIZE");
115 if (_dgram_buffer_size!="")
116 mysize = yarp::conf::numeric::from_string(_dgram_buffer_size, 0);
117
118 retval = ACE_OS::setsockopt (sockfd, SOL_SOCKET, SO_RCVBUF, (char *)&mysize, sizeof(mysize));
119 if (retval != 0)
120 {
121 int myerr = errno;
122 yError()<< "ERROR in SetSockOpt SO_RCVBUF";
123 }
124
125 int32_t sock_input_buf_size = 0;
126 retval = ACE_OS::getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, (char *)&sock_input_buf_size, &len);
127 if (retval != 0)
128 {
129 int myerr = errno;
130 yError() << "ERROR inGetSockOpt SO_RCVBUF";
131 }
132
133 yWarning() << "in EthReceiver::config() the config socket has queue size = "<< sock_input_buf_size<< "; you request ETHRECEIVER_BUFFER_SIZE=" << _dgram_buffer_size;
134
135 // On Windows the MSG_DONTWAIT flag set in EthReceiver::run(), so the recv is actually blocking, and this prevents
136 // the clean close. So, we instead use ACE_NONBLOCK that works also on Windows
137#ifdef WIN32
138 recv_socket->enable(ACE_NONBLOCK);
139#endif
140
141 return true;
142}
143
144
146{
147 yTrace() << "Do some initialization here if needed";
148
149#if defined(__unix__)
155 struct sched_param thread_param;
156 thread_param.sched_priority = sched_get_priority_max(SCHED_FIFO)/2; // = 49
157 pthread_setschedparam(pthread_self(), SCHED_FIFO, &thread_param);
158#endif
159
160 return true;
161}
162
163
164uint64_t getRopFrameAge(char *pck)
165{
166 return(eo_ropframedata_age_Get((EOropframeData*)pck));
167}
168
169
170
172{
173 ssize_t incoming_msg_size = 0;
174 ACE_INET_Addr sender_addr;
175 uint64_t incoming_msg_data[TheEthManager::maxRXpacketsize/8]; // 8-byte aligned local buffer for incoming packet: it must be able to accomodate max size of packet
176 const ssize_t incoming_msg_capacity = TheEthManager::maxRXpacketsize;
177
178#ifdef NETWORK_PERFORMANCE_BENCHMARK
179 m_perEvtVerifier.tick(yarp::os::Time::now());
180#endif
181
182
183 int flags = 0;
184#ifndef WIN32
185 flags |= MSG_DONTWAIT;
186#endif
187
188 static uint8_t earlyexit_prev = 0;
189 static uint8_t earlyexit_prevprev = 0;
190
191 // marco.accame: set maxUDPpackets as a fixed minimum number (2) + the number of boards. all is multipled by rateofthread and by a gain which depends on past activity
192 // the gain on past activity is usually 1. if maxUDPpackets is not enough the gain becomes (1+f1). if again it is not enough, the gain becomes 1+f1+f2+f3 and stays as
193 // such until it is enough. at this time it becomes 1+f2 and then 1 again
194 const double f1 = 0.5;
195 const double f2 = 0.5;
196 const double f3 = 8.0;
197 double gain = 1.0 + f1*(1-earlyexit_prev) + f2*(1-earlyexit_prevprev) + f3*(1-earlyexit_prev)*(1-earlyexit_prevprev);
198 int maxUDPpackets = (2 + ethManager->getNumberOfResources()) * EthReceiver::rateofthread * gain;
199
200
201 earlyexit_prevprev = earlyexit_prev; // save previous early exit
202 earlyexit_prev = 0; // consider no early exit this time
203
204 for(int i=0; i<maxUDPpackets; i++)
205 {
206 incoming_msg_size = recv_socket->recv((void *) incoming_msg_data, incoming_msg_capacity, sender_addr, flags);
207 if(incoming_msg_size <= 0)
208 { // marco.accame: i prefer using <= 0.
209 earlyexit_prev = 1; // yes, we have an early exit
210 break; // we break and do not return because we want to be sure to execute what is after the for() loop
211 }
212
213 // we have a packet ... we give it to the ethmanager for it parsing
214 //bool collectStatistics = (statPrintInterval > 0) ? true : false;
215 ethManager->Reception(ethManager->toipv4addr(sender_addr), incoming_msg_data, incoming_msg_size);
216 }
217
218 // execute the check on presence of all eth boards.
219 ethManager->CheckPresence();
220}
221
222
223
224// - end-of-file (leave a blank line after)----------------------------------------------------------------------------
225
226
227
228
229
bool config(ACE_SOCK_Dgram *pSocket, eth::TheEthManager *_ethManager)
EthReceiver(int rxrate)
int sendPacket(const void *udpframe, size_t len, const eOipv4addressing_t &toaddressing)
eOipv4addr_t toipv4addr(const ACE_INET_Addr &aceinetaddr)
bool CheckPresence(void)
bool Reception(eOipv4addr_t from, uint64_t *data, ssize_t size)
int getNumberOfResources(void)
const eOipv4addressing_t & getLocalIPV4addressing(void)
uint64_t getRopFrameAge(char *pck)