$darkmode
tcp_transceiver.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2020 Robert Bosch GmbH
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * SPDX-License-Identifier: Apache-2.0
17  */
22 #pragma once
23 
24 #include <chrono> // for duration<>
25 #include <memory> // for unique_ptr<>
26 #include <string> // for string, to_string
27 #include <thread> // for this_thread, sleep_for
28 #include <utility> // for move
29 
30 #include <boost/asio.hpp> // for iostream
31 
32 #include <cloe/core.hpp> // for Error, Logger
33 #include <cloe/core/abort.hpp> // for AbortFlag, abort_checkpoint
34 #include <cloe/utility/tcp_transceiver_config.hpp> // for TcpTransceiverConfiguration
35 
36 namespace cloe {
37 namespace utility {
38 
48 class TcpReadError : public Error {
49  public:
50  using Error::Error;
51  virtual ~TcpReadError() noexcept = default;
52 };
53 
60  public:
61  TcpTransceiver() = default;
62  TcpTransceiver(const std::string& host, uint16_t port) { tcp_connect(host, port); }
63  virtual ~TcpTransceiver() { TcpTransceiver::tcp_disconnect(); } // NOLINT
64 
69  void tcp_connect(const std::string& host, uint16_t port) {
70  tcp_stream_.clear();
71  tcp_stream_.connect(host, std::to_string(port));
72  if (!tcp_stream_) {
73  throw std::ios_base::failure(tcp_stream_.error().message());
74  }
75  tcp_connected_ = true;
76  tcp_host_ = host;
77  tcp_port_ = port;
78  }
79 
85  bool tcp_is_connected() const { return tcp_connected_; }
86 
92  bool tcp_is_ok() const { return static_cast<bool>(tcp_stream_); }
93 
99  void tcp_disconnect() {
100  tcp_stream_.close();
101  tcp_connected_ = false;
102  }
103 
104  uint16_t tcp_port() const { return tcp_port_; }
105  const std::string& tcp_host() const { return tcp_host_; }
106  std::string tcp_endpoint() const { return fmt::format("tcp://{}:{}", tcp_host_, tcp_port_); }
107 
108  protected:
112  std::streamsize tcp_available_data() const {
113  return tcp_stream_.rdbuf()->in_avail() + tcp_stream_.rdbuf()->available();
114  }
115 
116  template <typename M>
117  void tcp_send(M* msg, size_t sz) {
118  tcp_stream_.write(reinterpret_cast<const char*>(msg), sz);
119  tcp_stream_.flush();
120  }
121 
122  protected:
123  boost::asio::ip::tcp::iostream tcp_stream_;
124  bool tcp_connected_{false};
125  std::string tcp_host_{};
126  uint16_t tcp_port_{};
127 };
128 
148 template <typename T>
150  public:
151  TcpTransceiverFactory() = default;
152  TcpTransceiverFactory(int attempts, std::chrono::duration<float> delay) {
153  config_.retry_attempts = attempts;
154  config_.retry_delay = delay;
155  }
156  explicit TcpTransceiverFactory(const TcpTransceiverConfiguration& c) : config_(c) {}
157  explicit TcpTransceiverFactory(TcpTransceiverConfiguration&& c) : config_(std::move(c)) {}
158  virtual ~TcpTransceiverFactory() = default;
159 
160  int retry_attempts() const { return config_.retry_attempts; }
161  void set_retry_attempts(int attempts) { config_.retry_attempts = attempts; }
162 
163  std::chrono::duration<float> retry_delay() const { return config_.retry_delay; }
164  void set_retry_delay(std::chrono::duration<float> delay) { config_.retry_delay = delay; }
165 
169  std::unique_ptr<T> create_or_null(const std::string& host, uint16_t port) const {
170  try {
171  return this->create_or_throw(host, port);
172  } catch (std::ios_base::failure&) {
173  return std::unique_ptr<T>{nullptr};
174  }
175  }
176 
180  std::unique_ptr<T> create_or_throw(const std::string& host, uint16_t port) const {
181  // On a 32-bit machine, this will overflow in 48 days.
182  for (int32_t attempts = 0; true; attempts++) {
183  try {
184  if (attempts == 0) {
185  factory_logger()->info("{} connect tcp://{}:{}", instance_name(), host, port);
186  } else {
187  factory_logger()->info("{} connect tcp://{}:{} [attempt {}/{}]", instance_name(), host,
188  port, attempts + 1, config_.retry_attempts + 1);
189  }
190  return std::make_unique<T>(host, port);
191  } catch (std::ios_base::failure&) {
192  if (attempts == config_.retry_attempts) {
193  throw;
194  }
195  }
196  std::this_thread::sleep_for(config_.retry_delay);
197  }
198  }
199 
205  std::unique_ptr<T> create_or_throw(const std::string& host, uint16_t port, AbortFlag& sig) const {
206  // On a 32-bit machine, this will overflow in 48 days.
207  for (int32_t attempts = 0; true; attempts++) {
208  try {
209  if (attempts == 0) {
210  factory_logger()->info("{} connect tcp://{}:{}", instance_name(), host, port);
211  } else {
212  factory_logger()->info("{} connect tcp://{}:{} [attempt {}/{}]", instance_name(), host,
213  port, attempts + 1, config_.retry_attempts + 1);
214  }
215  return std::make_unique<T>(host, port);
216  } catch (std::ios_base::failure&) {
217  if (attempts == config_.retry_attempts) {
218  throw;
219  }
220  }
221  abort_checkpoint(sig);
222  std::this_thread::sleep_for(config_.retry_delay);
223  }
224  }
225 
226  friend void to_json(Json& j, const TcpTransceiverFactory<T>& f) { to_json(j, f.config_); }
227  friend void from_json(const Json& j, TcpTransceiverFactory<T>& f) { from_json(j, f.config_); }
228 
229  protected:
230  virtual Logger factory_logger() const = 0;
231  virtual const char* instance_name() const = 0;
232 
233  protected:
234  TcpTransceiverConfiguration config_;
235 };
236 
242 template <typename F>
244  -> decltype(F{c}.create_or_throw(c.host, c.port)) {
245  return F{c}.create_or_throw(c.host, c.port);
246 }
247 
253 template <typename F>
255  -> decltype(F{c}.create_or_throw(c.host, c.port, sig)) {
256  return F{c}.create_or_throw(c.host, c.port, sig);
257 }
258 
264 template <typename F>
266  -> decltype(F{c}.create_or_null(c.host, c.port)) {
267  return F{c}.create_or_null(c.host, c.port);
268 }
269 
270 } // namespace utility
271 } // namespace cloe
std::atomic_bool AbortFlag
Definition: abort.hpp:40
void abort_checkpoint(AbortFlag &sig)
Definition: abort.hpp:58
Definition: error.hpp:35
Definition: tcp_transceiver.hpp:48
Definition: tcp_transceiver.hpp:149
std::unique_ptr< T > create_or_throw(const std::string &host, uint16_t port) const
Definition: tcp_transceiver.hpp:180
std::unique_ptr< T > create_or_null(const std::string &host, uint16_t port) const
Definition: tcp_transceiver.hpp:169
std::unique_ptr< T > create_or_throw(const std::string &host, uint16_t port, AbortFlag &sig) const
Definition: tcp_transceiver.hpp:205
Definition: tcp_transceiver.hpp:59
void tcp_disconnect()
Definition: tcp_transceiver.hpp:99
void tcp_connect(const std::string &host, uint16_t port)
Definition: tcp_transceiver.hpp:69
bool tcp_is_ok() const
Definition: tcp_transceiver.hpp:92
std::streamsize tcp_available_data() const
Definition: tcp_transceiver.hpp:112
bool tcp_is_connected() const
Definition: tcp_transceiver.hpp:85
Definition: tcp_transceiver_config.hpp:38
int32_t retry_attempts
Definition: tcp_transceiver_config.hpp:51
std::chrono::duration< float > retry_delay
Definition: tcp_transceiver_config.hpp:59
Definition: tcp_transceiver_config.hpp:82
auto create_or_null_with(const TcpTransceiverFullConfiguration &c) -> decltype(F{c}.create_or_null(c.host, c.port))
Definition: tcp_transceiver.hpp:265
auto create_or_throw_with(const TcpTransceiverFullConfiguration &c) -> decltype(F{c}.create_or_throw(c.host, c.port))
Definition: tcp_transceiver.hpp:243