diff --git a/src/adapter.hpp b/src/adapter.hpp index 3101ab8..ef1aa6e 100644 --- a/src/adapter.hpp +++ b/src/adapter.hpp @@ -17,6 +17,7 @@ using namespace boost::callable_traits; +// Adapter that pairs an encoder/decoder with a callback signal. template class Adapter; template requires(std::tuple_size_v> == 1 && // Decoder has only 1 argument diff --git a/src/adapter_base.hpp b/src/adapter_base.hpp index eb02a43..4e540ed 100644 --- a/src/adapter_base.hpp +++ b/src/adapter_base.hpp @@ -1,16 +1,16 @@ #pragma once -#include "utils.hpp" #include #include #include using namespace boost::signals2; +// Base adapter interface exposing a typed callback signal and name. template class AdapterBase { using cbk_ret_type_t_ = typename CallbackRetTypeTag::type; - // If return type is not void we combine all callback ivocation results to std::vector + // If return type is not void we combine all callback invocation results. class CollectAllCombiner_ { public: using result_type = std::conditional_t, std::vector, std::false_type>; diff --git a/src/adapter_factory.hpp b/src/adapter_factory.hpp index 1309de3..74f4b19 100644 --- a/src/adapter_factory.hpp +++ b/src/adapter_factory.hpp @@ -1,5 +1,6 @@ #pragma once +#include "utils.hpp" #include #include #include @@ -11,6 +12,7 @@ using namespace boost::callable_traits; template class AdapterBuilder; template class AdapterBuilderNamed; +// Fluent builder to assemble an Adapter from encoder/decoder/signature/name. template <> class AdapterBuilder<> { public: template AdapterBuilder encodeDataBy(Encoder &&encoder) { return AdapterBuilder(std::forward(encoder)); } @@ -18,51 +20,55 @@ public: template class AdapterBuilder { public: - AdapterBuilder(Encoder &&encoder) : m_encoder_ref_(std::forward(encoder)) {} + AdapterBuilder(Encoder &&encoder) : m_encoder_(std::forward(encoder)) {} template AdapterBuilder decodeDataBy(Decoder &&decoder) { - return AdapterBuilder(std::forward(m_encoder_ref_), std::forward(decoder)); + // Advance builder with decoder selected. + return AdapterBuilder(std::forward(m_encoder_), std::forward(decoder)); } private: - Encoder &&m_encoder_ref_; + std::decay_t m_encoder_; }; template class AdapterBuilder { public: - AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_ref_(std::forward(encoder)), m_decoder_ref_(std::forward(decoder)) {} + AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward(encoder)), m_decoder_(std::forward(decoder)) {} template AdapterBuilder withCallbackSignature() { - return AdapterBuilder(std::forward(m_encoder_ref_), std::forward(m_decoder_ref_)); + // Advance builder with callback signature selected. + return AdapterBuilder(std::forward(m_encoder_), std::forward(m_decoder_)); } private: - Encoder &&m_encoder_ref_; - Decoder &&m_decoder_ref_; + std::decay_t m_encoder_; + std::decay_t m_decoder_; }; template class AdapterBuilder { public: - AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_ref_(std::forward(encoder)), m_decoder_ref_(std::forward(decoder)) {} + AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward(encoder)), m_decoder_(std::forward(decoder)) {} AdapterBuilderNamed withName(const std::string &name) { - return AdapterBuilderNamed(std::forward(m_encoder_ref_), std::forward(m_decoder_ref_), name); + // Advance builder with adapter name selected. + return AdapterBuilderNamed(std::forward(m_encoder_), std::forward(m_decoder_), name); } private: - Encoder &&m_encoder_ref_; - Decoder &&m_decoder_ref_; + std::decay_t m_encoder_; + std::decay_t m_decoder_; }; template class AdapterBuilderNamed { public: AdapterBuilderNamed(Encoder &&encoder, Decoder &&decoder, const std::string &name) - : m_encoder_ref_(std::forward(encoder)), m_decoder_ref_(std::forward(decoder)), mc_name_(name) {} + : m_encoder_(std::forward(encoder)), m_decoder_(std::forward(decoder)), mc_name_(name) {} auto finalize() { + // Produce a concrete Adapter instance. return std::make_unique>, typename tp::tuple_tail>::type>>( - mc_name_, std::make_tuple(std::forward(m_encoder_ref_), std::forward(m_decoder_ref_), tag_s>{})); + mc_name_, std::make_tuple(std::forward(m_encoder_), std::forward(m_decoder_), tag_s>{})); } private: - const std::string &mc_name_; - Encoder &&m_encoder_ref_; - Decoder &&m_decoder_ref_; + const std::string mc_name_; + std::decay_t m_encoder_; + std::decay_t m_decoder_; }; diff --git a/src/module.hpp b/src/module.hpp index 8deac6e..8bc0408 100644 --- a/src/module.hpp +++ b/src/module.hpp @@ -8,11 +8,13 @@ #include "port_base.hpp" #include "tuple.hpp" +// Module type hint (currently unused). enum class module_type_e : uint32_t { STANDALONE, INCOMPOSITION, }; +// Base interface for a named module; exposes ports and run entrypoint. template class ModuleBase { public: ModuleBase(int32_t argc, char **argv, char **envp, const std::string &name) @@ -39,6 +41,7 @@ private: } mc_cli_args_; }; +// Concrete module that owns typed ports and dispatches entry with a port map. template class Module : public ModuleBase>> { public: using port_data_type_t = std::tuple_element_t<0, std::tuple>; diff --git a/src/module_factory.hpp b/src/module_factory.hpp index a783d0e..298cfe7 100644 --- a/src/module_factory.hpp +++ b/src/module_factory.hpp @@ -3,6 +3,7 @@ #include "module.hpp" #include +// Factory for building a Module from owned ports. template auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) { return std::make_unique>(argc, argv, envp, name, zmq_ctx, std::forward...>>(ports)); diff --git a/src/port.hpp b/src/port.hpp index 91e88d0..cefa5fc 100644 --- a/src/port.hpp +++ b/src/port.hpp @@ -16,14 +16,26 @@ using namespace boost::callable_traits; +// Typed port implementation owning adapters and a ZMQ transport. template class Port; template requires( + // Check return types (std::is_same_v>::base_t::callback_type_t::result_type> && ...) && + + // Check number of args ((std::tuple_size_v> == std::tuple_size_v>::callback_type_t::signature_type>>) && - ...)) + ...) && + + // Unique adapters check + ([]() consteval { + return [](std::index_sequence) consteval { + using tuple_t = std::tuple; + return ((tp::tuple_index, tuple_t>::value == Is) && ...); + }(std::index_sequence_for{}); + }.template operator()>...>())) class Port, std::tuple> : public PortBase...>>> { public: @@ -46,10 +58,12 @@ public: // fmt::print("Adding callback: name: {}, namehash: {}, typehash: {}, cbk_typehash: {}, cbk_type: {}\r\n", std::get(adapters)->name(), // std::hash()(std::get(adapters)->name()), typeid(adapter_input_type_t).hash_code(), typeid(adapter_callback_type_t).hash_code(), // type_name()); + // Cache name, name hash, input type hash, callback type hash, and adapter pointer. return std::make_tuple(std::get(adapters)->name(), std::hash()(std::get(adapters)->name()), typeid(adapter_input_type_t).hash_code(), typeid(adapter_callback_type_t).hash_code(), std::forward>(std::get(adapters))); }.template operator()()...); }(std::make_index_sequence{})) { + // Instantiate the port implementation with any extra args (e.g., SUB topics). std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoints), std::forward(args)...); }, std::forward(args)); } @@ -64,7 +78,11 @@ public: protected: void stop__() const override { m_impl__->stop_source().request_stop(); } - void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override { + void send__(const void *data, size_t size, size_t hash, const std::string &addr = "") const override { + if (!addr.empty() && this->type() != port_types_e::PUB) { + throw std::runtime_error("Addressed send is only supported for PUB ports"); + } + const void *adapter_ptr = nullptr; tp::for_each(mc_adapters_, [&](const auto &e) { @@ -74,6 +92,7 @@ protected: using adapter_in_type_t = std::remove_cvref_tdecoder())>>; adapter_ptr = &adapter; + // Encode to the environment type and pack payload. typename PortImplBase>::port_payload_s payload = { .typehash = hash, .data = {adapter->encoder()(*reinterpret_cast(data))}, @@ -81,7 +100,8 @@ protected: msgpack::sbuffer buf; msgpack::pack(buf, payload); - m_impl__->send(addr, buf); + // Send encoded payload via the transport. + m_impl__->send(buf, addr); } }); @@ -96,6 +116,7 @@ protected: tp::for_each(mc_adapters_, [&](auto &a) { if (!ret) { auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = a; + // Match by adapter name/type and callback signature. if (adapter_typehash == typehash && adapter_namehash == namehash && adapter_cbk_typehash == cbk_typehash) { ret = reinterpret_cast(adapter.get()); } @@ -121,11 +142,12 @@ private: using type_t = std::function; cbk_return_type_t_ operator()(const port_data_type_t &data, size_t hash, Aargs &&...callback_args) const { - std::conditional_t, cbk_return_type_t_, std::false_type> ret; + std::conditional_t, cbk_return_type_t_, std::false_type> ret{}; tp::for_each(mc_port_->mc_adapters_, [&](const auto &e) { const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; if (adapter_typehash == hash) { + // Decode payload and dispatch to the adapter's signal. if constexpr (std::is_void_vcallback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) { adapter->callback()(adapter->decoder()(data), std::forward(callback_args)...); } else { @@ -143,7 +165,7 @@ private: const Port *mc_port_; }; - mutable std::unique_ptr>> m_impl__; + mutable std::unique_ptr>> m_impl__{nullptr}; mutable std::tuple>...> mc_adapters_; template void init_impl_(enum port_types_e pt, ImplArgs &&...args) const { @@ -167,11 +189,16 @@ private: if (type == pt) { using impl_type_t = std::remove_pointer_t; + // Construct the selected PortImpl if its constructor matches the args. if constexpr (std::is_constructible_v &&>) { m_impl__ = std::make_unique(this, std::forward(args)..., PortImplCallback_(this)); } } }); + + if (!m_impl__) { + throw std::runtime_error("No PortImpl for port type ..."); + } } }; diff --git a/src/port_base.hpp b/src/port_base.hpp index 5faa537..c9ebcc0 100644 --- a/src/port_base.hpp +++ b/src/port_base.hpp @@ -19,12 +19,14 @@ using namespace boost::callable_traits; +// Abstract port API for sending encoded messages and accessing callbacks. template class PortBase { + // Helper binding an address/topic to a port for chained sends. class AddressedPort_ { public: AddressedPort_(const PortBase *port, const std::string &address) : mc_addr_(address), mc_port_(port) {} template const auto &operator<<(const InType &in) const { - mc_port_->send__(mc_addr_, &in, sizeof(InType), typeid(InType).hash_code()); + mc_port_->send__(&in, sizeof(InType), typeid(InType).hash_code(), mc_addr_); return *this; } @@ -47,18 +49,20 @@ public: const auto operator[](const std::string &address) const { return AddressedPort_(this, address); } template const PortBase &operator<<(const InType &in) const { - send__("", &in, sizeof(InType), typeid(InType).hash_code()); + // Use empty address for non-addressed sends. + send__(&in, sizeof(InType), typeid(InType).hash_code()); return *this; } virtual void stop() const = 0; template auto &callback(const std::string &name) const { + // Lookup adapter by callback signature and name. return GetCallbackHelper_>::type>(this).template operator()(name); } protected: virtual void stop__() const = 0; - virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0; + virtual void send__(const void *data, size_t size, size_t type_hash, const std::string &addr = "") const = 0; virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash, size_t cbk_typehash) const = 0; private: @@ -67,6 +71,7 @@ private: const std::map mc_endpoints_; const size_t mc_name_hash_; + // Type-safe callback lookup helper. template class GetCallbackHelper_; template class GetCallbackHelper_> { public: diff --git a/src/port_dealer_impl.hpp b/src/port_dealer_impl.hpp index 4653b7d..f89c88f 100644 --- a/src/port_dealer_impl.hpp +++ b/src/port_dealer_impl.hpp @@ -7,13 +7,14 @@ #include #include +// Dealer transport placeholder (no send/recv logic yet). template class PortImpl : public PortImplBase { public: PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) {} // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {}; private: void listen__(std::stop_token st) const override { diff --git a/src/port_factory.hpp b/src/port_factory.hpp index 5872961..0662250 100644 --- a/src/port_factory.hpp +++ b/src/port_factory.hpp @@ -2,6 +2,7 @@ #include "port.hpp" +// Factory for building a Port from adapters and extra args. template auto makePort(enum port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, std::tuple &&args = {}) { diff --git a/src/port_impl_base.hpp b/src/port_impl_base.hpp index 67c842e..7fc9f12 100644 --- a/src/port_impl_base.hpp +++ b/src/port_impl_base.hpp @@ -4,14 +4,17 @@ #include #include #include +#include #define ZMQ_BUILD_DRAFT_API #include +// Base ZMQ-backed transport for a port; manages socket and listener thread. template class PortImplBase { using port_data_type_t_ = std::remove_cvref_t>>; public: + // Wire payload: type hash + batch of encoded values. struct port_payload_s { size_t typehash; std::vector data; @@ -22,12 +25,22 @@ public: : mc_endpoints__(endpoints), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {} virtual ~PortImplBase() = default; - virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0; + virtual void send(const msgpack::sbuffer &data, const std::string &addr = "") const = 0; void close() { - m_sock__.close(); + // Close socket first to break any pending waits. + try { + m_sock__.close(); + } catch (...) { + // Ignore close errors during shutdown. + } + // Join listener thread if one was started. if (m_listener_thread__.valid()) { - m_listener_thread__.get(); + try { + m_listener_thread__.get(); + } catch (...) { + // Ignore listener exceptions during shutdown. + } } }; diff --git a/src/port_pair_impl.hpp b/src/port_pair_impl.hpp index 9c94c53..bb84405 100644 --- a/src/port_pair_impl.hpp +++ b/src/port_pair_impl.hpp @@ -7,11 +7,12 @@ #include #include +// Pair transport placeholder; currently no full send/recv support. template class PortImpl : public PortImplBase { public: PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { - this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::push); + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair); for (const auto &[_, ep] : this->mc_endpoints__) { this->m_sock__.bind(ep); @@ -19,7 +20,7 @@ public: } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {}; private: void listen__(std::stop_token st) const override { diff --git a/src/port_publisher_impl.hpp b/src/port_publisher_impl.hpp index 7e99a11..644f200 100644 --- a/src/port_publisher_impl.hpp +++ b/src/port_publisher_impl.hpp @@ -7,6 +7,7 @@ #include #include +// Publisher transport: binds endpoints and sends topic + payload frames. template class PortImpl final : public PortImplBase { public: PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) @@ -21,10 +22,10 @@ public: ~PortImpl() override {} // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override { + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { try { this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore); - this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait); + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); } catch (const zmq::error_t &err) { fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); } diff --git a/src/port_pull_impl.hpp b/src/port_pull_impl.hpp index d047dc5..d925ff3 100644 --- a/src/port_pull_impl.hpp +++ b/src/port_pull_impl.hpp @@ -9,6 +9,7 @@ #include #include +// Pull transport: connects and listens for incoming payloads. template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; @@ -24,36 +25,46 @@ public: } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on PULL pattern socket"); }; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on PULL pattern socket"); }; private: void listen__(std::stop_token st) const override { this->m_listener_thread__ = std::async( std::launch::async, [this](std::stop_token st) { - zmq::poller_t poller; - poller.add(this->m_sock__, zmq::event_flags::pollin); + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); - while (!st.stop_requested()) { - std::vector> events(1u); - size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); - for (int32_t i = 0; i < num_events; ++i) { - zmq::message_t msg; + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; - this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { - typename base_t::port_payload_s payload; + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; - msgpack::sbuffer buf; - buf.write(reinterpret_cast(msg.data()), msg.size()); - const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); - for (const auto &data : batch) { - this->mc_cbk__(data, typehash); - } + for (const auto &data : batch) { + this->mc_cbk__(data, typehash); + } - return std::optional(res); - }); + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; } } }, diff --git a/src/port_push_impl.hpp b/src/port_push_impl.hpp index 9954efe..e8ac9aa 100644 --- a/src/port_push_impl.hpp +++ b/src/port_push_impl.hpp @@ -7,6 +7,7 @@ #include #include +// Push transport: binds endpoints and sends fire-and-forget payloads. template class PortImpl : public PortImplBase { public: PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) @@ -19,9 +20,9 @@ public: } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override { + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { try { - this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait); } catch (const zmq::error_t &err) { fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); } diff --git a/src/port_rep_impl.hpp b/src/port_rep_impl.hpp index a0612c4..026dc60 100644 --- a/src/port_rep_impl.hpp +++ b/src/port_rep_impl.hpp @@ -13,6 +13,7 @@ #include "port_types.hpp" #include "tuple.hpp" +// Reply transport: listens for requests and sends reply payloads. template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; @@ -23,67 +24,82 @@ public: this->m_sock__.bind(ep); } + // Start async listener loop. listen__(this->stop_source().get_token()); } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on REPLY pattern socket"); }; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on REPLY pattern socket"); }; private: void listen__(std::stop_token st) const override { this->m_listener_thread__ = std::async( std::launch::async, [this](std::stop_token st) { - zmq::poller_t poller; - poller.add(this->m_sock__, zmq::event_flags::pollin); + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); - while (!st.stop_requested()) { - std::vector> events(1u); - size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); - for (int32_t i = 0; i < num_events; ++i) { - zmq::message_t msg; + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; - this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { - typename base_t::port_payload_s payload; + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; - msgpack::sbuffer buf; - buf.write(reinterpret_cast(msg.data()), msg.size()); - const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); - for (const auto &data : batch) { + // Callback should return reply data for REP sockets. if constexpr (!std::is_void_vmc_cbk__)>>>) { - auto reply_data = this->mc_cbk__(data, typehash); - for (const auto &d : reply_data) { - using adapter_in_type_t = std::remove_cvref_t; + if (batch.size()) { + auto reply_data = this->mc_cbk__(batch.front(), typehash); + typename base_t::port_payload_s reply_payload = { + .typehash = typeid(typename decltype(reply_data)::value_type).hash_code(), + }; - size_t typehash = typeid(adapter_in_type_t).hash_code(); + for (const auto &d : reply_data) { + using adapter_in_type_t = std::remove_cvref_t; - tp::for_each(this->mc_port__->adapters(), [&](const auto &e) { - const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; - if constexpr (std::is_same_vencoder())>>>, - adapter_in_type_t>) { - if (adapter_typehash == typehash) { + size_t typehash = typeid(adapter_in_type_t).hash_code(); - typename base_t::port_payload_s payload = { - .typehash = typeid(d).hash_code(), - .data = {adapter->encoder()(d)}, - }; - - msgpack::sbuffer buf; - msgpack::pack(buf, payload); - this->m_sock__.send(zmq::message_t(buf.data(), buf.size()), zmq::send_flags::none); + // Find matching encoder by type to build reply payload. + tp::for_each(this->mc_port__->adapters(), [&](const auto &e) { + const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; + if constexpr (std::is_same_vencoder())>>>, + adapter_in_type_t>) { + if (adapter_typehash == typehash) { + reply_payload.data.push_back(adapter->encoder()(d)); + } } - } - }); + }); + } + + msgpack::sbuffer buf; + msgpack::pack(buf, reply_payload); + + // Send reply for this item. + this->m_sock__.send(zmq::message_t(buf.data(), buf.size()), zmq::send_flags::none); } } else { - std::runtime_error("Callback of REPLY pattern socket should return non-void value"); + throw std::runtime_error("Callback of REPLY pattern socket should return non-void value"); } - } - return std::optional(res); - }); + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; } } }, diff --git a/src/port_req_impl.hpp b/src/port_req_impl.hpp index a2096de..9ea6c8a 100644 --- a/src/port_req_impl.hpp +++ b/src/port_req_impl.hpp @@ -8,6 +8,7 @@ #include #include +// Request transport: sends a payload and waits for a reply. template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; @@ -18,9 +19,12 @@ public: for (const auto &[_, ep] : this->mc_endpoints__) { this->m_sock__.connect(ep); } + + // Avoid blocking forever if no REP is available. + this->m_sock__.set(zmq::sockopt::rcvtimeo, static_cast(base_t::sc_recv_timeout_ms__)); } - void send(const std::string &addr, const msgpack::sbuffer &data) const override { + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { try { zmq::message_t reply; this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); diff --git a/src/port_router_impl.hpp b/src/port_router_impl.hpp index 6f3831f..98f5c0e 100644 --- a/src/port_router_impl.hpp +++ b/src/port_router_impl.hpp @@ -7,13 +7,14 @@ #include #include +// Router transport placeholder (no send/recv logic yet). template class PortImpl : public PortImplBase { public: PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) {} // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {}; private: void listen__(std::stop_token st) const override { diff --git a/src/port_subscriber_impl.hpp b/src/port_subscriber_impl.hpp index 498d9b0..2322cfc 100644 --- a/src/port_subscriber_impl.hpp +++ b/src/port_subscriber_impl.hpp @@ -13,6 +13,7 @@ #include #include +// Subscriber transport: connects, subscribes to topics, and dispatches payloads. template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; @@ -24,15 +25,17 @@ public: this->m_sock__.connect(ep); } + // Subscribe to each topic prefix. for (const auto &topic : mc_topics_) { this->m_sock__.set(zmq::sockopt::subscribe, topic); } + // Start async listener loop. listen__(this->stop_source().get_token()); } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; private: const std::list mc_topics_; @@ -41,35 +44,47 @@ private: this->m_listener_thread__ = std::async( std::launch::async, [this](std::stop_token st) { - zmq::poller_t poller; - poller.add(this->m_sock__, zmq::event_flags::pollin); + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); - while (!st.stop_requested()) { - std::vector> events(1u); - size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); - for (int32_t i = 0; i < num_events; ++i) { - zmq::message_t msg; + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; - this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { - std::string envelope = std::string(static_cast(msg.data()), msg.size()); + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + // First frame is the topic envelope. + std::string envelope = std::string(static_cast(msg.data()), msg.size()); - this->m_sock__.recv(msg).and_then([&](const auto &res) { - typename base_t::port_payload_s payload; + this->m_sock__.recv(msg).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; - msgpack::sbuffer buf; - buf.write(reinterpret_cast(msg.data()), msg.size()); - const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); - for (const auto &data : batch) { - this->mc_cbk__(data, typehash, envelope); - } + // Dispatch each decoded item. + for (const auto &data : batch) { + this->mc_cbk__(data, typehash, envelope); + } + + return std::optional(res); + }); return std::optional(res); }); - - return std::optional(res); - }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; } } }, diff --git a/src/port_types.hpp b/src/port_types.hpp index f389489..2b4f506 100644 --- a/src/port_types.hpp +++ b/src/port_types.hpp @@ -2,6 +2,7 @@ #include +// Supported ZMQ socket patterns for ports. enum class port_types_e : uint32_t { UNKNOWN = 0, PUB, @@ -20,6 +21,7 @@ enum class port_types_e : uint32_t { template class PortImpl; +// Endpoint tag base type. struct endpoints_base_s {}; template struct endpoints_s; diff --git a/src/templates.cpp b/src/templates.cpp index 317f3d3..9bcbc43 100644 --- a/src/templates.cpp +++ b/src/templates.cpp @@ -18,8 +18,10 @@ using env_data_type_t = std::vector; void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port + // Fetch typed callbacks by adapter name and signature. auto &int_cbk = subscriber.callback("int-vector-int"); auto &string_cbk = subscriber.callback("string-vector-string"); auto &double_cbk = subscriber.callback("double-vector-double"); @@ -40,6 +42,7 @@ void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unorder // Publisher module entrypoint void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. const auto &publisher = *ports.at("publisher_port"); // Get publisher port // Publish data @@ -51,13 +54,16 @@ void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordere // Publisher module entrypoint void pusher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. const auto &publisher = *ports.at("push_port"); // Get publisher port publisher << 1 << 2 << double{3.f} << std::string("test"); } void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. const auto &puller = *ports.at("pull_port"); // Get subscriber port + // Fetch typed callbacks by adapter name and signature. auto &int_cbk = puller.callback("int-vector-int"); auto &string_cbk = puller.callback("string-vector-string"); auto &double_cbk = puller.callback("double-vector-double"); @@ -69,8 +75,10 @@ void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_m } void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. const auto &req = *ports.at("req_port"); // Get publisher port // static_assert(std::is_same_v, void>, ""); + // Fetch typed callbacks by adapter name and signature. auto &int_cbk = req.callback("int-vector-int"); auto &string_cbk = req.callback("string-vector-string"); auto &double_cbk = req.callback("double-vector-double"); @@ -84,8 +92,10 @@ void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map< } void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. const auto &rep = *ports.at("rep_port"); // Get subscriber port + // Fetch typed callbacks by adapter name and signature. auto &int_cbk = rep.callback("int-vector-int"); auto &string_cbk = rep.callback("string-vector-string"); auto &double_cbk = rep.callback("double-vector-double"); @@ -98,9 +108,15 @@ void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map< }); string_cbk.connect([](const std::string &s) -> std::string { - fmt::print("REPLY socket: got data: {} of {} type\r\n", s, type_name>()); + fmt::print("REPLY socket: got data: {} of {} type (FIRST callback)\r\n", s, type_name>()); // Handle data ... - return fmt::format("'Handle request: {} of type: {}'", s, type_name>()); + return fmt::format("'Handle request: {} of type: {}' (FIRST callback)", s, type_name>()); + }); + + string_cbk.connect([](const std::string &s) -> std::string { + fmt::print("REPLY socket: got data: {} of {} type (SECOND callback)\r\n", s, type_name>()); + // Handle data ... + return fmt::format("'Handle request: {} of type: {}' (SECOND callback)", s, type_name>()); }); double_cbk.connect([](const double &d) -> std::string { @@ -113,7 +129,14 @@ void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map< int main(int argc, char *argv[], char *envp[]) { using enum port_types_e; codecs_s codecs; + // Shared ZMQ context for in-process transports. zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads + auto a = AdapterBuilder(); + auto b = AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("string-vector-string"); // Make module that contains only 1 port working on PUBLISHER pattern // TODO: merge all builders to one complex "Module" builder @@ -297,7 +320,7 @@ int main(int argc, char *argv[], char *envp[]) { AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) - .withCallbackSignature() + .withCallbackSignature() .withName("string-vector-string") .finalize(), diff --git a/src/tuple.hpp b/src/tuple.hpp index 5f0b8f6..bfb4042 100644 --- a/src/tuple.hpp +++ b/src/tuple.hpp @@ -9,6 +9,7 @@ #include #include +// Tuple utilities for compile-time iteration and transformations. namespace tp { namespace detail { diff --git a/src/utils.hpp b/src/utils.hpp index ace5292..8b93ebf 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -2,9 +2,11 @@ #include +// Trait to detect template specializations. template class Template> struct is_specialization : std::false_type {}; template