View on GitHub

Thrift Asio

Asynchronous client and server for apache thrift implemented via boost::asio

Download this project as a .zip file Download this project as a tar.gz file
thrift_asio_server.hpp
1 //
2 // Created by Benjamin Schulz on 15/03/15.
3 //
4 
5 #ifndef _THRIFT_ASIO_SERVER_HPP_
6 #define _THRIFT_ASIO_SERVER_HPP_
7 
8 #pragma once
9 
10 #include <boost/smart_ptr.hpp>
11 #include <boost/asio.hpp>
12 #include <thrift/TProcessor.h>
13 #include <thrift/transport/TBufferTransports.h>
14 #include <thrift/protocol/TBinaryProtocol.h>
15 #include <iostream>
16 #include "./thrift_asio_transport.hpp"
17 
18 namespace betabugs{
19 namespace networking{
20 
41 template <typename HandlerType>
43 {
44  typedef boost::shared_ptr<HandlerType> Handler_ptr;
45 
46  // forward typedefs to minimize pollution
47  typedef apache::thrift::TProcessor TProcessor;
48  typedef apache::thrift::transport::TMemoryBuffer TMemoryBuffer;
49  typedef apache::thrift::protocol::TBinaryProtocol TBinaryProtocol;
50 
51  public:
71  static void serve(
72  boost::asio::io_service& io_service,
73  TProcessor& processor,
74  Handler_ptr handler,
75  unsigned short port
76  )
77  {
78  using boost::asio::ip::tcp;
79 
80  auto acceptor = std::make_shared<tcp::acceptor>(
81  io_service,
82  boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port),
83  true
84  );
85 
86  start_accept(io_service, acceptor, processor, handler);
87  }
88 
89  private:
90  static void start_accept(
91  boost::asio::io_service& io_service,
92  std::shared_ptr<boost::asio::ip::tcp::acceptor> acceptor,
93  TProcessor& processor,
94  Handler_ptr handler
95  )
96  {
97  using boost::asio::ip::tcp;
98 
99  auto socket = std::make_shared<tcp::socket>(io_service);
100  acceptor->async_accept(
101  *socket,
102  [&io_service, acceptor, socket, &processor, handler]
103  (boost::system::error_code ec)
104  {
105  if (ec)
106  {
107  std::clog << ec.message() << std::endl;
108  }
109  else
110  {
111  std::clog << "client connected" << std::endl;
112  on_accept(io_service, socket, processor, handler);
113  }
114 
115  // Note: this will accept new connections without any bounds
116  start_accept(io_service, acceptor, processor, handler);
117  }
118  );
119  }
120 
121  // called when a new client connection was established (accepted)
122  static void on_accept(
123  boost::asio::io_service& io_service,
124  std::shared_ptr<boost::asio::ip::tcp::socket> socket,
125  TProcessor& processor,
126  Handler_ptr handler
127  )
128  {
129  using boost::make_shared;
130 
131  // construct the output_protocol and call the handler
132  auto t1 = boost::make_shared<thrift_asio_transport>(socket, handler.get());
133  auto t2 = boost::make_shared<apache::thrift::transport::TFramedTransport>(t1);
134  auto output_protocol = boost::make_shared<TBinaryProtocol>(t2);
135  handler->on_client_connected(output_protocol);
136 
137  read_frame_size(io_service, socket, output_protocol, processor, handler);
138  }
139 
140  // read the size of a frame. Clients are expected to use the framed protocol
141  static void read_frame_size(
142  boost::asio::io_service& io_service,
143  std::shared_ptr<boost::asio::ip::tcp::socket> socket,
144  boost::shared_ptr<TBinaryProtocol> output_protocol,
145  TProcessor& processor,
146  Handler_ptr handler
147  )
148  {
149  auto frame_size = std::make_shared<uint32_t>(0);
150  boost::asio::async_read(
151  *socket, boost::asio::buffer(frame_size.get(), sizeof(uint32_t)),
152  [&io_service, socket, output_protocol, &processor, handler, frame_size]
153  (const boost::system::error_code& ec, std::size_t /*bytes_transferred*/)
154  {
155  if(ec)
156  {
157  std::clog << ec.message() << std::endl;
158  handler->on_client_disconnected(output_protocol, ec);
159  }
160  else
161  {
162  *frame_size = ntohl(*frame_size);
163  read_frame_data(io_service, socket, output_protocol, processor, handler, *frame_size);
164  }
165  }
166  );
167  }
168 
169  // read the data of the frame
170  static void read_frame_data(
171  boost::asio::io_service& io_service,
172  std::shared_ptr<boost::asio::ip::tcp::socket> socket,
173  boost::shared_ptr<TBinaryProtocol> output_protocol,
174  TProcessor& processor,
175  Handler_ptr handler,
176  uint32_t frame_size
177  )
178  {
179  auto frame_bytes = std::make_shared<std::vector<uint8_t>>(frame_size);
180  boost::asio::async_read(
181  *socket, boost::asio::buffer(frame_bytes->data(), frame_size),
182  [&io_service, socket, output_protocol, &processor, handler, frame_bytes]
183  (const boost::system::error_code& ec, std::size_t /*bytes_transferred*/)
184  {
185  if(ec)
186  {
187  std::clog << ec.message() << std::endl;
188  handler->on_client_disconnected(output_protocol, ec);
189  }
190  else
191  {
192  std::clog << "got frame bytes: " << frame_bytes->size() << std::endl;
193 
194  auto input_transport = boost::make_shared<TMemoryBuffer>(frame_bytes->data(), frame_bytes->size());
195  auto input_protocol = boost::make_shared<TBinaryProtocol>(input_transport);
196 
197  void* connection_context = nullptr;
198 
199  handler->before_process(output_protocol);
200  processor.process(input_protocol, output_protocol, connection_context);
201  handler->after_process();
202 
203  // read the next frame
204  read_frame_size(io_service, socket, output_protocol, processor, handler);
205  }
206  }
207  );
208  }
209 };
210 
215 }
216 }
217 
218 
219 #endif //_THRIFT_ASIO_SERVER_HPP_
static void serve(boost::asio::io_service &io_service, TProcessor &processor, Handler_ptr handler, unsigned short port)
Definition: thrift_asio_server.hpp:71
Definition: thrift_asio_server.hpp:42