wamp_session.hpp
1 //
3 // Copyright (c) Crossbar.io Technologies GmbH and contributors
4 //
5 // Boost Software License - Version 1.0 - August 17th, 2003
6 //
7 // Permission is hereby granted, free of charge, to any person or organization
8 // obtaining a copy of the software and accompanying documentation covered by
9 // this license (the "Software") to use, reproduce, display, distribute,
10 // execute, and transmit the Software, and to prepare derivative works of the
11 // Software, and to permit third-parties to whom the Software is furnished to
12 // do so, all subject to the following:
13 //
14 // The copyright notices in the Software and this entire statement, including
15 // the above license grant, this restriction and the following disclaimer,
16 // must be included in all copies of the Software, in whole or in part, and
17 // all derivative works of the Software, unless such copies or derivative
18 // works are solely in the form of machine-executable object code generated by
19 // a source language processor.
20 //
21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 // FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
24 // SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
25 // FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
26 // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
27 // DEALINGS IN THE SOFTWARE.
28 //
30 
31 #ifndef AUTOBAHN_SESSION_HPP
32 #define AUTOBAHN_SESSION_HPP
33 
#include "wamp_call_options.hpp"
#include "wamp_call_result.hpp"
#include "wamp_event_handler.hpp"
#include "wamp_message.hpp"
#include "wamp_procedure.hpp"
#include "wamp_publish_options.hpp"
#include "wamp_subscribe_options.hpp"
#include "wamp_transport_handler.hpp"
#include "boost_config.hpp"

