View on GitHub

Thrift Asio

Asynchronous client and server for Apache Thrift, implemented via boost::asio

Download this project as a .zip file Download this project as a tar.gz file
thrift_asio_transport.hpp
1 //
2 // Created by Benjamin Schulz on 15/03/15.
3 //
4 
5 #ifndef _THRIFT_ASIO_TRANSPORT_HPP_
6 #define _THRIFT_ASIO_TRANSPORT_HPP_
7 
8 #pragma once
9 
10 #include <thrift/transport/TVirtualTransport.h>
11 //#include <boost/asio.hpp>
12 #include <boost/asio/ip/tcp.hpp>
13 #include <boost/asio/write.hpp>
14 #include <boost/make_shared.hpp>
15 #include <deque>
16 
17 namespace betabugs {
18 namespace networking {
19 
26 class thrift_asio_transport : public apache::thrift::transport::TVirtualTransport<thrift_asio_transport>
27 {
28  public:
33  {
/// Gets invoked when an error occurred while communicating over the transport.
/// The default implementation ignores the error.
/// @param ec the error code describing the failure
35  virtual void on_error(const boost::system::error_code& ec)
36  {
// The cast only marks the parameter as intentionally unused.
37  (void) ec;
38  }
39 
/// Gets invoked when the transport was successfully connected.
/// The default implementation does nothing.
41  virtual void on_connected()
42  {
43  }
44 
/// Gets invoked when the transport is disconnected.
/// The default implementation does nothing.
46  virtual void on_disconnected()
47  {
48  }
49  };
50 
52  typedef std::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr;
53 
56  : socket_(socket)
57  , event_handlers_(event_handlers)
58  {
59  assert(event_handlers);
60  };
61 
62 
64 
/// Attempts to read up to `len` bytes into `buf`.
///
/// Runs handlers on the socket's io_service until at least `len` bytes have
/// been buffered or the transport has been closed, then copies the buffered
/// bytes out and removes them from the internal queue.
///
/// @param buf destination buffer; must have room for at least `len` bytes
/// @param len the maximum number of bytes to read
/// @return the number of bytes actually copied into `buf`; this is less than
///         `len` (possibly 0) when the transport was closed while waiting
74  uint32_t read(uint8_t* buf, uint32_t len)
75  {
// Stop waiting once the transport is closed: close() clears the buffered
// bytes and on_receive() does not re-arm the receive after an error, so
// without this check the loop could never terminate.
76  while (available_bytes() < len && !isClosed())
77  {
78  socket_->get_io_service().run_one();
79  }
80 
// Never copy more than what is actually buffered.
81  auto bytes_to_copy = std::min<size_t>(len, incomming_bytes_.size());
82 
83  std::copy_n(incomming_bytes_.begin(), bytes_to_copy, buf);
84 
// Drop the bytes that were just handed to the caller.
85  incomming_bytes_.erase(
86  incomming_bytes_.begin(),
87  incomming_bytes_.begin() + std::deque<uint8_t>::difference_type(bytes_to_copy)
88  );
89 
90  return uint32_t(bytes_to_copy);
91  }
92 
/// Returns the number of bytes that have been received but not yet
/// consumed by read().
94  size_t available_bytes() const
95  {
96  return incomming_bytes_.size();
97  }
98 
99  /*uint32_t readAll(uint8_t* buf, uint32_t len)
100  {
101  return read(buf, len);
102  }*/
103 
/// Starts an asynchronous write of `len` bytes starting at `buf`.
///
/// The bytes are copied before the call returns, so the caller's buffer does
/// not have to outlive the asynchronous operation.
///
/// @param buf pointer to the bytes to send
/// @param len the number of bytes to send
///
/// NOTE(review): Boost.Asio documents that no other write may be started on a
/// stream while a composed async_write is still in progress; back-to-back
/// write() calls may overlap here - confirm against the callers.
112  void write(const uint8_t* buf, uint32_t len)
113  {
// Private copy of the payload; the completion handler's capture keeps it
// alive until the write has finished.
114  auto holder = boost::make_shared<std::string>(buf, buf + len);
115 
116  boost::asio::async_write(
117  *socket_,
118  boost::asio::buffer(holder->data(), holder->size()),
119  [this, holder](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/)
120  {
121  if (ec)
122  {
// A failed write closes the transport.
124  this->close();
125  }
126  }
127  );
128  }
129 
130  /*const uint8_t* borrow(uint8_t* buf, uint32_t* len)
131  {
132  return nullptr;
133  }
134 
135  void consume(uint32_t len)
136  {
137  }*/
138 
/// Returns true only while the transport is in the OPEN state.
140  virtual bool isOpen() override
141  {
142  return state_ == OPEN;
143  }
144 
146 
/// Checks whether this transport is in the CLOSED state.
/// This is not simply the negation of isOpen(): both compare state_ against
/// a single enumerator, so both return false in any other state.
151  bool isClosed()
152  {
153  return state_ == CLOSED;
154  }
155 
/// Returns true if the transport is open and there is buffered data
/// available to be processed.
157  virtual bool peek() override
158  {
159  return isOpen() && !incomming_bytes_.empty();
160  }
161 
/// Opens the transport: marks it OPEN, enables the TCP no_delay option on the
/// socket and starts the first asynchronous receive.
163  virtual void open() override
164  {
166  state_ = OPEN;
167 
168  socket_->set_option(boost::asio::ip::tcp::no_delay(true));
// The receive buffer is shared with the completion handler so that it
// stays alive for as long as a receive is pending.
169  auto receive_buffer = std::make_shared<std::array<char, 1024>>();
170 
171  socket_->async_receive(
172  boost::asio::buffer(*receive_buffer, 1024),
173  0,
174  [this, receive_buffer]
175  (const boost::system::error_code& ec, std::size_t bytes_transferred)
176  {
// on_receive() buffers the data and re-arms the receive.
177  this->on_receive(ec, receive_buffer, bytes_transferred);
178  }
179  );
180  }
181 
/// Closes the transport.
///
/// If the transport is OPEN, pending socket operations are cancelled and the
/// socket is closed; any error from either step is reported through
/// event_handlers_->on_error(). Afterwards the state is CLOSED and all
/// buffered, not yet read bytes are discarded.
183  virtual void close() override
184  {
185  if (state_ == OPEN)
186  {
// The error_code overloads are used so that failures are reported to the
// event handlers instead of being thrown.
187  boost::system::error_code ec;
188  socket_->cancel(ec);
189  if (ec) event_handlers_->on_error(ec);
190  socket_->close(ec);
191  if (ec) event_handlers_->on_error(ec);
192  }
194  state_ = CLOSED;
195  incomming_bytes_.clear();
196  }
197 
198 
/// Returns the remote endpoint of the socket formatted as "address:port".
///
/// NOTE(review): the argument-less remote_endpoint() overload reports
/// failures (e.g. a socket that is not connected) by throwing
/// boost::system::system_error - confirm that callers expect this.
207  virtual const std::string getOrigin() override
208  {
209  return socket_->remote_endpoint().address().to_string() + ":" + std::to_string(socket_->remote_endpoint().port());
210  }
211 
212  protected:
214  enum State
215  {
220  };
221 
225 
226  private:
227  std::deque<uint8_t> incomming_bytes_;
228 
/// Completion handler for the asynchronous receive.
///
/// On error the transport is closed. Otherwise the received bytes are
/// appended to the internal queue and the next asynchronous receive is
/// started with the same buffer.
///
/// @param ec                result of the receive operation
/// @param receive_buffer    the shared buffer the data was received into
/// @param bytes_transferred number of valid bytes at the start of the buffer
///
/// NOTE(review): the handler captures a raw `this`, so the transport has to
/// outlive any pending receive - confirm how owners guarantee this.
229  void on_receive(
230  const boost::system::error_code& ec,
231  std::shared_ptr<std::array<char, 1024>> receive_buffer,
232  std::size_t bytes_transferred)
233  {
234  if (ec)
235  {
237  this->close();
238  }
239  else
240  {
// Only the first bytes_transferred bytes of the buffer are valid.
241  incomming_bytes_.insert(
242  incomming_bytes_.end(),
243  begin(*receive_buffer),
244  begin(*receive_buffer) + bytes_transferred
245  );
246 
// Re-arm the receive with the full capacity of the array. The previous
// sizeof(receive_buffer) measured the shared_ptr itself and therefore
// limited every follow-up receive to a handful of bytes.
247  socket_->async_receive(
248  boost::asio::buffer(*receive_buffer, receive_buffer->size()),
249  0,
250  [this, receive_buffer](const boost::system::error_code& ec,
251  std::size_t bytes_transferred)
252  {
253  this->on_receive(ec, receive_buffer, bytes_transferred);
254  }
255  );
256  }
257  }
258 };
259 
260 }
261 }
262 
263 #endif //_THRIFT_ASIO_TRANSPORT_HPP_
Definition: thrift_asio_transport.hpp:32
virtual void on_disconnected()
Gets invoked when the transport is disconnected.
Definition: thrift_asio_transport.hpp:46
virtual void on_error(const boost::system::error_code &ec)
Gets invoked when an error occurs while communicating over the transport.
Definition: thrift_asio_transport.hpp:35
virtual bool peek() override
returns true if there is data available to be processed
Definition: thrift_asio_transport.hpp:157
void write(const uint8_t *buf, uint32_t len)
Definition: thrift_asio_transport.hpp:112
bool isClosed()
Checks whether this transport is closed.
Definition: thrift_asio_transport.hpp:151
State
enum to represent the state-machine of the connection
Definition: thrift_asio_transport.hpp:214
virtual void close() override
closes the transport
Definition: thrift_asio_transport.hpp:183
virtual void open() override
opens the transport
Definition: thrift_asio_transport.hpp:163
we're currently trying to resolve host_name:service_name
Definition: thrift_asio_transport.hpp:218
event_handlers * event_handlers_
handles events like on_error, etc.
Definition: thrift_asio_transport.hpp:224
uint32_t read(uint8_t *buf, uint32_t len)
Attempt to read up to the specified number of bytes into the buffer.
Definition: thrift_asio_transport.hpp:74
virtual void on_connected()
Gets invoked when the transport was successfully connected.
Definition: thrift_asio_transport.hpp:41
size_t available_bytes() const
the number of bytes that have been received but not yet read()
Definition: thrift_asio_transport.hpp:94
virtual bool isOpen() override
returns true unless an error occurred or the transport was closed
Definition: thrift_asio_transport.hpp:140
virtual const std::string getOrigin() override
Definition: thrift_asio_transport.hpp:207
std::shared_ptr< boost::asio::ip::tcp::socket > socket_ptr
a shared_ptr to a tcp socket
Definition: thrift_asio_transport.hpp:52
socket_ptr socket_
the underlying socket
Definition: thrift_asio_transport.hpp:223
thrift_asio_transport(socket_ptr socket, event_handlers *event_handlers)
creates a thrift_asio_transport from a socket_ptr
Definition: thrift_asio_transport.hpp:55
State state_
the state of this transport
Definition: thrift_asio_transport.hpp:222
Definition: thrift_asio_transport.hpp:26
the transport is open and ready for communication
Definition: thrift_asio_transport.hpp:219
the transport is closed
Definition: thrift_asio_transport.hpp:216
the transport is currently connecting
Definition: thrift_asio_transport.hpp:217