$darkmode
rdb_transceiver_tcp.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 Robert Bosch GmbH
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * SPDX-License-Identifier: Apache-2.0
17  */
24 #pragma once
25 
26 #include <memory> // for shared_ptr<>
27 #include <string> // for string
28 #include <vector> // for vector<>
29 
30 #include <boost/asio.hpp> // for streamsize
31 
32 #include <cloe/core.hpp> // for Json
33 #include <cloe/utility/tcp_transceiver.hpp> // for TcpTransceiver
34 
35 #include "rdb_transceiver.hpp" // for RdbTransceiver
36 #include "vtd_logger.hpp" // for rdb_logger
37 
38 #define VTD_RDB_WAIT_SLEEP_MS 1
39 
40 namespace vtd {
41 
46  public:
47  using TcpTransceiver::TcpTransceiver;
48 
49  bool has() const override {
50  return this->tcp_available_data() >= static_cast<std::streamsize>(sizeof(RDB_MSG_HDR_t));
51  }
52 
53  std::vector<std::shared_ptr<RDB_MSG_t>> receive() override {
54  std::vector<std::shared_ptr<RDB_MSG_t>> msgs;
55  while (!this->has()) {
56  std::this_thread::sleep_for(cloe::Milliseconds{VTD_RDB_WAIT_SLEEP_MS});
57  }
58  while (this->has()) {
59  num_received_++;
60  msgs.push_back(this->receive_wait());
61  }
62  return msgs;
63  }
64 
65  void send(const RDB_MSG_t* message, size_t size) override {
66  num_sent_++;
67  this->tcp_send(message, size);
68  }
69 
70  void to_json(cloe::Json& j) const override {
71  j = cloe::Json{
72  {"connection_endpoint", this->tcp_endpoint()},
73  {"connection_ok", this->tcp_is_ok()},
74  {"num_errors", this->num_errors_},
75  {"num_messages_sent", this->num_sent_},
76  {"num_messages_received", this->num_received_},
77  };
78  }
79 
80  friend void to_json(cloe::Json& j, const RdbTransceiverTcp& t) { t.to_json(j); }
81 
82  protected:
86  std::shared_ptr<RDB_MSG_t> receive_wait();
87 
88  private:
89  // Statistics for interest's sake
90  uint64_t num_errors_{0};
91  uint64_t num_sent_{0};
92  uint64_t num_received_{0};
93 };
94 
96  public:
97  using TcpTransceiverFactory::TcpTransceiverFactory;
98 
99  protected:
100  cloe::Logger factory_logger() const override { return rdb_logger(); }
101  const char* instance_name() const override { return "RdbTransceiverTcp"; }
102 };
103 
104 } // namespace vtd
Definition: tcp_transceiver.hpp:149
Definition: tcp_transceiver.hpp:59
bool tcp_is_ok() const
Definition: tcp_transceiver.hpp:92
std::streamsize tcp_available_data() const
Definition: tcp_transceiver.hpp:112
Definition: rdb_transceiver_tcp.hpp:95
Definition: rdb_transceiver_tcp.hpp:45
bool has() const override
Definition: rdb_transceiver_tcp.hpp:49
std::vector< std::shared_ptr< RDB_MSG_t > > receive() override
Definition: rdb_transceiver_tcp.hpp:53
void send(const RDB_MSG_t *message, size_t size) override
Definition: rdb_transceiver_tcp.hpp:65
std::shared_ptr< RDB_MSG_t > receive_wait()
Definition: rdb_transceiver_tcp.cpp:36
Definition: rdb_transceiver.hpp:59