PahoMqttCpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
async_client.h
Go to the documentation of this file.
1
16
17/*******************************************************************************
18 * Copyright (c) 2013-2025 Frank Pagliughi <fpagliughi@mindspring.com>
19 *
20 * All rights reserved. This program and the accompanying materials
21 * are made available under the terms of the Eclipse Public License v2.0
22 * and Eclipse Distribution License v1.0 which accompany this distribution.
23 *
24 * The Eclipse Public License is available at
25 * http://www.eclipse.org/legal/epl-v20.html
26 * and the Eclipse Distribution License is available at
27 * http://www.eclipse.org/org/documents/edl-v10.php.
28 *
29 * Contributors:
30 * Frank Pagliughi - initial implementation and documentation
31 * Frank Pagliughi - MQTT v5 support
32 *******************************************************************************/
33
34#ifndef __mqtt_async_client_h
35#define __mqtt_async_client_h
36
37#include <functional>
38#include <list>
39#include <memory>
40#include <stdexcept>
41#include <tuple>
42#include <vector>
43
44#include "MQTTAsync.h"
45#include "mqtt/callback.h"
46#include "mqtt/create_options.h"
47#include "mqtt/delivery_token.h"
48#include "mqtt/event.h"
49#include "mqtt/exception.h"
51#include "mqtt/iasync_client.h"
53#include "mqtt/message.h"
54#include "mqtt/properties.h"
56#include "mqtt/thread_queue.h"
57#include "mqtt/token.h"
58#include "mqtt/types.h"
59
60namespace mqtt {
61
62// OBSOLETE: The legacy constants that lacked the "PAHO_MQTTPP_" prefix
63// clashed with #define's from other libraries and will be removed at the
64// next major version upgrade.
65
// Library version and copyright constants.
// When PAHO_MQTTPP_VERSIONS is defined only the "PAHO_MQTTPP_"-prefixed names
// are declared; otherwise only the legacy, unprefixed names are declared.
66#if defined(PAHO_MQTTPP_VERSIONS)
/** The version number for the client library (matches the "1.5.2" string below). */
68const uint32_t PAHO_MQTTPP_VERSION = 0x01050002;
/** The version string for the client library. */
70const string PAHO_MQTTPP_VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.2");
/** Copyright notice for the client library. */
72const string PAHO_MQTTPP_COPYRIGHT("Copyright (c) 2013-2025 Frank Pagliughi");
73#else
/** The version number for the client library (legacy, unprefixed name). */
75const uint32_t VERSION = 0x01050002;
/** The version string for the client library (legacy, unprefixed name). */
77const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.2");
/** Copyright notice for the client library (legacy, unprefixed name). */
79const string COPYRIGHT("Copyright (c) 2013-2025 Frank Pagliughi");
80#endif
81
83
136class async_client : public virtual iasync_client
137{
138public:
/** Shared pointer to an object of this class. */
140 using ptr_t = std::shared_ptr<async_client>;
/** Owning pointer to a thread_queue of events (the type of the consumer queue). */
142 using consumer_queue_type = std::unique_ptr<thread_queue<event>>;
143
/** Callable that receives a message pointer and returns nothing. */
145 using message_handler = std::function<void(const_message_ptr)>;
/** Callable that receives a "cause" string; used for both connected and connection-lost handlers. */
147 using connection_handler = std::function<void(const string& cause)>;
/** Callable that receives the properties and reason code of a disconnect. */
149 using disconnected_handler = std::function<void(const properties&, ReasonCode)>;
/** Callable that may modify the connect data; the meaning of the bool result is not shown here. */
151 using update_connection_handler = std::function<bool(connect_data&)>;
152
153private:
/** Lock type aliases; both currently name the same std::unique_lock type. */
155 using guard = std::unique_lock<std::mutex>;
157 using unique_lock = std::unique_lock<std::mutex>;
158
/** Mutex for object state; get_connect_options() holds it while copying connOpts_. */
160 mutable std::mutex lock_;
/** Handle passed to the MQTTAsync C API (e.g. MQTTAsync_isConnected). */
162 MQTTAsync cli_;
/** Creation options; set once by the constructors and never changed (const). */
164 const create_options createOpts_;
/** MQTT protocol version, as reported by mqtt_version(). */
166 int mqttVersion_;
/** Persistence object for the C library, if any (presumably set in create() - confirm). */
168 std::unique_ptr<MQTTClient_persistence> persist_{};
/** Non-owning pointer to the user's callback object; null until one is set. */
170 callback* userCallback_{};
/** Handler for "connected" notifications. */
172 connection_handler connHandler_;
/** Handler for "connection lost" notifications. */
174 connection_handler connLostHandler_;
/** Handler for "disconnected" notifications. */
176 disconnected_handler disconnectedHandler_;
/** Handler given the chance to update the connect data. */
178 update_connection_handler updateConnectionHandler_;
/** Handler for incoming messages. */
180 message_handler msgHandler_;
/** Copy of the connect options, returned by get_connect_options(). */
182 connect_options connOpts_;
/** Token for a connect operation (presumably the one in progress - confirm). */
184 token_ptr connTok_;
/** Tokens for operations that have been added and not yet removed. */
186 std::list<token_ptr> pendingTokens_;
/** Delivery tokens that have been added and not yet removed. */
188 std::list<delivery_token_ptr> pendingDeliveryTokens_;
// NOTE(review): the consumer queue member `que_` used by the consume_*
// methods is not visible in this listing; its declaration appears to have
// been dropped between the lines numbered 188 and 191.
191
// Static callback entry points taking an opaque `context` pointer and raw
// C types. Presumably registered with the MQTTAsync C library, with `context`
// pointing back at this client - the implementations are not shown here.

/** Callback for a completed connection; `cause` is a C string. */
193 static void on_connected(void* context, char* cause);
/** Callback for a lost connection; `cause` is a C string. */
194 static void on_connection_lost(void* context, char* cause);
/** Callback for a disconnect, carrying C properties and a reason code. */
195 static void on_disconnected(
196 void* context, MQTTProperties* cprops, MQTTReasonCodes reasonCode
197 );
/** Callback for an arriving message; returns an int status to the caller. */
198 static int on_message_arrived(
199 void* context, char* topicName, int topicLen, MQTTAsync_message* msg
200 );
/** Callback for a completed delivery, identified by its C token. */
201 static void on_delivery_complete(void* context, MQTTAsync_token tok);
/** Callback allowing the connect data to be updated; returns an int status. */
202 static int on_update_connection(void* context, MQTTAsync_connectData* cdata);
203
// Token bookkeeping. `token` is a friend so that it can reach these
// private members.
205 friend class token;
/** Registers an operation token with the client. */
206 virtual void add_token(token_ptr tok);
/** Registers a delivery token with the client. */
207 virtual void add_token(delivery_token_ptr tok);
/** Unregisters a token given its raw pointer. */
208 virtual void remove_token(token* tok) override;
/** Unregisters a token; forwards the raw pointer to remove_token(token*). */
209 virtual void remove_token(token_ptr tok) { remove_token(tok.get()); }
/** Unregisters a delivery token; forwards the raw pointer to remove_token(token*). */
210 void remove_token(delivery_token_ptr tok) { remove_token(tok.get()); }
211
/** Clients cannot be default-constructed. */
213 async_client() = delete;
/** Clients cannot be copy-constructed. */
214 async_client(const async_client&) = delete;
/** Clients cannot be copy-assigned. */
215 async_client& operator=(const async_client&) = delete;
216
218 static void check_ret(int rc) {
219 if (rc != MQTTASYNC_SUCCESS)
220 throw exception(rc);
221 }
/**
 * Common creation step, called from the body of every constructor after
 * `createOpts_` has been initialized. Defined out of line.
 */
229 void create();
230
231public:
/**
 * Creates a client for the given server.
 * The arguments are stored in `createOpts_`, then create() is called.
 * @param serverURI The address of the server to connect to.
 * @param clientId The client identifier; defaults to an empty string.
 * @param persistence The persistence to use; defaults to NO_PERSISTENCE.
 */
243 explicit async_client(
244 const string& serverURI, const string& clientId = string{},
245 const persistence_type& persistence = NO_PERSISTENCE
246 )
247 : createOpts_{serverURI, clientId, persistence} {
248 create();
249 }
264 const string& serverURI, const string& clientId, int maxBufferedMessages,
265 const persistence_type& persistence = NO_PERSISTENCE
266 )
267 : createOpts_{serverURI, clientId, maxBufferedMessages, persistence} {
268 create();
269 }
283 const string& serverURI, const string& clientId, const create_options& opts,
284 const persistence_type& persistence = NO_PERSISTENCE
285 )
286 : createOpts_{serverURI, clientId, opts, persistence} {
287 create();
288 }
/**
 * Creates a client from a complete set of creation options.
 * The options are copied into `createOpts_`, then create() is called.
 * @param opts The creation options.
 */
