30 #include <boost/asio.hpp>
71 tcp_stream_.connect(host, std::to_string(port));
73 throw std::ios_base::failure(tcp_stream_.error().message());
75 tcp_connected_ =
true;
92 bool tcp_is_ok()
const {
return static_cast<bool>(tcp_stream_); }
101 tcp_connected_ =
false;
104 uint16_t tcp_port()
const {
return tcp_port_; }
105 const std::string& tcp_host()
const {
return tcp_host_; }
106 std::string tcp_endpoint()
const {
return fmt::format(
"tcp://{}:{}", tcp_host_, tcp_port_); }
113 return tcp_stream_.rdbuf()->in_avail() + tcp_stream_.rdbuf()->available();
116 template <
typename M>
117 void tcp_send(M* msg,
size_t sz) {
118 tcp_stream_.write(
reinterpret_cast<const char*
>(msg), sz);
123 boost::asio::ip::tcp::iostream tcp_stream_;
124 bool tcp_connected_{
false};
125 std::string tcp_host_{};
126 uint16_t tcp_port_{};
148 template <
typename T>
161 void set_retry_attempts(
int attempts) { config_.
retry_attempts = attempts; }
163 std::chrono::duration<float> retry_delay()
const {
return config_.
retry_delay; }
164 void set_retry_delay(std::chrono::duration<float> delay) { config_.
retry_delay = delay; }
169 std::unique_ptr<T>
create_or_null(
const std::string& host, uint16_t port)
const {
172 }
catch (std::ios_base::failure&) {
173 return std::unique_ptr<T>{
nullptr};
182 for (int32_t attempts = 0;
true; attempts++) {
185 factory_logger()->info(
"{} connect tcp://{}:{}", instance_name(), host, port);
187 factory_logger()->info(
"{} connect tcp://{}:{} [attempt {}/{}]", instance_name(), host,
190 return std::make_unique<T>(host, port);
191 }
catch (std::ios_base::failure&) {
207 for (int32_t attempts = 0;
true; attempts++) {
210 factory_logger()->info(
"{} connect tcp://{}:{}", instance_name(), host, port);
212 factory_logger()->info(
"{} connect tcp://{}:{} [attempt {}/{}]", instance_name(), host,
215 return std::make_unique<T>(host, port);
216 }
catch (std::ios_base::failure&) {
227 friend void from_json(
const Json& j, TcpTransceiverFactory<T>& f) { from_json(j, f.config_); }
230 virtual Logger factory_logger()
const = 0;
231 virtual const char* instance_name()
const = 0;
234 TcpTransceiverConfiguration config_;
242 template <
typename F>
244 -> decltype(F{c}.create_or_throw(c.host, c.port)) {
245 return F{c}.create_or_throw(c.host, c.port);
253 template <
typename F>
255 -> decltype(F{c}.create_or_throw(c.host, c.port, sig)) {
256 return F{c}.create_or_throw(c.host, c.port, sig);
264 template <
typename F>
266 -> decltype(F{c}.create_or_null(c.host, c.port)) {
267 return F{c}.create_or_null(c.host, c.port);
std::atomic_bool AbortFlag
Definition: abort.hpp:40
void abort_checkpoint(AbortFlag &sig)
Definition: abort.hpp:58
Definition: tcp_transceiver.hpp:48
Definition: tcp_transceiver.hpp:149
std::unique_ptr< T > create_or_throw(const std::string &host, uint16_t port) const
Definition: tcp_transceiver.hpp:180
std::unique_ptr< T > create_or_null(const std::string &host, uint16_t port) const
Definition: tcp_transceiver.hpp:169
std::unique_ptr< T > create_or_throw(const std::string &host, uint16_t port, AbortFlag &sig) const
Definition: tcp_transceiver.hpp:205
Definition: tcp_transceiver.hpp:59
void tcp_disconnect()
Definition: tcp_transceiver.hpp:99
void tcp_connect(const std::string &host, uint16_t port)
Definition: tcp_transceiver.hpp:69
bool tcp_is_ok() const
Definition: tcp_transceiver.hpp:92
std::streamsize tcp_available_data() const
Definition: tcp_transceiver.hpp:112
bool tcp_is_connected() const
Definition: tcp_transceiver.hpp:85
Definition: tcp_transceiver_config.hpp:38
int32_t retry_attempts
Definition: tcp_transceiver_config.hpp:51
std::chrono::duration< float > retry_delay
Definition: tcp_transceiver_config.hpp:59
Definition: tcp_transceiver_config.hpp:82
auto create_or_null_with(const TcpTransceiverFullConfiguration &c) -> decltype(F{c}.create_or_null(c.host, c.port))
Definition: tcp_transceiver.hpp:265
auto create_or_throw_with(const TcpTransceiverFullConfiguration &c) -> decltype(F{c}.create_or_throw(c.host, c.port))
Definition: tcp_transceiver.hpp:243