View on GitHub

Asio Service Discovery

Components for service discovery via UDP multicasting. It uses boost::asio for asynchronous networking and is non-blocking and non-locking.

Download this project as a .zip file Download this project as a tar.gz file
service_discoverer.hpp
1 //
2 // service_discoverer.hpp
3 // ~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2015 Benjamin Schulz (beschulz at betabugs dot de)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #ifndef BB_SERVICE_DISCOVERER_HPP_INCLUDED
12 #define BB_SERVICE_DISCOVERER_HPP_INCLUDED
13 
14 #pragma once
15 
16 #include <iostream>
17 #include <string>
18 #include <set>
19 #include <boost/asio.hpp>
20 #include "detail/std_chrono_time_traits.hpp"
21 
22 namespace betabugs {
23 namespace networking {
24 
55 {
56  public:
60  struct service
61  {
62  /* const */ std::string service_name;
63  /* const */ std::string computer_name;
64  /* const */ boost::asio::ip::tcp::endpoint endpoint;
65  /* const */ std::chrono::steady_clock::time_point last_seen;
66 
67  bool operator<(const service& o) const
68  {
69  // last_seen is ignored
70  return std::tie(service_name, computer_name, endpoint) <
71  std::tie(o.service_name, o.computer_name, o.endpoint);
72  }
73 
74  bool operator==(const service& o) const
75  {
76  // again, last_seen is ignored
77  return std::tie(service_name, computer_name, endpoint) ==
78  std::tie(o.service_name, o.computer_name, o.endpoint);
79  }
80 
81  double age_in_seconds() const
82  {
83  auto age = std::chrono::steady_clock::now() - last_seen;
84  return std::chrono::duration_cast<std::chrono::duration<double>>(age).count();
85  }
86 
87  // this uses "name injection"
88  friend std::ostream& operator<<(std::ostream& os, const service_discoverer::service& service)
89  {
90  os << service.service_name << " on " << service.computer_name << "(" << service.endpoint << ") " <<
91  service.age_in_seconds() << " seconds ago";
92  return os;
93  }
94  };
95 
97  typedef std::set<service> services;
98 
100  typedef std::function<void(const services& services)> on_services_changed_t;
101 
112  service_discoverer(boost::asio::io_service& io_service,
113  const std::string& listen_for_service,
114  const on_services_changed_t on_services_changed,
115  const std::chrono::steady_clock::duration max_idle = std::chrono::seconds(30),
116  const size_t max_services = 10,
117  const unsigned short multicast_port = 30001,
118  const boost::asio::ip::address& listen_address = boost::asio::ip::address::from_string("0.0.0.0"),
119  const boost::asio::ip::address& multicast_address = boost::asio::ip::address::from_string("239.255.0.1")
120  )
121  : listen_for_service_(listen_for_service)
122  , socket_(io_service)
123  , idle_check_timer_(io_service)
124  , on_services_changed_(on_services_changed)
125  , max_idle_(max_idle)
126  , max_services_(max_services)
127  {
128  assert(max_services_ > 0);
129 
130  // Create the socket so that multiple may be bound to the same address.
131  boost::asio::ip::udp::endpoint listen_endpoint(
132  listen_address, multicast_port);
133  socket_.open(listen_endpoint.protocol());
134  socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
135  socket_.bind(listen_endpoint);
136 
137  // Join the multicast group.
138  socket_.set_option(
139  boost::asio::ip::multicast::join_group(multicast_address));
140 
141  start_receive();
142  }
143 
144  private:
145  void handle_message(const std::string& message, const boost::asio::ip::udp::endpoint& sender_endpoint)
146  {
147  std::vector<std::string> tokens;
148  { // simpleton "parser"
149  std::istringstream f(message);
150  std::string s;
151  while (getline(f, s, ':'))
152  tokens.push_back(s);
153 
154  if (tokens.size() != 3)
155  {
156  std::cerr << "invalid number of tokens in received service announcement: " << std::endl;
157  std::cerr << " message: " << message << std::endl;
158  std::cerr << " tokens: " << tokens.size() << std::endl;
159  return;
160  }
161  }
162  assert(tokens.size() == 3);
163 
164  std::string service_name = tokens[0];
165  std::string computer_name = tokens[1];
166  std::string port_string = tokens[2];
167 
168  // unsigned long, because it's the smalles value that suports unsigned parsing via stl :/
169  unsigned long port = 0;
170 
171  try
172  {
173  port = std::stoul(port_string);
174  }
175  catch (const std::exception& e)
176  {
177  std::cerr << "failed to parse port number from: " << port_string << std::endl;
178  return;
179  }
180 
181  if (port > std::numeric_limits<unsigned short>::max())
182  {
183  std::cerr << "failed to parse port number from: " << port_string << std::endl;
184  return;
185  }
186 
187  auto discovered_service = service
188  {
189  service_name,
190  computer_name,
191  boost::asio::ip::tcp::endpoint(sender_endpoint.address(), (unsigned short)port),
192  std::chrono::steady_clock::now()
193  };
194 
195  if (service_name == listen_for_service_)
196  {
197  // we need to do a replace here, because discovered_service might compare equal
198  // to an item already in the set. In this case no assignment would be performed and
199  // therefore last_seen would not be updated
200  discovered_services_.erase(discovered_service);
201  discovered_services_.insert(discovered_service);
202 
203  remove_idle_services();
204 
205  // if we have to much services, we need to drop the oldest one
206  if (discovered_services_.size() > max_services_)
207  {
208  // determine service whose last_seen time point is the smallest (i.e. the oldest)
209  services::iterator oldest_pos =
210  std::min_element(
211  discovered_services_.begin(),
212  discovered_services_.end(),
213  [](const service& a, const service& b)
214  {
215  return a.last_seen < b.last_seen;
216  }
217  );
218  assert(oldest_pos != discovered_services_.end());
219  discovered_services_.erase(oldest_pos);
220  }
221 
222  { // manage the idle_check_timer in case the service dies and we receive no other announcements
223 
224  { // cancel the idle_check_timer
225  boost::system::error_code ec;
226  idle_check_timer_.cancel(ec);
227  if (ec)
228  std::cerr << ec.message();
229  }
230 
231  { // determine new point in time for the timer
232  services::iterator oldest_pos =
233  std::min_element(
234  discovered_services_.begin(),
235  discovered_services_.end(),
236  [](const service& a, const service& b)
237  {
238  return a.last_seen < b.last_seen;
239  }
240  );
241  assert(oldest_pos != discovered_services_.end());
242 
243  idle_check_timer_.expires_at(oldest_pos->last_seen + max_idle_);
244  idle_check_timer_.async_wait(
245  [this](const boost::system::error_code& ec)
246  {
247  if (!ec && remove_idle_services())
248  {
249  on_services_changed_(discovered_services_);
250  }
251  }
252  );
253  }
254  }
255 
256  on_services_changed_(discovered_services_);
257  }
258  else
259  {
260  std::clog << "ignoring: " << discovered_service << std::endl;
261  }
262  }
263 
264  void start_receive()
265  {
266  // first do a receive with null_buffers to determine the size
267  socket_.async_receive(boost::asio::null_buffers(),
268  [this](const boost::system::error_code& error, unsigned int)
269  {
270  if (error)
271  {
272  std::cerr << error.message() << std::endl;
273  }
274  else
275  {
276  size_t bytes_available = socket_.available();
277 
278  auto receive_buffer = std::make_shared<std::vector<char>>(bytes_available);
279  auto sender_endpoint = std::make_shared<boost::asio::ip::udp::endpoint>();
280 
281  socket_.async_receive_from(
282  boost::asio::buffer(receive_buffer->data(), receive_buffer->size()), *sender_endpoint,
283  [this, receive_buffer, sender_endpoint] // we hold on to the shared_ptrs, so that it does not delete it's contents
284  (const boost::system::error_code& error, size_t bytes_recvd)
285  {
286  if (error)
287  {
288  std::cerr << error.message() << std::endl;
289  }
290  else
291  {
292  this->handle_message({receive_buffer->data(), receive_buffer->data() + bytes_recvd}, *sender_endpoint);
293  start_receive();
294  }
295  });
296  }
297  });
298  }
299 
300 
301  // throw out services that have not been seen for to long, returns true, if at least one service was removed, false otherwise.
302  bool remove_idle_services()
303  {
304  auto dead_line = std::chrono::steady_clock::now() - max_idle_;
305  bool services_removed = false;
306 
307  for (services::const_iterator i = discovered_services_.begin(); i != discovered_services_.end();)
308  {
309  if (i->last_seen < dead_line)
310  {
311  i = discovered_services_.erase(i);
312  services_removed = true;
313  }
314  else
315  ++i;
316  }
317 
318  return services_removed;
319  }
320 
321  typedef boost::asio::basic_deadline_timer<
322  std::chrono::steady_clock,
323  detail::std_chrono_time_traits<std::chrono::steady_clock>> steady_clock_deadline_timer_t;
324 
325  const std::string listen_for_service_;
326  boost::asio::ip::udp::socket socket_;
327  steady_clock_deadline_timer_t idle_check_timer_;
328  const on_services_changed_t on_services_changed_;
329  const std::chrono::steady_clock::duration max_idle_;
330  const size_t max_services_;
331 
332  services discovered_services_;
333 };
334 
335 }
336 }
337 
338 #endif /* BB_SERVICE_DISCOVERER_HPP_INCLUDED */
std::string service_name
the name of the service
Definition: service_discoverer.hpp:62
std::function< void(const services &services)> on_services_changed_t
This callback gets called whenever the set of available services changes.
Definition: service_discoverer.hpp:100
boost::asio::ip::tcp::endpoint endpoint
Endpoint you should connect to. Even though it's a TCP endpoint, it's up to you what you do with th...
Definition: service_discoverer.hpp:64
service_discoverer(boost::asio::io_service &io_service, const std::string &listen_for_service, const on_services_changed_t on_services_changed, const std::chrono::steady_clock::duration max_idle=std::chrono::seconds(30), const size_t max_services=10, const unsigned short multicast_port=30001, const boost::asio::ip::address &listen_address=boost::asio::ip::address::from_string("0.0.0.0"), const boost::asio::ip::address &multicast_address=boost::asio::ip::address::from_string("239.255.0.1"))
Definition: service_discoverer.hpp:112
std::set< service > services
a set of discovered services
Definition: service_discoverer.hpp:97
Definition: service_discoverer.hpp:54
std::string computer_name
the name of the computer the service is running on
Definition: service_discoverer.hpp:63
Definition: service_discoverer.hpp:60