297 async_client(const create_options& opts) : createOpts_{opts} { create(); }
/** Destructor. */
301 ~async_client() override;
/**
 * Sets the callback object for this client.
 * @param cb The callback, taken by reference.
 */
308 void set_callback(callback& cb) override;
/** Disables the callbacks for this client. */
314 void disable_callbacks() override;
/**
 * Starts a connection to the server using no explicitly supplied options.
 * @return A token for the connect operation.
 */
351 token_ptr connect() override;
376 connect_options options, void* userContext, iaction_listener& cb
377 ) override;
/**
 * Connects using default-constructed connect options.
 * Forwards to the overload that takes explicit connect options.
 * @param userContext Opaque context pointer passed through to that overload.
 * @param cb The action listener passed through to that overload.
 * @return A token for the connect operation.
 */
389 token_ptr connect(void* userContext, iaction_listener& cb) override {
390 return connect(connect_options{}, userContext, cb);
391 }
/**
 * Disconnects from the server.
 * Wraps the timeout in a disconnect_options object and forwards to the
 * overload that takes disconnect options.
 * @param timeout The timeout; the std::chrono overload passes a
 *                millisecond count here.
 * @return A token for the disconnect operation.
 */
423 token_ptr disconnect(int timeout) override {
424 return disconnect(disconnect_options(timeout));
425 }
436 template <class Rep, class Period>
437 token_ptr disconnect(const std::chrono::duration<Rep, Period>& timeout) {
438 // TODO: check range
439 return disconnect((int)to_milliseconds_count(timeout));
440 }
/**
 * Disconnects from the server, with a listener for the operation.
 * @param timeout The timeout; the std::chrono overload passes a
 *                millisecond count here.
 * @param userContext Opaque context pointer supplied by the caller.
 * @param cb The action listener for the operation.
 * @return A token for the disconnect operation.
 */
