module-arch-POC/src/port_subscriber_impl.hpp

56 lines
2.0 KiB
C++

#pragma once
#include "port_impl_base.hpp"
#include "port_types.hpp"
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
/// @brief SUB-pattern specialisation of PortImpl.
///
/// Creates a ZeroMQ `sub` socket, connects it to `endpoint`, subscribes to every
/// entry of `topics`, and (once listen() is called) decodes incoming messages on
/// a background task and forwards them to the callback stored in the base class.
///
/// Message layout consumed by listen(): frame 1 is the envelope (printed for
/// diagnostics), frame 2 is a msgpack-encoded `base_t::port_payload_s`.
///
/// @tparam Callback callable invoked as `callback(data, typehash)` for every
///         decoded payload.
template <typename Callback> class PortImpl<port_types_e::SUB, Callback> : public PortImplBase<Callback> {
public:
  using base_t = PortImplBase<Callback>;

  /// @brief Create, configure and connect the subscriber socket.
  /// @param zmq_ctx  ZeroMQ context forwarded to the base class.
  /// @param endpoint Endpoint string the socket connects to.
  /// @param topics   Subscription prefixes; one `subscribe` option is set per entry.
  ///                 NOTE(review): an empty list sets no subscription at all —
  ///                 confirm that callers never rely on "receive everything".
  /// @param callback Handler forwarded to the base class.
  PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list<std::string> &topics, Callback &&callback)
      : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)), mc_topics_(topics) {
    this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::sub);
    // Bound every blocking receive so the listener loop can sleep inside recv()
    // (no CPU spinning) and still re-check its stop token periodically.
    this->m_sock__.set(zmq::sockopt::rcvtimeo, sc_recv_timeout_ms_);
    this->m_sock__.connect(this->mc_endpoint__);
    for (const auto &topic : mc_topics_) {
      this->m_sock__.set(zmq::sockopt::subscribe, topic);
    }
  }

  /// @brief Start the asynchronous receive loop.
  ///
  /// The loop runs until `st` reports a stop request; because receives time out
  /// after `sc_recv_timeout_ms_`, a stop request is noticed within roughly one
  /// timeout period. The resulting future is stored in `m_listener_thread__`.
  ///
  /// Any exception escaping the loop body (socket error, msgpack decoding
  /// failure, or one thrown by the callback) ends the loop and is stored in that
  /// future by `std::async`.
  ///
  /// @param st Stop token used for cooperative cancellation.
  void listen(std::stop_token st) const override {
    this->m_listener_thread__ = std::async(
        std::launch::async,
        [this](std::stop_token stop) {
          while (!stop.stop_requested()) {
            zmq::message_t msg;

            // Frame 1: envelope. An empty result means the receive timed out.
            if (!this->m_sock__.recv(msg)) {
              continue;
            }
            fmt::print("Received envelope: {}\r\n", std::string(static_cast<const char *>(msg.data()), msg.size()));

            // Without a following frame there is no payload to decode; reading
            // anyway would consume the next message's envelope as a payload.
            if (!msg.more()) {
              continue;
            }

            // Frame 2: msgpack payload (reuses `msg`, as the original did).
            if (!this->m_sock__.recv(msg)) {
              continue;
            }

            // Decode straight from the frame. The handle is kept alive until
            // after the callback so anything in `payload` that refers to the
            // unpacked zone stays valid.
            const msgpack::object_handle handle = msgpack::unpack(static_cast<const char *>(msg.data()), msg.size());
            typename base_t::port_payload_s payload;
            handle.get().convert(payload);

            const auto &[typehash, data] = payload;
            this->mc_cbk__(data, typehash);
          }
        },
        st);
  }

  /// @brief Sending is not supported on a subscriber socket.
  /// @throws std::runtime_error always.
  void send([[maybe_unused]] const msgpack::sbuffer &data) const override {
    throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket");
  }

private:
  /// Receive timeout in milliseconds; upper bound on how long the listener
  /// sleeps in recv() before re-checking the stop token.
  static constexpr int sc_recv_timeout_ms_ = 100;

  /// Topic prefixes this port subscribed to at construction.
  const std::list<std::string> mc_topics_;
};