module-arch-POC/src/port_publisher_impl.hpp

36 lines
1.3 KiB
C++

#pragma once
#include "port_impl_base.hpp"
#include "port_types.hpp"
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Port, typename Callback> class PortImpl<port_types_e::PUB, Port, Callback> final : public PortImplBase<Port, Callback> {
public:
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.bind(ep);
}
}
~PortImpl() override {}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
try {
this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore);
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait);
} catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
}
};
private:
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); }
};