5 #ifndef _THRIFT_ASIO_SERVER_HPP_
6 #define _THRIFT_ASIO_SERVER_HPP_
10 #include <boost/smart_ptr.hpp>
11 #include <boost/asio.hpp>
12 #include <thrift/TProcessor.h>
13 #include <thrift/transport/TBufferTransports.h>
14 #include <thrift/protocol/TBinaryProtocol.h>
16 #include "./thrift_asio_transport.hpp"
41 template <
typename HandlerType>
44 typedef boost::shared_ptr<HandlerType> Handler_ptr;
47 typedef apache::thrift::TProcessor TProcessor;
48 typedef apache::thrift::transport::TMemoryBuffer TMemoryBuffer;
49 typedef apache::thrift::protocol::TBinaryProtocol TBinaryProtocol;
72 boost::asio::io_service& io_service,
73 TProcessor& processor,
78 using boost::asio::ip::tcp;
80 auto acceptor = std::make_shared<tcp::acceptor>(
82 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port),
86 start_accept(io_service, acceptor, processor, handler);
90 static void start_accept(
91 boost::asio::io_service& io_service,
92 std::shared_ptr<boost::asio::ip::tcp::acceptor> acceptor,
93 TProcessor& processor,
97 using boost::asio::ip::tcp;
99 auto socket = std::make_shared<tcp::socket>(io_service);
100 acceptor->async_accept(
102 [&io_service, acceptor, socket, &processor, handler]
103 (boost::system::error_code ec)
107 std::clog << ec.message() << std::endl;
111 std::clog <<
"client connected" << std::endl;
112 on_accept(io_service, socket, processor, handler);
116 start_accept(io_service, acceptor, processor, handler);
122 static void on_accept(
123 boost::asio::io_service& io_service,
124 std::shared_ptr<boost::asio::ip::tcp::socket> socket,
125 TProcessor& processor,
129 using boost::make_shared;
132 auto t1 = boost::make_shared<thrift_asio_transport>(socket, handler.get());
133 auto t2 = boost::make_shared<apache::thrift::transport::TFramedTransport>(t1);
134 auto output_protocol = boost::make_shared<TBinaryProtocol>(t2);
135 handler->on_client_connected(output_protocol);
137 read_frame_size(io_service, socket, output_protocol, processor, handler);
141 static void read_frame_size(
142 boost::asio::io_service& io_service,
143 std::shared_ptr<boost::asio::ip::tcp::socket> socket,
144 boost::shared_ptr<TBinaryProtocol> output_protocol,
145 TProcessor& processor,
149 auto frame_size = std::make_shared<uint32_t>(0);
150 boost::asio::async_read(
151 *socket, boost::asio::buffer(frame_size.get(),
sizeof(uint32_t)),
152 [&io_service, socket, output_protocol, &processor, handler, frame_size]
153 (
const boost::system::error_code& ec, std::size_t )
157 std::clog << ec.message() << std::endl;
158 handler->on_client_disconnected(output_protocol, ec);
162 *frame_size = ntohl(*frame_size);
163 read_frame_data(io_service, socket, output_protocol, processor, handler, *frame_size);
170 static void read_frame_data(
171 boost::asio::io_service& io_service,
172 std::shared_ptr<boost::asio::ip::tcp::socket> socket,
173 boost::shared_ptr<TBinaryProtocol> output_protocol,
174 TProcessor& processor,
179 auto frame_bytes = std::make_shared<std::vector<uint8_t>>(frame_size);
180 boost::asio::async_read(
181 *socket, boost::asio::buffer(frame_bytes->data(), frame_size),
182 [&io_service, socket, output_protocol, &processor, handler, frame_bytes]
183 (
const boost::system::error_code& ec, std::size_t )
187 std::clog << ec.message() << std::endl;
188 handler->on_client_disconnected(output_protocol, ec);
192 std::clog <<
"got frame bytes: " << frame_bytes->size() << std::endl;
194 auto input_transport = boost::make_shared<TMemoryBuffer>(frame_bytes->data(), frame_bytes->size());
195 auto input_protocol = boost::make_shared<TBinaryProtocol>(input_transport);
197 void* connection_context =
nullptr;
199 handler->before_process(output_protocol);
200 processor.process(input_protocol, output_protocol, connection_context);
201 handler->after_process();
204 read_frame_size(io_service, socket, output_protocol, processor, handler);
219 #endif //_THRIFT_ASIO_SERVER_HPP_
static void serve(boost::asio::io_service &io_service, TProcessor &processor, Handler_ptr handler, unsigned short port)
Definition: thrift_asio_server.hpp:71
Definition: thrift_asio_server.hpp:42