29#include <condition_variable>
57 using ptr_t = std::shared_ptr<token>;
68 using guard = std::lock_guard<std::mutex>;
70 using unique_lock = std::unique_lock<std::mutex>;
73 mutable std::mutex lock_;
75 mutable std::condition_variable cond_;
88 MQTTAsync_token msgId_;
99 iaction_listener* listener_;
106 std::unique_ptr<connect_response> connRsp_;
108 std::unique_ptr<subscribe_response> subRsp_;
110 std::unique_ptr<unsubscribe_response> unsubRsp_;
130 void set_message_id(MQTTAsync_token msgId) {
143 static void on_success(
void* tokObj, MQTTAsync_successData* rsp);
144 static void on_success5(
void* tokObj, MQTTAsync_successData5* rsp);
154 static void on_failure(
void* tokObj, MQTTAsync_failureData* rsp);
155 static void on_failure5(
void* tokObj, MQTTAsync_failureData5* rsp);
162 static void on_connected(
void* tokObj,
char* );
167 void on_success(MQTTAsync_successData* rsp);
168 void on_success5(MQTTAsync_successData5* rsp);
173 void on_failure(MQTTAsync_failureData* rsp);
174 void on_failure5(MQTTAsync_failureData5* rsp);
180 void check_ret()
const {
181 if (rc_ != MQTTASYNC_SUCCESS || reasonCode_ >= 0x80)
182 throw exception(rc_, reasonCode_, errMsg_);
267 return std::make_shared<token>(typ, cli);
281 return std::make_shared<token>(typ, cli, userContext, cb);
290 return std::make_shared<token>(typ, cli,
topic);
306 return std::make_shared<token>(typ, cli,
topic, userContext, cb);
315 return std::make_shared<token>(typ, cli, topics);
331 return std::make_shared<token>(typ, cli, topics, userContext, cb);
358 static_assert(
sizeof(msgId_) <=
sizeof(
int),
"MQTTAsync_token must fit into int");
388 explicit operator bool()
const {
390 return rc_ == MQTTASYNC_SUCCESS && reasonCode_ < 0x80;
411 userContext_ = userContext;
453 return wait_for(std::chrono::milliseconds(timeout));
461 template <
class Rep,
class Period>
462 bool wait_for(
const std::chrono::duration<Rep, Period>& relTime) {
463 unique_lock g(lock_);
464 if (!cond_.wait_for(g, std::chrono::milliseconds(relTime), [
this] {
477 template <
class Clock,
class Duration>
478 bool wait_until(
const std::chrono::time_point<Clock, Duration>& absTime) {
479 unique_lock g(lock_);
480 if (!cond_.wait_until(g, absTime, [
this] { return complete_; }))
Definition async_client.h:137
Definition connect_options.h:50
Definition server_response.h:73
Definition response_options.h:264
Definition disconnect_options.h:41
Definition exception.h:48
Definition iaction_listener.h:50
Definition iasync_client.h:60
Definition response_options.h:51
Definition string_collection.h:45
token(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition token.h:201
static ptr_t create(Type typ, iasync_client &cli, const string &topic)
Definition token.h:289
static ptr_t create(Type typ, iasync_client &cli)
Definition token.h:266
virtual int get_message_id() const
Definition token.h:357
virtual void * get_user_context() const
Definition token.h:372
virtual int get_return_code() const
Definition token.h:398
token(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition token.h:222
static ptr_t create(Type typ, iasync_client &cli, const string &topic, void *userContext, iaction_listener &cb)
Definition token.h:302
Type get_type() const
Definition token.h:338
ReasonCode get_reason_code() const
Definition token.h:423
std::shared_ptr< token > ptr_t
Definition token.h:57
unsubscribe_response get_unsubscribe_response() const
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics)
Definition token.h:314
subscribe_response get_subscribe_response() const
std::shared_ptr< const token > const_ptr_t
Definition token.h:59
virtual bool wait_for(long timeout)
Definition token.h:452
virtual const_string_collection_ptr get_topics() const
Definition token.h:367
virtual bool is_complete() const
Definition token.h:380
virtual void set_user_context(void *userContext)
Definition token.h:409
token(Type typ, iasync_client &cli)
Definition token.h:191
Type
Definition token.h:64
@ SUBSCRIBE
Definition token.h:64
@ CONNECT
Definition token.h:64
@ UNSUBSCRIBE
Definition token.h:64
@ PUBLISH
Definition token.h:64
@ DISCONNECT
Definition token.h:64
string get_error_message() const
Definition token.h:428
token(Type typ, iasync_client &cli, MQTTAsync_token tok)
bool wait_until(const std::chrono::time_point< Clock, Duration > &absTime)
Definition token.h:478
token(Type typ, iasync_client &cli, const_string_collection_ptr topics)
connect_response get_connect_response() const
virtual iaction_listener * get_action_callback() const
Definition token.h:343
virtual iasync_client * get_client() const
Definition token.h:352
virtual bool try_wait()
Definition token.h:439
virtual ~token()
Definition token.h:259
std::weak_ptr< token > weak_ptr_t
Definition token.h:61
token(Type typ, iasync_client &cli, const string &topic)
Definition token.h:210
void set_num_expected(size_t n)
Definition token.h:418
token(Type typ, iasync_client &cli, const_string_collection_ptr topics, void *userContext, iaction_listener &cb)
virtual void set_action_callback(iaction_listener &listener)
friend class mock_async_client
Definition token.h:114
static ptr_t create(Type typ, iasync_client &cli, const_string_collection_ptr topics, void *userContext, iaction_listener &cb)
Definition token.h:327
static ptr_t create(Type typ, iasync_client &cli, void *userContext, iaction_listener &cb)
Definition token.h:278
bool wait_for(const std::chrono::duration< Rep, Period > &relTime)
Definition token.h:462
Definition server_response.h:172
Definition async_client.h:60
ReasonCode
Definition reason_code.h:39
@ SUCCESS
Definition reason_code.h:40
token::const_ptr_t const_token_ptr
Definition token.h:516
string_collection::const_ptr_t const_string_collection_ptr
Definition string_collection.h:259
token::ptr_t token_ptr
Definition token.h:513
Definition server_response.h:131