455 token_ptr disconnect(int timeout, void* userContext, iaction_listener& cb) override;
470 template <class Rep, class Period>
472 const std::chrono::duration<Rep, Period>& timeout, void* userContext,
474 ) {
475 // TODO: check range
476 return disconnect((int)to_milliseconds_count(timeout), userContext, cb);
477 }
/**
 * Disconnects from the server with a timeout of zero.
 * Forwards to the overload that takes a timeout, context and listener.
 * @param userContext Opaque context pointer passed through unchanged.
 * @param cb The action listener for the operation.
 * @return A token for the disconnect operation.
 */
489 token_ptr disconnect(void* userContext, iaction_listener& cb) override {
490 return disconnect(0L, userContext, cb);
491 }
/**
 * Gets the pending delivery tokens.
 * @return A vector of delivery tokens.
 */
501 std::vector<delivery_token_ptr> get_pending_delivery_tokens() const override;
/**
 * Gets the client ID from the creation options.
 * @return A copy of the client ID string.
 */
506 string get_client_id() const override { return createOpts_.get_client_id(); }
/**
 * Gets the server URI from the creation options.
 * @return A copy of the server URI string.
 */
511 string get_server_uri() const override { return createOpts_.get_server_uri(); }
/**
 * Gets the stored MQTT version.
 * @return The value of `mqttVersion_`.
 */
521 int mqtt_version() const noexcept { return mqttVersion_; }
528 guard g(lock_);
529 return connOpts_;
530 }
/**
 * Determines if the client is currently connected.
 * @return The result of MQTTAsync_isConnected() for this client's handle,
 *         converted to bool.
 */
