32 #include <yarp/os/BufferedPort.h>
33 #include <yarp/os/ResourceFinder.h>
34 #include <yarp/os/RpcClient.h>
35 #include <yarp/os/RFModule.h>
36 #include <yarp/os/Network.h>
37 #include <yarp/os/Time.h>
38 #include <yarp/os/Log.h>
39 #include <yarp/os/LogStream.h>
40 #include <yarp/os/Semaphore.h>
41 #include <yarp/sig/SoundFile.h>
42 #include <yarp/dev/PolyDriver.h>
43 #include <yarp/sig/SoundFile.h>
45 #include <grpc++/grpc++.h>
46 #include "google/cloud/speech/v1/cloud_speech.grpc.pb.h"
48 #include "googleSpeech_IDL.h"
50 using google::cloud::speech::v1::RecognitionConfig;
51 using google::cloud::speech::v1::Speech;
52 using google::cloud::speech::v1::RecognizeRequest;
53 using google::cloud::speech::v1::RecognizeResponse;
// Lookup table from gRPC status codes to lowercase names.
// queryGoogle() reads it with .at() on the error code returned by the
// Recognize call and appends the name to "Failure_" when the call fails.
// NOTE(review): no grpc::OK entry is visible in this view, and std::map::at
// throws std::out_of_range for a key that is absent -- confirm OK is covered.
57 static const std::map<grpc::StatusCode, std::string> status_code_to_string {
59 {grpc::CANCELLED,
"cancelled"},
60 {grpc::UNKNOWN,
"unknown"},
61 {grpc::INVALID_ARGUMENT,
"invalid_argument"},
62 {grpc::DEADLINE_EXCEEDED,
"deadline_exceeded"},
63 {grpc::NOT_FOUND,
"not_found"},
64 {grpc::ALREADY_EXISTS,
"already_exists"},
65 {grpc::PERMISSION_DENIED,
"permission_denied"},
66 {grpc::UNAUTHENTICATED,
"unauthenticated"},
67 {grpc::RESOURCE_EXHAUSTED ,
"resource_exhausted"},
68 {grpc::FAILED_PRECONDITION,
"failed_precondition"},
69 {grpc::ABORTED,
"aborted"},
70 {grpc::OUT_OF_RANGE,
"out_of_range"},
71 {grpc::UNIMPLEMENTED,
"unimplemented"},
72 {grpc::INTERNAL,
"internal"},
73 {grpc::UNAVAILABLE,
"unavailable"},
74 {grpc::DATA_LOSS,
"data_loss"},
75 {grpc::DO_NOT_USE,
"do_not_use"}
// Processing: buffers incoming yarp::sig::Sound frames, merges them into one
// sound, sends it to the Google Cloud Speech Recognize RPC and collects the
// returned transcripts. It registers itself as the read callback of `port`.
// NOTE(review): this view omits some lines of the class (braces, access
// specifiers, and the declarations of members used below such as state,
// language, sample_rate, mtx, samples, channels and sendForQuery).
78 class Processing :
public yarp::os::TypedReaderCallback<yarp::sig::Sound>
// Base name used to build every port name opened by this class.
80 std::string moduleName;
// Not referenced by any line visible in this view.
81 yarp::os::RpcServer handlerPort;
// Sound input, opened as "/<moduleName>/sound:i".
82 yarp::os::BufferedPort<yarp::sig::Sound> port;
// Result output, opened as "/<moduleName>/result:o".
83 yarp::os::BufferedPort<yarp::os::Bottle> targetPort;
// RPC client opened as "/<moduleName>/commands:rpc"; carries the "start" and "stop" commands.
84 yarp::os::RpcClient audioCommand;
// Reference to a counter owned by the caller; queryGoogle() stores the request
// duration in it, measured in milliseconds despite the name.
86 std::int64_t &elapsed_seconds;
// Frames queued by collectFrame() and merged in onRead().
88 std::deque<yarp::sig::Sound> sounds;
// Time points reused for both timing measurements in queryGoogle().
100 std::chrono::time_point<std::chrono::system_clock> start, end;
// Constructor: binds the caller's `state` and `elapsed_seconds` by reference,
// stores the module name, language and sample rate (logging the latter two),
// registers this object as the callback of `port` and clears sendForQuery.
105 Processing(
const std::string &moduleName,
const std::string &language,
const int sample_rate, std::string &state, std::int64_t &elapsed_seconds ) : state(state), elapsed_seconds(elapsed_seconds)
107 this->moduleName = moduleName;
108 yInfo() <<
"language " << language;
109 yInfo() <<
"sample_rate " << sample_rate;
110 this->language = language;
111 this->sample_rate = sample_rate;
// From here on onRead() is the handler for sounds arriving on `port`.
113 port.useCallback(*
this);
119 sendForQuery =
false;
// Called from Module::configure(); the body is not visible in this view.
124 void setUsingUniqueSound()
// Port setup: strict mode on the sound input, then the three ports are opened
// under "/<moduleName>/...".
// NOTE(review): the signature of the function enclosing these lines is not
// visible in this view.
140 port.setStrict(
true);
142 port.open(
"/" + moduleName +
"/sound:i");
143 targetPort.open(
"/"+ moduleName +
"/result:o");
144 audioCommand.open(
"/"+ moduleName +
"/commands:rpc");
// Shutdown of the RPC client (enclosing signature not visible in this view).
154 audioCommand.close();
// Keeps the base-class onRead overloads visible next to the override below.
158 using yarp::os::TypedReaderCallback<yarp::sig::Sound>::onRead;
// Callback for each sound received on `port`. Holds `mtx` for the whole call.
// Visible steps: check the port backlog, merge the buffered frames into one
// sound, send "stop" on audioCommand, query Google and clear sendForQuery.
// NOTE(review): the conditions guarding these steps are not visible here.
159 void onRead( yarp::sig::Sound& sound )
override
161 std::lock_guard<std::mutex> lg(mtx);
// Number of sounds still queued on the port; re-read before the warning below.
165 int ct = port.getPendingReads();
168 ct = port.getPendingReads();
169 yWarning() <<
"Dropping sound packet -- " << ct <<
" packet(s) behind";
// Destination sound sized for all buffered samples and channels.
178 yarp::sig::Sound total;
179 total.resize(samples,channels);
// Drain the queue, copying each frame into `total` starting at sample offset `at`.
// NOTE(review): the declaration of `at` and the removal of the front element
// are not visible in this view.
181 while (!sounds.empty()) {
182 yarp::sig::Sound& tmp = sounds.front();
184 yDebug() <<
"channels " << channels;
185 yDebug() <<
"samples " << tmp.getSamples();
186 yDebug() <<
"values " << tmp.get(0,0);
// i walks channels, j walks samples of the current frame.
188 for (
int i=0; i<channels; i++) {
189 for (
int j=0; j<tmp.getSamples(); j++) {
190 total.set(tmp.get(j,i),at+j,i);
// The merged sound takes the frequency of the frame just copied.
193 total.setFrequency(tmp.getFrequency());
194 at += tmp.getSamples();
197 yarp::os::Bottle &outTargets = targetPort.prepare();
// Send "stop" over the RPC client and log the reply.
199 yarp::os::Bottle cmd, rep;
200 cmd.addString(
"stop");
201 if (audioCommand.write(cmd, rep))
203 yDebug() <<
"cmd.addString(stop)" << rep.toString().c_str();
// Blocking call: the recognition request is made inside the callback.
206 outTargets = queryGoogle(total);
209 sendForQuery =
false;
// Only a non-empty result is handled; the handling itself is not visible here.
213 if(outTargets.size()>0){
216 yDebug() <<
"done querying google";
218 yarp::os::Time::yield();
// Queues a copy of `sound`, adds its sample count to `samples`, records its
// channel count in `channels` and logs the buffer size.
222 void collectFrame(yarp::sig::Sound& sound)
224 sounds.push_back(sound);
225 samples += sound.getSamples();
226 channels = sound.getChannels();
227 yDebug() << (
long int) sounds.size() <<
"sound frames buffered in memory ( " << (
long int) samples <<
" samples)";
// Sends `sound` to the Google Speech Recognize RPC and gathers the transcript
// of every alternative of every result.
// Side effects: overwrites `start`/`end`, stores the request duration
// (milliseconds) in elapsed_seconds, and calls checkState("Failure_<code>")
// when the RPC status is not ok.
// NOTE(review): the declaration of the bottle `b` and the return statement are
// not visible in this view.
231 yarp::os::Bottle queryGoogle(yarp::sig::Sound& sound)
233 RecognizeRequest request;
235 yDebug() <<
"in queryGoogle";
236 yDebug() <<
"language" << language;
// A new channel and stub are created on every call, using the default Google credentials.
239 auto creds = grpc::GoogleDefaultCredentials();
240 auto channel = grpc::CreateChannel(
"speech.googleapis.com", creds);
241 std::unique_ptr<Speech::Stub> speech(Speech::NewStub(channel));
// Fill the recognition config (language, sample rate, encoding, metadata).
243 setArguments(request.mutable_config());
245 yInfo() <<
"getFrequency " << sound.getFrequency();
246 yInfo() <<
"getSamples " << sound.getSamples();
247 yInfo() <<
"getChannels " << sound.getChannels();
248 yInfo() <<
"getBytesPerSamples " << sound.getBytesPerSample();
// Copy the non-interleaved samples into a contiguous vector of short.
250 auto vec_i = sound.getNonInterleavedAudioRawData();
251 auto s1 = std::vector<short>(vec_i.begin(), vec_i.end());
253 yInfo() <<
"AudioRawData s1.size()" << s1.size();
// The byte count is s1.size()*2, i.e. two bytes per sample are assumed.
// NOTE(review): the factor is hard-coded rather than sizeof(short).
255 request.mutable_audio()->mutable_content()->assign((
char*)s1.data(), s1.size()*2);
257 end = std::chrono::system_clock::now();
// Holds milliseconds since `start` (set in start_acquisition()); divided by 1000 only for the log.
259 double start_elapsed_seconds = std::chrono::duration_cast<std::chrono::milliseconds> (end-start).count();
261 yInfo() <<
"From start to mutable audio " << start_elapsed_seconds / 1000 <<
" seconds passed";
263 grpc::ClientContext context;
264 RecognizeResponse response;
// Restart the clock for the request itself.
266 start = std::chrono::system_clock::now();
// NOTE(review): this delay sits between `start` and `end`, so it is included
// in the reported request time; its purpose is not evident from this view.
270 yarp::os::Time::delay(0.2);
271 grpc::Status rpc_status = speech->Recognize(&context, request, &response);
// .at() throws std::out_of_range when the code has no entry in the table.
272 std::string status_string = status_code_to_string.at(rpc_status.error_code());
273 end = std::chrono::system_clock::now();
// Written through the reference, so the owner of the counter sees the value (milliseconds).
275 elapsed_seconds = std::chrono::duration_cast<std::chrono::milliseconds> (end-start).count();
276 yInfo() <<
"Sending to google took " << elapsed_seconds <<
" ms";
// Failure path: log the gRPC message and move the state to "Failure_<code name>".
278 if (!rpc_status.ok()) {
280 yInfo() << rpc_status.error_message();
282 checkState(
"Failure_" + status_string);
285 yInfo() <<
"Size of response " << response.results_size();
286 if(response.results_size()>0){
// Log confidence and transcript of every alternative; each transcript is appended to `b`.
290 for (
int r = 0; r < response.results_size(); ++r)
292 auto result = response.results(r);
293 for (
int a = 0; a < result.alternatives_size(); ++a)
295 auto alternative = result.alternatives(a);
296 yInfo() << alternative.confidence();
297 yInfo() << alternative.transcript();
298 b.addString(alternative.transcript());
// Replaces the language code used for later requests; the returned value is
// not visible in this view.
310 bool setLanguageCode(
const std::string &languageCode)
312 language = languageCode;
// Accessor for the language code (body not visible in this view).
317 std::string getLanguageCode()
// Fills `config` with the stored language and sample rate, LINEAR16 encoding,
// the enhanced-model flag and fixed recognition metadata.
323 void setArguments(RecognitionConfig* config)
325 config->set_language_code(language.c_str());
326 config->set_sample_rate_hertz(sample_rate);
327 config->set_encoding(RecognitionConfig::LINEAR16);
329 config->set_use_enhanced(
true);
330 auto metadata = config->mutable_metadata();
332 metadata->set_microphone_distance(google::cloud::speech::v1::RecognitionMetadata_MicrophoneDistance_MIDFIELD);
333 metadata->set_recording_device_type(google::cloud::speech::v1::RecognitionMetadata_RecordingDeviceType_OTHER_INDOOR_DEVICE);
334 metadata->set_interaction_type(google::cloud::speech::v1::RecognitionMetadata_InteractionType_VOICE_COMMAND);
335 metadata->set_original_media_type(google::cloud::speech::v1::RecognitionMetadata_OriginalMediaType_AUDIO);
// Under `mtx`: sends "start" over audioCommand, logs the reply, records the
// start time used by queryGoogle() and moves the state to "Listening".
// The returned value is not visible in this view.
339 bool start_acquisition()
341 std::lock_guard<std::mutex> lg(mtx);
342 yarp::os::Bottle cmd, rep;
344 cmd.addString(
"start");
345 if (audioCommand.write(cmd, rep))
347 yDebug() <<
"cmd.addString(start)" << rep.toString().c_str();
350 start = std::chrono::system_clock::now();
352 checkState(
"Listening");
// Takes `mtx`; the rest of the body is not visible in this view.
357 bool stop_acquisition()
359 std::lock_guard<std::mutex> lg(mtx);
// Compares `new_state` with the current state; what happens when they differ
// is not visible in this view.
369 bool checkState(std::string new_state)
371 if(new_state!=state){
// Module: RFModule that reads the configuration, opens the RPC and state
// ports, owns the Processing instance and implements the googleSpeech_IDL
// interface.
// NOTE(review): this view omits some lines of the class (braces, several
// method signatures and the declaration of the `state` member used below).
383 class Module :
public yarp::os::RFModule,
public googleSpeech_IDL
// Not assigned by any line visible in this view; configure() takes its own `rf` parameter.
385 yarp::os::ResourceFinder *rf;
386 yarp::os::RpcServer rpcPort;
// Bound by reference into Processing, which stores the request duration in milliseconds.
388 std::int64_t elapsed_seconds;
389 yarp::os::BufferedPort<yarp::os::Bottle> statePort;
// Allocated with `new` in configure(); no matching delete is visible in this view.
391 Processing *processing;
// NOTE(review): this names `processing` (lowercase), not the `Processing`
// class defined in this file -- confirm the intent.
392 friend class processing;
// Language codes accepted by setLanguage(), filled from the "languageCodes" group.
396 std::vector<std::string> allLanguageCodes;
// Attaches this object as the server behind `source` via yarp().attachAsServer.
399 bool attach(yarp::os::RpcServer &source)
401 return this->yarp().attachAsServer(source);
// Reads the configuration, opens the ports and creates the Processing object.
// Options read: name (default "googleSpeech"), language_code (default
// "en-US"), sample_rate_hertz (default 16000), uniqueSound, languageCodes.
// The returned value is not visible in this view.
407 bool configure(yarp::os::ResourceFinder &rf)
411 this->elapsed_seconds=0;
414 std::string moduleName = rf.check(
"name", yarp::os::Value(
"googleSpeech"),
"module name (string)").asString();
415 std::string language = rf.check(
"language_code", yarp::os::Value(
"en-US"),
"language (string)").asString();
416 int sample_rate = rf.check(
"sample_rate_hertz", yarp::os::Value(16000),
"sample rate (int)").asInt32();
// The action taken when "uniqueSound" is present is not visible in this view.
418 if (rf.check(
"uniqueSound",
"use a yarp::sig::Sound instead of a microphone"))
421 if (rf.check(
"languageCodes",
"Getting language codes"))
423 yarp::os::Bottle &grp=rf.findGroup(
"languageCodes");
// Element 0 of the group is skipped; presumably it holds the group key -- TODO confirm.
// `sz` is declared on a line not visible in this view.
426 for (
int i=0; i<sz; i++)
427 allLanguageCodes.push_back(grp.get(1+i).asString());
430 setName(moduleName.c_str());
432 rpcPort.open((
"/"+getName(
"/rpc")).c_str());
433 statePort.open(
"/"+ moduleName +
"/state:o");
// `state` and `elapsed_seconds` are passed by reference so Processing can update them.
437 processing =
new Processing( moduleName, language, sample_rate, state, elapsed_seconds);
// NOTE(review): whether this call is conditional on "uniqueSound" is not visible here.
443 processing->setUsingUniqueSound();
// Accepts `languageCode` only when it is listed in allLanguageCodes and then
// forwards it to Processing.
// NOTE(review): where returnVal becomes true and the return statement are not
// visible; the loop compares an int index against an unsigned size().
451 bool setLanguage(
const std::string& languageCode)
453 bool returnVal =
false;
455 std::string language;
457 for (
int i = 0; i < allLanguageCodes.size(); i++)
459 if (languageCode == allLanguageCodes[i])
461 language = languageCode;
462 processing->setLanguageCode(languageCode);
// Returns the language code currently held by Processing.
472 std::string getLanguageCode()
474 return processing->getLanguageCode();
// Start and stop of the acquisition are delegated to Processing; the
// signatures enclosing these two calls are not visible in this view.
490 processing->start_acquisition();
497 processing->stop_acquisition();
// Prepares a bottle on statePort, appends the current state string and logs
// it. The write call and the enclosing signature are not visible in this view.
519 yarp::os::Bottle &outTargets = statePort.prepare();
521 outTargets.addString(state);
522 yDebug() <<
"outTarget:" << outTargets.toString().c_str();
// Accessor for the state string (body not visible in this view).
528 std::string getState()
// Returns the duration of the last recognition request as stored by
// Processing::queryGoogle (milliseconds, despite the member name).
534 std::int64_t getProcessingTime()
536 return elapsed_seconds;
// Entry point: initialises YARP, checks that the network is reachable,
// configures a ResourceFinder (context "googleSpeech", file "config.ini",
// default name "googleSpeech") from the command line and runs the module.
// NOTE(review): the declaration of `module` and the statement that follows the
// error log are not visible in this view.
542 int main(
int argc,
char *argv[])
// NOTE(review): init() is called explicitly and a Network object is also
// created on the next line; the explicit call may be redundant -- confirm.
544 yarp::os::Network::init();
546 yarp::os::Network yarp;
547 if (!yarp.checkNetwork())
549 yError(
"YARP server not available!");
554 yarp::os::ResourceFinder rf;
556 rf.setVerbose(
true );
557 rf.setDefaultContext(
"googleSpeech" );
558 rf.setDefaultConfigFile(
"config.ini" );
559 rf.setDefault(
"name",
"googleSpeech");
// Command-line arguments override the defaults set above.
560 rf.configure(argc,argv);
// The return value of runModule becomes the process exit code.
562 return module.runModule(rf);