$darkmode
rdb_transceiver_tcp.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 Robert Bosch GmbH
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * SPDX-License-Identifier: Apache-2.0
17  */
24 #pragma once
25 
26 #include <memory> // for shared_ptr<>
27 #include <vector> // for vector<>
28 
29 #include <boost/asio.hpp> // for streamsize
30 
31 #include <cloe/core.hpp> // for Json
32 #include <cloe/utility/tcp_transceiver.hpp> // for TcpTransceiver
33 
34 #include "rdb_transceiver.hpp" // for RdbTransceiver
35 #include "vtd_logger.hpp" // for rdb_logger
36 
37 #define VTD_RDB_WAIT_SLEEP_MS 1
38 
39 namespace vtd {
40 
45  public:
46  using TcpTransceiver::TcpTransceiver;
47 
48  bool has() const override {
49  return this->tcp_available_data() >= static_cast<std::streamsize>(sizeof(RDB_MSG_HDR_t));
50  }
51 
52  std::vector<std::shared_ptr<RDB_MSG_t>> receive() override {
53  std::vector<std::shared_ptr<RDB_MSG_t>> msgs;
54  while (!this->has()) {
55  std::this_thread::sleep_for(cloe::Milliseconds{VTD_RDB_WAIT_SLEEP_MS});
56  }
57  while (this->has()) {
58  num_received_++;
59  msgs.push_back(this->receive_wait());
60  }
61  return msgs;
62  }
63 
64  void send(const RDB_MSG_t* message, size_t size) override {
65  num_sent_++;
66  this->tcp_send(message, size);
67  }
68 
69  void to_json(cloe::Json& j) const override {
70  j = cloe::Json{
71  {"connection_endpoint", this->tcp_endpoint()},
72  {"connection_ok", this->tcp_is_ok()},
73  {"num_errors", this->num_errors_},
74  {"num_messages_sent", this->num_sent_},
75  {"num_messages_received", this->num_received_},
76  };
77  }
78 
79  friend void to_json(cloe::Json& j, const RdbTransceiverTcp& t) { t.to_json(j); }
80 
81  protected:
85  std::shared_ptr<RDB_MSG_t> receive_wait();
86 
87  private:
88  // Statistics for interest's sake
89  uint64_t num_errors_{0};
90  uint64_t num_sent_{0};
91  uint64_t num_received_{0};
92 };
93 
95  public:
96  using TcpTransceiverFactory::TcpTransceiverFactory;
97 
98  protected:
99  cloe::Logger factory_logger() const override { return rdb_logger(); }
100  const char* instance_name() const override { return "RdbTransceiverTcp"; }
101 };
102 
103 } // namespace vtd
Definition: tcp_transceiver.hpp:149
Definition: tcp_transceiver.hpp:59
bool tcp_is_ok() const
Definition: tcp_transceiver.hpp:92
std::streamsize tcp_available_data() const
Definition: tcp_transceiver.hpp:112
Definition: rdb_transceiver_tcp.hpp:94
Definition: rdb_transceiver_tcp.hpp:44
bool has() const override
Definition: rdb_transceiver_tcp.hpp:48
std::vector< std::shared_ptr< RDB_MSG_t > > receive() override
Definition: rdb_transceiver_tcp.hpp:52
void send(const RDB_MSG_t *message, size_t size) override
Definition: rdb_transceiver_tcp.hpp:64
std::shared_ptr< RDB_MSG_t > receive_wait()
Definition: rdb_transceiver_tcp.cpp:36
Definition: rdb_transceiver.hpp:59