535 bool is_connected() const override { return to_bool(MQTTAsync_isConnected(cli_)); }
549 string_ref topic, const void* payload, size_t n, int qos, bool retained,
550 const properties& props = properties()
551 ) override;
560 delivery_token_ptr publish(string_ref topic, const void* payload, size_t n) override {
561 return publish(
562 std::move(topic), payload, n, message::DFLT_QOS, message::DFLT_RETAINED
563 );
564 }
577 string_ref topic, binary_ref payload, int qos, bool retained,
578 const properties& props = properties()
579 ) override;
588 return publish(
589 std::move(topic), std::move(payload), message::DFLT_QOS, message::DFLT_RETAINED
590 );
591 }
608 string_ref topic, const void* payload, size_t n, int qos, bool retained,
609 void* userContext, iaction_listener& cb
610 ) override;
632 const_message_ptr msg, void* userContext, iaction_listener& cb
633 ) override;
645 const string& topicFilter, int qos,
646 const subscribe_options& opts = subscribe_options(),
647 const properties& props = properties()
648 ) override;
667 const string& topicFilter, int qos, void* userContext, iaction_listener& cb,
668 const subscribe_options& opts = subscribe_options(),
669 const properties& props = properties()
670 ) override;
686 const_string_collection_ptr topicFilters, const qos_collection& qos,
687 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
688 const properties& props = properties()
689 ) override;
708 const_string_collection_ptr topicFilters, const qos_collection& qos,
709 void* userContext, iaction_listener& cb,
710 const std::vector<subscribe_options>& opts = std::vector<subscribe_options>(),
711 const properties& props = properties()
712 ) override;
722 const string& topicFilter, const properties& props = properties()
723 ) override;
734 const_string_collection_ptr topicFilters, const properties& props = properties()
735 ) override;
750 const_string_collection_ptr topicFilters, void* userContext, iaction_listener& cb,
751 const properties& props = properties()
752 ) override;
766 const string& topicFilter, void* userContext, iaction_listener& cb,
767 const properties& props = properties()
768 ) override;
/**
 * Starts consuming. The consume_* methods that check the queue throw
 * "Consumer not started" while no consumer queue exists; presumably this
 * call creates it - confirm against the implementation.
 */
788 void start_consuming() override;
/** Stops consuming. */
797 void stop_consuming() override;
801 void clear_consumer() override {
802 if (que_)
803 que_->clear();
804 }
/**
 * Determines if the consumer queue has been closed.
 * @return true if there is no consumer queue, or the queue reports closed.
 */
812 bool consumer_closed() noexcept override { return !que_ || que_->closed(); }
/**
 * Determines if the consumer queue is done.
 * @return true if there is no consumer queue, or the queue reports done.
 */
820 bool consumer_done() noexcept override { return !que_ || que_->done(); }
/**
 * Gets the number of items in the consumer queue.
 * @return The queue's size, or 0 if there is no consumer queue.
 */
829 std::size_t consumer_queue_size() const override { return (que_) ? que_->size() : 0; }
/**
 * Reads the next event from the consumer queue.
 * @return The event.
 */
836 event consume_event() override;
/**
 * Attempts to read the next event from the consumer queue.
 * @param evt Pointer to the event that receives the result.
 * @return A bool status; presumably true when an event was retrieved -
 *         confirm against the implementation.
 */
