11#include " behaviortree_cpp/loggers/bt_zmq_publisher.h"
22#include " behaviortree_cpp/loggers/bt_flatbuffer_helper.h"
33#include < future>
4+ #include < zmq.hpp>
45
56namespace BT
67{
78std::atomic<bool > PublisherZMQ::ref_count (false );
89
9- void PublisherZMQ::createStatusBuffer ()
10+ struct PublisherZMQ ::Pimpl
1011{
11- status_buffer_.clear ();
12- applyRecursiveVisitor (root_node_, [this ](TreeNode* node) {
13- size_t index = status_buffer_.size ();
14- status_buffer_.resize (index + 3 );
15- flatbuffers::WriteScalar<uint16_t >(&status_buffer_[index], node->UID ());
16- flatbuffers::WriteScalar<int8_t >(&status_buffer_[index + 2 ],
17- static_cast <int8_t >(convertToFlatbuffers (node->status ())));
18- });
19- }
12+ Pimpl ():
13+ context (1 )
14+ , publisher(context, ZMQ_PUB)
15+ , server(context, ZMQ_REP)
16+ {}
17+
18+ zmq::context_t context;
19+ zmq::socket_t publisher;
20+ zmq::socket_t server;
21+ };
22+
2023
2124PublisherZMQ::PublisherZMQ (TreeNode* root_node, int max_msg_per_second)
2225 : StatusChangeLogger(root_node)
2326 , root_node_(root_node)
2427 , min_time_between_msgs_(std::chrono::microseconds(1000 * 1000 ) / max_msg_per_second)
25- , zmq_context_(1 )
26- , zmq_publisher_(zmq_context_, ZMQ_PUB)
27- , zmq_server_(zmq_context_, ZMQ_REP)
2828 , send_pending_(false )
29+ , zmq_(new Pimpl())
2930{
3031 static bool first_instance = true ;
3132 if (first_instance)
@@ -43,11 +44,11 @@ PublisherZMQ::PublisherZMQ(TreeNode* root_node, int max_msg_per_second)
4344 tree_buffer_.resize (builder.GetSize ());
4445 memcpy (tree_buffer_.data (), builder.GetBufferPointer (), builder.GetSize ());
4546
46- zmq_publisher_ .bind (" tcp://*:1666" );
47- zmq_server_ .bind (" tcp://*:1667" );
47+ zmq_-> publisher .bind (" tcp://*:1666" );
48+ zmq_-> server .bind (" tcp://*:1667" );
4849
4950 int timeout_ms = 100 ;
50- zmq_server_ .setsockopt (ZMQ_RCVTIMEO, &timeout_ms, sizeof (int ));
51+ zmq_-> server .setsockopt (ZMQ_RCVTIMEO, &timeout_ms, sizeof (int ));
5152
5253 active_server_ = true ;
5354
@@ -57,12 +58,12 @@ PublisherZMQ::PublisherZMQ(TreeNode* root_node, int max_msg_per_second)
5758 zmq::message_t req;
5859 try
5960 {
60- bool received = zmq_server_ .recv (&req);
61+ bool received = zmq_-> server .recv (&req);
6162 if (received)
6263 {
6364 zmq::message_t reply (tree_buffer_.size ());
6465 memcpy (reply.data (), tree_buffer_.data (), tree_buffer_.size ());
65- zmq_server_ .send (reply);
66+ zmq_-> server .send (reply);
6667 }
6768 }
6869 catch (zmq::error_t & err)
@@ -84,6 +85,20 @@ PublisherZMQ::~PublisherZMQ()
8485 thread_.join ();
8586 }
8687 flush ();
88+ delete zmq_;
89+ }
90+
91+
92+ void PublisherZMQ::createStatusBuffer ()
93+ {
94+ status_buffer_.clear ();
95+ applyRecursiveVisitor (root_node_, [this ](TreeNode* node) {
96+ size_t index = status_buffer_.size ();
97+ status_buffer_.resize (index + 3 );
98+ flatbuffers::WriteScalar<uint16_t >(&status_buffer_[index], node->UID ());
99+ flatbuffers::WriteScalar<int8_t >(&status_buffer_[index + 2 ],
100+ static_cast <int8_t >(convertToFlatbuffers (node->status ())));
101+ });
87102}
88103
89104void PublisherZMQ::callback (Duration timestamp, const TreeNode& node, NodeStatus prev_status,
@@ -139,7 +154,7 @@ void PublisherZMQ::flush()
139154 createStatusBuffer ();
140155 }
141156
142- zmq_publisher_ .send (message);
157+ zmq_-> publisher .send (message);
143158 send_pending_ = false ;
144159 // printf("%.3f zmq send\n", std::chrono::duration<double>( std::chrono::high_resolution_clock::now().time_since_epoch() ).count());
145160}
0 commit comments