#include <boost/asio.hpp>
#include <atomic>
#include <cstdint>
#include <functional>
#include <istream>
#include <ostream>
#include <map>
#include <memory>
#include <msgpack.hpp>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
56 
57 #if defined(_WIN32) || defined(WIN32)
58 #define WIN32_LEAN_AND_MEAN
59 #endif
60 
61 #ifdef ERROR
62 #undef ERROR
63 #endif
64 
65 namespace autobahn {
66 
// Forward declarations of types that this header only refers to through
// pointers, references, smart pointers or return types. Listed alphabetically.
class wamp_authenticate;
class wamp_call;
class wamp_challenge;
class wamp_message;
class wamp_register_request;
class wamp_registration;
class wamp_subscribe_request;
class wamp_subscription;
class wamp_transport;
class wamp_unregister_request;
class wamp_unsubscribe_request;
78 
87 class wamp_session :
89  public wamp_transport_handler,
90  public std::enable_shared_from_this<wamp_session>
91 {
92 public:
93 
100  wamp_session(
101  boost::asio::io_service& io_service,
102  bool debug_enabled = false);
103 
104  ~wamp_session();
105 
111  boost::future<void> start();
112 
118  boost::future<void> stop();
119 
128  boost::future<uint64_t> join(
129  const std::string& realm,
130  const std::vector<std::string>& authmethods = std::vector<std::string>(),
131  const std::string& authid = "");
132 
139  boost::future<std::string> leave(
140  const std::string& reason = std::string("wamp.error.close_realm"));
141 
150  boost::future<void> publish(const std::string& topic,
151  const wamp_publish_options& options = wamp_publish_options());
152 
161  template <typename List>
162  boost::future<void> publish(const std::string& topic, const List& arguments,
163  const wamp_publish_options& options = wamp_publish_options());
164 
174  template <typename List, typename Map>
175  boost::future<void> publish(
176  const std::string& topic,
177  const List& arguments,
178  const Map& kw_arguments,
179  const wamp_publish_options& options = wamp_publish_options());
180 
189  boost::future<wamp_subscription> subscribe(
190  const std::string& topic,
191  const wamp_event_handler& handler,
192  const wamp_subscribe_options& options = wamp_subscribe_options());
193 
200  boost::future<void> unsubscribe(const wamp_subscription& subscription);
201 
209  boost::future<wamp_call_result> call(
210  const std::string& procedure,
211  const wamp_call_options& options = wamp_call_options());
212 
221  template <typename List>
222  boost::future<wamp_call_result> call(
223  const std::string& procedure,
224  const List& arguments,
225  const wamp_call_options& options = wamp_call_options());
226 
236  template<typename List, typename Map>
237  boost::future<wamp_call_result> call(
238  const std::string& procedure,
239  const List& arguments, const Map& kw_arguments,
240  const wamp_call_options& options = wamp_call_options());
241 
250  boost::future<wamp_registration> provide(
251  const std::string& uri,
252  const wamp_procedure& procedure,
253  const provide_options& options = provide_options());
254 
261  boost::future<void> unprovide(const wamp_registration& registration);
262 
271  virtual boost::future<wamp_authenticate> on_challenge(const wamp_challenge& challenge);
272 
305  const std::unordered_map<std::string, msgpack::object>& welcome_details();
306 
307 private:
308  // Implements the wamp transport handler interface.
309  virtual void on_attach(const std::shared_ptr<wamp_transport>& transport) override;
310  virtual void on_detach(bool was_clean, const std::string& reason) override;
311  virtual void on_message(wamp_message&& message) override;
312 
313  // WAMP message processing
314  void process_error(wamp_message&& message);
315  void process_welcome(wamp_message&& message);
316  void process_abort(wamp_message&& message);
317  void process_challenge(wamp_message&& message);
318  void process_call_result(wamp_message&& message);
319  void process_subscribed(wamp_message&& message);
320  void process_unsubscribed(wamp_message&& message);
321  void process_event(wamp_message&& message);
322  void process_registered(wamp_message&& message);
323  void process_unregistered(wamp_message&& message);
324  void process_invocation(wamp_message&& message);
325  void process_goodbye(wamp_message&& message);
326 
327  // Transmitting/receiving messages
328  void send_message(wamp_message&& message, bool session_established = true);
329  void receive_message();
330 
331  void got_handshake_reply(const boost::system::error_code& error);
332  void got_message_header(const boost::system::error_code& error);
333  void got_message_body(const boost::system::error_code& error);
334  void got_message(wamp_message&& message);
335 
336  bool m_debug_enabled;
337 
338  boost::asio::io_service& m_io_service;
339 
340  // The transport this session runs on.
341  std::shared_ptr<wamp_transport> m_transport;
342 
343  // Last request ID of outgoing WAMP requests.
344  std::atomic<uint64_t> m_request_id;
345 
346  // WAMP session ID (if the session is joined to a realm).
347  uint64_t m_session_id;
348 
349  // Synchronization for dealing with starting the session.
350  boost::promise<void> m_session_start;
351 
352  // Future to be fired when session was joined.
353  boost::promise<uint64_t> m_session_join;
354 
355  // Whether or not we have already sent a goodbye when leaving the session.
356  bool m_goodbye_sent;
357 
358  boost::promise<std::string> m_session_leave;
359 
360  // Set to true when the session is stopped.
361  bool m_running;
362 
363  // Synchronization for dealing with stopping the session
364  boost::promise<void> m_session_stop;
365 
367  // Caller
368 
369  // Track pending calls by request id.
370  std::map<uint64_t /*request id*/, std::shared_ptr<wamp_call>> m_calls;
371 
373  // Subscriber
374 
375  // Pending subscribe requests by request id.
376  std::map<uint64_t /*request id*/, std::shared_ptr<wamp_subscribe_request>> m_subscribe_requests;
377 
378  // Pending unsubscribe requests by request id.
379  std::map<uint64_t /*request id*/, std::shared_ptr<wamp_unsubscribe_request>> m_unsubscribe_requests;
380 
381  // Event handlers by subscription id.
382  std::multimap<uint64_t /*subscription id*/, wamp_event_handler> m_subscription_handlers;
383 
385  // Callee
386 
387  // Map of outstanding WAMP register requests (request ID -> register request).
388  std::map<uint64_t, std::shared_ptr<wamp_register_request>> m_register_requests;
389 
390  // Map of outstanding WAMP unregister requests (request ID -> unregister request).
391  std::map<uint64_t, std::shared_ptr<wamp_unregister_request>> m_unregister_requests;
392 
393  // Map of registered procedures (registration ID -> procedure)
394  std::map<uint64_t, wamp_procedure> m_procedures;
395 
396  // Welcome details
397  std::unordered_map<std::string, msgpack::object> m_welcome_details;
398 
399 };
400 
401 } // namespace autobahn
402 
403 #include "wamp_session.ipp"
404 
405 #endif // AUTOBAHN_SESSION_HPP
boost::future< wamp_registration > provide(const std::string &uri, const wamp_procedure &procedure, const provide_options &options=provide_options())
Register a procedure that can be called remotely.
boost::future< wamp_call_result > call(const std::string &procedure, const wamp_call_options &options=wamp_call_options())
Calls a remote procedure with no arguments.
boost::future< void > stop()
Stops the session with the router.
wamp_session(boost::asio::io_service &io_service, bool debug_enabled=false)
Create a new WAMP session.
boost::future< wamp_subscription > subscribe(const std::string &topic, const wamp_event_handler &handler, const wamp_subscribe_options &options=wamp_subscribe_options())
Subscribe a handler to a topic to receive events.
boost::future< void > publish(const std::string &topic, const wamp_publish_options &options=wamp_publish_options())
Publish an event with empty payload to a topic.
boost::future< void > unprovide(const wamp_registration &registration)
Unregister a handler from a previously registered service.
boost::future< void > start()
Establishes a session with the router.
boost::future< uint64_t > join(const std::string &realm, const std::vector< std::string > &authmethods=std::vector< std::string >(), const std::string &authid="")
Join a realm with the session.
boost::future< void > unsubscribe(const wamp_subscription &subscription)
Unsubscribe a handler from a previously subscribed topic.
boost::future< std::string > leave(const std::string &reason=std::string("wamp.error.close_realm"))
Leave the realm.
const std::unordered_map< std::string, msgpack::object > & welcome_details()
Accessor method to WELCOME DETAILS dictionary containing router roles and corresponding features...
virtual boost::future< wamp_authenticate > on_challenge(const wamp_challenge &challenge)
Function called by the session when authenticating.