843 bool try_consume_event(event* evt) override;
851 template <typename Rep, class Period>
853 event* evt, const std::chrono::duration<Rep, Period>& relTime
854 ) {
855 if (!que_)
856 throw mqtt::exception(-1, "Consumer not started");
857
858 try {
859 return que_->try_get_for(evt, relTime);
860 }
861 catch (queue_closed&) {
862 *evt = event{shutdown_event{}};
863 return true;
864 }
865 }
872 template <typename Rep, class Period>
873 event try_consume_event_for(const std::chrono::duration<Rep, Period>& relTime) {
874 event evt;
875 try {
876 que_->try_get_for(&evt, relTime);
877 }
878 catch (queue_closed&) {
879 evt = event{shutdown_event{}};
880 }
881 return evt;
882 }
890 template <class Clock, class Duration>
892 event* evt, const std::chrono::time_point<Clock, Duration>& absTime
893 ) {
894 if (!que_)
895 throw mqtt::exception(-1, "Consumer not started");
896
897 try {
898 return que_->try_get_until(evt, absTime);
899 }
900 catch (queue_closed&) {
901 *evt = event{shutdown_event{}};
902 return true;
903 }
904 }
911 template <class Clock, class Duration>
912 event try_consume_event_until(const std::chrono::time_point<Clock, Duration>& absTime) {
913 event evt;
914 try {
915 que_->try_get_until(&evt, absTime);
916 }
917 catch (queue_closed&) {
918 evt = event{shutdown_event{}};
919 }
920 return evt;
921 }
943 template <typename Rep, class Period>
945 const_message_ptr* msg, const std::chrono::duration<Rep, Period>& relTime
946 ) {
947 if (!que_)
948 throw mqtt::exception(-1, "Consumer not started");
949
950 event evt;
951
952 while (true) {
953 if (!try_consume_event_for(&evt, relTime))
954 return false;
955
956 if (const auto* pval = evt.get_message_if()) {
957 *msg = std::move(*pval);
958 break;
959 }
960
961 if (evt.is_any_disconnect()) {
962 *msg = const_message_ptr{};
963 break;
964 }
965 }
966 return true;
967 }
974 template <typename Rep, class Period>
976 const std::chrono::duration<Rep, Period>& relTime
977 ) {
979 this->try_consume_message_for(&msg, relTime);
980 return msg;
981 }
989 template <class Clock, class Duration>
991 const_message_ptr* msg, const std::chrono::time_point<Clock, Duration>& absTime
992 ) {
993 if (!que_)
994 throw mqtt::exception(-1, "Consumer not started");
995
996 event evt;
997
998 while (true) {
999 if (!try_consume_event_until(&evt, absTime))
1000 return false;
1001
1002 if (const auto* pval = evt.get_message_if()) {
1003 *msg = std::move(*pval);
1004 break;
1005 }
1006
1007 if (!evt.is_any_disconnect()) {
1008 *msg = const_message_ptr{};
1009 break;
1010 }
1011 }
1012
1013 return true;
1014 }
1020 template <class Clock, class Duration>
1022 const std::chrono::time_point<Clock, Duration>& absTime
1023 ) {
1025 this->try_consume_message_until(&msg, absTime);
1026 return msg;
1027 }
1028};
1029
1032
1034} // namespace mqtt
1035
1036#endif // __mqtt_async_client_h
Definition async_client.h:137
void set_connection_lost_handler(connection_handler cb)
bool try_consume_event_until(event *evt, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:891
delivery_token_ptr publish(const_message_ptr msg, void *userContext, iaction_listener &cb) override
bool consumer_done() noexcept override
Definition async_client.h:820
std::unique_ptr< thread_queue< event > > consumer_queue_type
Definition async_client.h:142
void set_disconnected_handler(disconnected_handler cb)
const_message_ptr try_consume_message_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:1021
token_ptr subscribe(const string &topicFilter, int qos, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
token_ptr disconnect(disconnect_options opts) override
bool try_consume_message(const_message_ptr *msg) override
token_ptr disconnect() override
Definition async_client.h:404
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, void *userContext, iaction_listener &cb) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n) override
Definition async_client.h:560
delivery_token_ptr publish(string_ref topic, binary_ref payload, int qos, bool retained, const properties &props=properties()) override
token_ptr subscribe(const string &topicFilter, int qos, void *userContext, iaction_listener &cb, const subscribe_options &opts=subscribe_options(), const properties &props=properties()) override
std::shared_ptr< async_client > ptr_t
Definition async_client.h:140
bool consumer_closed() noexcept override
Definition async_client.h:812
void stop_consuming() override
connect_options get_connect_options() const
Definition async_client.h:527
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:944
token_ptr disconnect(int timeout) override
Definition async_client.h:423
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
std::function< bool(connect_data &)> update_connection_handler
Definition async_client.h:151
void set_message_callback(message_handler cb)
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout, void *userContext, iaction_listener &cb)
Definition async_client.h:471
token_ptr connect(connect_options options, void *userContext, iaction_listener &cb) override
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained, const properties &props=properties()) override
std::function< void(const properties &, ReasonCode)> disconnected_handler
Definition async_client.h:149
std::function< void(const_message_ptr)> message_handler
Definition async_client.h:145
std::size_t consumer_queue_size() const override
Definition async_client.h:829
async_client(const string &serverURI, const string &clientId, int maxBufferedMessages, const persistence_type &persistence=NO_PERSISTENCE)
Definition async_client.h:263
token_ptr unsubscribe(const string &topicFilter, const properties &props=properties()) override
delivery_token_ptr publish(string_ref topic, binary_ref payload) override
Definition async_client.h:587
string get_client_id() const override
Definition async_client.h:506
void set_callback(callback &cb) override
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:990
token_ptr subscribe(const_string_collection_ptr topicFilters, const qos_collection &qos, void *userContext, iaction_listener &cb, const std::vector< subscribe_options > &opts=std::vector< subscribe_options >(), const properties &props=properties()) override
const_message_ptr consume_message() override
token_ptr reconnect() override
const_message_ptr try_consume_message_for(const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:975
void set_connected_handler(connection_handler cb)
bool try_consume_event_for(event *evt, const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:852
async_client(const create_options &opts)
Definition async_client.h:297
token_ptr disconnect(int timeout, void *userContext, iaction_listener &cb) override
std::vector< delivery_token_ptr > get_pending_delivery_tokens() const override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, const properties &props=properties()) override
token_ptr unsubscribe(const_string_collection_ptr topicFilters, void *userContext, iaction_listener &cb, const properties &props=properties()) override
void clear_consumer() override
Definition async_client.h:801
bool try_consume_event(event *evt) override
int mqtt_version() const noexcept
Definition async_client.h:521
token_ptr unsubscribe(const string &topicFilter, void *userContext, iaction_listener &cb, const properties &props=properties()) override
event try_consume_event_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition async_client.h:912
event consume_event() override
string get_server_uri() const override
Definition async_client.h:511
~async_client() override
token_ptr disconnect(const std::chrono::duration< Rep, Period > &timeout)
Definition async_client.h:437
delivery_token_ptr get_pending_delivery_token(int msgID) const override
token_ptr disconnect(void *userContext, iaction_listener &cb) override
Definition async_client.h:489
token_ptr connect(void *userContext, iaction_listener &cb) override
Definition async_client.h:389
void disable_callbacks() override
async_client(const string &serverURI, const string &clientId=string{}, const persistence_type &persistence=NO_PERSISTENCE)
Definition async_client.h:243
void set_update_connection_handler(update_connection_handler cb)
token_ptr connect() override
bool is_connected() const override
Definition async_client.h:535
event try_consume_event_for(const std::chrono::duration< Rep, Period > &relTime)
Definition async_client.h:873
delivery_token_ptr publish(const_message_ptr msg) override
void start_consuming() override
token_ptr connect(connect_options options) override
async_client(const string &serverURI, const string &clientId, const create_options &opts, const persistence_type &persistence=NO_PERSISTENCE)
Definition async_client.h:282
std::function< void(const string &cause)> connection_handler
Definition async_client.h:147
Definition callback.h:43
Definition connect_options.h:571
Definition connect_options.h:50
Definition create_options.h:77
const string & get_server_uri() const noexcept
Definition create_options.h:205
const string & get_client_id() const noexcept
Definition create_options.h:215
Definition disconnect_options.h:41
Definition event.h:87
Definition exception.h:48
Definition iaction_listener.h:50
Definition iasync_client.h:60
std::vector< int > qos_collection
Definition iasync_client.h:66
static constexpr bool DFLT_RETAINED
Definition message.h:62
static constexpr int DFLT_QOS
Definition message.h:60
Definition properties.h:293
Definition thread_queue.h:43
Definition subscribe_options.h:49
Definition token.h:54
Definition topic.h:45
Definition async_client.h:60
ReasonCode
Definition reason_code.h:39
const string COPYRIGHT("Copyright (c) 2013-2025 Frank Pagliughi")
constexpr no_persistence NO_PERSISTENCE
Definition create_options.h:43
string_collection::const_ptr_t const_string_collection_ptr
Definition string_collection.h:259
bool to_bool(int n)
Definition types.h:107
token::ptr_t token_ptr
Definition token.h:513
const uint32_t VERSION
Definition async_client.h:75
message::const_ptr_t const_message_ptr
Definition message.h:372
const string VERSION_STR("Paho MQTT C++ (mqttpp) v. 1.5.2")
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition types.h:95
delivery_token::ptr_t delivery_token_ptr
Definition delivery_token.h:127
async_client::ptr_t async_client_ptr
Definition async_client.h:1031
std::variant< no_persistence, string, iclient_persistence * > persistence_type
Definition create_options.h:53
Definition event.h:59