$darkmode
stdstream.hpp
1 #include <condition_variable>
2 #include <deque>
3 #include <mutex>
4 #include <string>
5 #include <thread>
6 
7 // experimental implementation
8 
9 #define LRDB_IOSTREAM_PREFIX "lrdb_stream_message:"
10 namespace lrdb {
11 
13  public:
14  command_stream_stdstream(std::istream& in, std::ostream& out)
15  : end_(false), istream_(in), ostream_(out) {
16  thread_ = std::thread([&] { read_thread(); });
17  }
18  ~command_stream_stdstream() { close(); }
19 
20  void close() {
21  {
22  std::unique_lock<std::mutex> lk(mutex_);
23  end_ = true;
24  cond_.notify_all();
25  }
26  ostream_ << 1;
27  if (thread_.joinable()) {
28  thread_.join();
29  }
30  }
31  std::function<void(const std::string& data)> on_data;
32  std::function<void()> on_connection;
33  std::function<void()> on_close;
34  std::function<void(const std::string&)> on_error;
35 
36  bool is_open() const { return true; }
37  void poll() {
38  std::string msg = pop_message();
39  if (!msg.empty()) {
40  on_data(msg);
41  }
42  }
43  void run_one() {
44  std::string msg = wait_message();
45  if (!msg.empty()) {
46  on_data(msg);
47  }
48  }
49  void wait_for_connection() {}
50 
51  // sync
52  bool send_message(const std::string& message) {
53  ostream_ << (LRDB_IOSTREAM_PREFIX + message + "\r\n");
54  return true;
55  }
56 
57  private:
58  std::string pop_message() {
59  std::unique_lock<std::mutex> lk(mutex_);
60  if (command_buffer_.empty()) {
61  return "";
62  }
63 
64  std::string message = std::move(command_buffer_.front());
65  command_buffer_.pop_front();
66  return message;
67  }
68  std::string wait_message() {
69  std::unique_lock<std::mutex> lk(mutex_);
70  while (command_buffer_.empty() && !end_) {
71  cond_.wait(lk);
72  }
73  if (command_buffer_.empty()) {
74  return "";
75  }
76  std::string message = std::move(command_buffer_.front());
77  command_buffer_.pop_front();
78  return message;
79  }
80  void push_message(std::string message) {
81  std::unique_lock<std::mutex> lk(mutex_);
82  command_buffer_.push_back(std::move(message));
83  cond_.notify_one();
84  }
85 
86  void read_thread() {
87  std::unique_lock<std::mutex> lk(mutex_);
88  std::string msg;
89  while (!end_) {
90  mutex_.unlock();
91  std::getline(istream_, msg);
92  if (msg.find(LRDB_IOSTREAM_PREFIX) == 0) {
93  push_message(msg.substr(sizeof(LRDB_IOSTREAM_PREFIX)));
94  }
95  mutex_.lock();
96  }
97  }
98 
99  bool end_;
100  std::istream& istream_;
101  std::ostream& ostream_;
102  std::deque<std::string> command_buffer_;
103  std::mutex mutex_;
104  std::condition_variable cond_;
105  std::thread thread_;
106 };
107 }
Definition: stdstream.hpp:12