diff --git a/src/adapter_base.hpp b/src/adapter_base.hpp index 4e540ed..0633114 100644 --- a/src/adapter_base.hpp +++ b/src/adapter_base.hpp @@ -4,8 +4,6 @@ #include #include -using namespace boost::signals2; - // Base adapter interface exposing a typed callback signal and name. template class AdapterBase { using cbk_ret_type_t_ = typename CallbackRetTypeTag::type; @@ -30,7 +28,7 @@ template cl public: // Use combiner if return type is not void using signature_t = std::conditional_t, void(const InType &, CbkAargs &&...), cbk_ret_type_t_(const InType &, CbkAargs &&...)>; - using callback_type_t = std::conditional_t, signal, signal>; + using callback_type_t = std::conditional_t, boost::signals2::signal, boost::signals2::signal>; AdapterBase(const std::string &name) : mc_name_(name) {} diff --git a/src/codecs.cpp b/src/codecs.cpp new file mode 100644 index 0000000..42762c9 --- /dev/null +++ b/src/codecs.cpp @@ -0,0 +1,34 @@ +#include "codecs.hpp" + +template <> std::vector codecs_s>::encoders_s::from_int(const int32_t &i) { + auto str = std::to_string(i); // Convert to string first + return {str.begin(), str.end()}; // String to byte array +} + +template <> auto codecs_s>::encoders_s::from_string(const std::string &s) -> std::vector { return {s.begin(), s.end()}; }; + +template <> auto codecs_s>::encoders_s::from_double(const double &d) -> std::vector { + auto str = std::to_string(d); + return {str.begin(), str.end()}; +}; + +template <> auto codecs_s>::decoders_s::to_int(const std::vector &s) -> int32_t { + int32_t ret; + auto str = std::string(s.begin(), s.end()); + if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) { + return ret; + } + + throw std::runtime_error(fmt::format("Invalid convert from {} to integer type", str)); +}; + +template <> auto codecs_s>::decoders_s::to_string(const std::vector &i) -> std::string { return std::string(i.begin(), i.end()); }; +template <> auto codecs_s>::decoders_s::to_double(const std::vector &s) -> double { + double ret; + auto str = std::string(s.begin(), s.end()); + if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) { + return ret; + } + + throw std::runtime_error(fmt::format("Invalid convert from {} to double type", str)); +}; diff --git a/src/codecs.hpp b/src/codecs.hpp new file mode 100644 index 0000000..1b67cc1 --- /dev/null +++ b/src/codecs.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include +#include +#include + +#define FMT_HEADER_ONLY +#include +#include + +// Simple encoder/decoder set for an environment payload type. +template struct codecs_s { + struct encoders_s { + static auto from_int(const int32_t &) -> EnvDataType; + static auto from_string(const std::string &s) -> EnvDataType; + static auto from_double(const double &d) -> EnvDataType; + } encoders; + + struct decoders_s { + static auto to_int(const EnvDataType &s) -> int32_t; + static auto to_string(const EnvDataType &i) -> std::string; + static auto to_double(const EnvDataType &s) -> double; + } decoders; +}; diff --git a/src/module_factory.hpp b/src/module_factory.hpp index 298cfe7..3ef7431 100644 --- a/src/module_factory.hpp +++ b/src/module_factory.hpp @@ -2,9 +2,59 @@ #include "module.hpp" #include +#include +#include +#include // Factory for building a Module from owned ports. template auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) { return std::make_unique>(argc, argv, envp, name, zmq_ctx, std::forward...>>(ports)); } + +class ModuleBuilderNamed; +class ModuleBuilderContext; +template class ModuleBuilderPorts; + +// Fluent builder to assemble a Module from name/context/ports. +class ModuleBuilder { +public: + ModuleBuilderNamed withName(std::string name); +}; + +class ModuleBuilderNamed { +public: + explicit ModuleBuilderNamed(std::string name) : name_(std::move(name)) {} + ModuleBuilderContext withContext(zmq::context_t &zmq_ctx); + +private: + std::string name_; +}; + +class ModuleBuilderContext { +public: + ModuleBuilderContext(std::string name, zmq::context_t &zmq_ctx) : name_(std::move(name)), ctx_(&zmq_ctx) {} + + template ModuleBuilderPorts withPorts(std::tuple...> &&ports) { + return ModuleBuilderPorts(name_, *ctx_, std::forward...>>(ports)); + } + +private: + std::string name_; + zmq::context_t *ctx_; +}; + +template class ModuleBuilderPorts { +public: + ModuleBuilderPorts(std::string name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) : name_(std::move(name)), ctx_(&zmq_ctx), ports_(std::move(ports)) {} + + auto finalize(int32_t argc, char **argv, char **envp) { return makeModule(argc, argv, envp, name_, *ctx_, std::move(ports_)); } + +private: + std::string name_; + zmq::context_t *ctx_; + std::tuple...> ports_; +}; + +inline ModuleBuilderNamed ModuleBuilder::withName(std::string name) { return ModuleBuilderNamed(std::move(name)); } +inline ModuleBuilderContext ModuleBuilderNamed::withContext(zmq::context_t &zmq_ctx) { return ModuleBuilderContext(std::move(name_), zmq_ctx); } diff --git a/src/port.hpp b/src/port.hpp index cefa5fc..9a910f0 100644 --- a/src/port.hpp +++ b/src/port.hpp @@ -29,9 +29,9 @@ template std::tuple_size_v>::callback_type_t::signature_type>>) && ...) && - // Unique adapters check + // All adapters are unique ([]() consteval { - return [](std::index_sequence) consteval { + return [](std::index_sequence) consteval { using tuple_t = std::tuple; return ((tp::tuple_index, tuple_t>::value == Is) && ...); }(std::index_sequence_for{}); @@ -53,14 +53,11 @@ public: return std::make_tuple([&]() { using adapter_type_t = std::remove_cvref_t(adapters))>; using adapter_input_type_t = std::remove_cvref_t>; - using adapter_callback_type_t = std::remove_cvref_t; + using adapter_callback_type_t = std::remove_cvref_t; - // fmt::print("Adding callback: name: {}, namehash: {}, typehash: {}, cbk_typehash: {}, cbk_type: {}\r\n", std::get(adapters)->name(), - // std::hash()(std::get(adapters)->name()), typeid(adapter_input_type_t).hash_code(), typeid(adapter_callback_type_t).hash_code(), - // type_name()); // Cache name, name hash, input type hash, callback type hash, and adapter pointer. - return std::make_tuple(std::get(adapters)->name(), std::hash()(std::get(adapters)->name()), typeid(adapter_input_type_t).hash_code(), - typeid(adapter_callback_type_t).hash_code(), std::forward>(std::get(adapters))); + return std::make_tuple(std::get(adapters)->name(), std::hash()(std::get(adapters)->name()), type_hash(), + type_hash(), std::forward>(std::get(adapters))); }.template operator()()...); }(std::make_index_sequence{})) { // Instantiate the port implementation with any extra args (e.g., SUB topics). @@ -78,9 +75,9 @@ public: protected: void stop__() const override { m_impl__->stop_source().request_stop(); } - void send__(const void *data, size_t size, size_t hash, const std::string &addr = "") const override { - if (!addr.empty() && this->type() != port_types_e::PUB) { - throw std::runtime_error("Addressed send is only supported for PUB ports"); + void send__(const void *data, size_t size, uint64_t hash, const std::string &addr = "") const override { + if (!addr.empty() && this->type() != port_types_e::PUB && this->type() != port_types_e::RADIO) { + throw std::runtime_error("Addressed send is only supported for PUB/RADIO ports"); } const void *adapter_ptr = nullptr; @@ -110,7 +107,7 @@ protected: } } - void *get_adapter__(const std::string &name, size_t namehash, size_t typehash, size_t cbk_typehash) const override final { + void *get_adapter__(const std::string &name, uint64_t namehash, uint64_t typehash, uint64_t cbk_typehash) const override final { void *ret = nullptr; tp::for_each(mc_adapters_, [&](auto &a) { @@ -139,23 +136,32 @@ private: template class PortImplCallback_> { public: PortImplCallback_(const Port *port) : mc_port_(port) {} - using type_t = std::function; + using type_t = std::function; - cbk_return_type_t_ operator()(const port_data_type_t &data, size_t hash, Aargs &&...callback_args) const { + cbk_return_type_t_ operator()(const port_data_type_t &data, uint64_t hash, Aargs &&...callback_args) const { std::conditional_t, cbk_return_type_t_, std::false_type> ret{}; + bool found = false; tp::for_each(mc_port_->mc_adapters_, [&](const auto &e) { - const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; - if (adapter_typehash == hash) { - // Decode payload and dispatch to the adapter's signal. - if constexpr (std::is_void_vcallback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) { - adapter->callback()(adapter->decoder()(data), std::forward(callback_args)...); - } else { - ret = adapter->callback()(adapter->decoder()(data), std::forward(callback_args)...); + if (!found) { + const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; + if (adapter_typehash == hash) { + // Decode payload and dispatch to the adapter's signal. + if constexpr (std::is_void_vcallback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) { + adapter->callback()(adapter->decoder()(data), std::forward(callback_args)...); + } else { + ret = adapter->callback()(adapter->decoder()(data), std::forward(callback_args)...); + } + + found = true; } } }); + if (!found) { + throw std::runtime_error(fmt::format("Adapter with type hash: {} not found", hash)); + } + if constexpr (!std::is_void_v) { return ret; } @@ -166,7 +172,7 @@ private: }; mutable std::unique_ptr>> m_impl__{nullptr}; - mutable std::tuple>...> mc_adapters_; + mutable std::tuple>...> mc_adapters_; template void init_impl_(enum port_types_e pt, ImplArgs &&...args) const { using enum port_types_e; @@ -182,7 +188,8 @@ private: std::make_tuple(make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), - make_null_impl_pair.template operator()()); + make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), + make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()()); tp::for_each(impl_map, [&](const auto &p) { const auto &[type, null_pimpl] = p; diff --git a/src/port_base.hpp b/src/port_base.hpp index c9ebcc0..622ae57 100644 --- a/src/port_base.hpp +++ b/src/port_base.hpp @@ -3,8 +3,10 @@ #include "src/tuple.hpp" #include #include +#include #include #include +#include #define ZMQ_BUILD_DRAFT_API #include @@ -26,7 +28,7 @@ template class PortBase { public: AddressedPort_(const PortBase *port, const std::string &address) : mc_addr_(address), mc_port_(port) {} template const auto &operator<<(const InType &in) const { - mc_port_->send__(&in, sizeof(InType), typeid(InType).hash_code(), mc_addr_); + mc_port_->send__(&in, sizeof(InType), type_hash>(), mc_addr_); return *this; } @@ -50,7 +52,7 @@ public: template const PortBase &operator<<(const InType &in) const { // Use empty address for non-addressed sends. - send__(&in, sizeof(InType), typeid(InType).hash_code()); + send__(&in, sizeof(InType), type_hash>()); return *this; } @@ -62,14 +64,14 @@ public: protected: virtual void stop__() const = 0; - virtual void send__(const void *data, size_t size, size_t type_hash, const std::string &addr = "") const = 0; - virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash, size_t cbk_typehash) const = 0; + virtual void send__(const void *data, size_t size, uint64_t type_hash, const std::string &addr = "") const = 0; + virtual void *get_adapter__(const std::string &name, uint64_t namehash, uint64_t typehash, uint64_t cbk_typehash) const = 0; private: const enum port_types_e mc_type_; const std::string mc_name_; const std::map mc_endpoints_; - const size_t mc_name_hash_; + const uint64_t mc_name_hash_; // Type-safe callback lookup helper. template class GetCallbackHelper_; @@ -79,11 +81,13 @@ private: template auto &operator()(const std::string &name) const { using ret_type_t = std::remove_cvref_t>>; - using arg_type_t = std::tuple_element_t<0, args_t>>; + using arg_type_t = std::remove_cvref_t>>>; + using cbk_type_t = typename AdapterBase, Aargs...>::callback_type_t::signature_type; + return (*static_cast, Aargs...> *>( - mc_port_->get_adapter__(name, std::hash()(name), typeid(arg_type_t).hash_code(), - typeid(typename AdapterBase, Aargs...>::callback_type_t).hash_code()))) + mc_port_->get_adapter__(name, std::hash()(name), type_hash(), type_hash>()))) .callback(); + // typeid(typename AdapterBase, Aargs...>::callback_type_t).hash_code() } private: diff --git a/src/port_dealer_impl.hpp b/src/port_dealer_impl.hpp index f89c88f..e0b2ce8 100644 --- a/src/port_dealer_impl.hpp +++ b/src/port_dealer_impl.hpp @@ -2,6 +2,8 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include #define FMT_HEADER_ONLY #include diff --git a/src/port_dish_impl.hpp b/src/port_dish_impl.hpp new file mode 100644 index 0000000..ab2d096 --- /dev/null +++ b/src/port_dish_impl.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" +#include +#include +#include + +#define FMT_HEADER_ONLY +#include +#include + +// Dish transport: connects, joins groups, and dispatches payloads with group name. +template class PortImpl : public PortImplBase { +public: + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, const std::list &groups, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)), mc_groups_(groups) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::dish); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); + } + + // Join each group. + for (const auto &group : mc_groups_) { + this->m_sock__.join(group.c_str()); + } + + // Start async listener loop. + listen__(this->stop_source().get_token()); + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on DISH pattern socket"); }; + +private: + const std::list mc_groups_; + + void listen__(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + std::string group = std::string(msg.group()); + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + // Dispatch each decoded item. + for (const auto &data : batch) { + this->mc_cbk__(data, typehash, group); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); + } +}; diff --git a/src/port_factory.hpp b/src/port_factory.hpp index 0662250..ab1b850 100644 --- a/src/port_factory.hpp +++ b/src/port_factory.hpp @@ -1,6 +1,10 @@ #pragma once #include "port.hpp" +#include +#include +#include +#include // Factory for building a Port from adapters and extra args. template @@ -9,3 +13,107 @@ auto makePort(enum port_types_e pt, const std::string &name, const std::map, std::tuple>>(pt, name, endpoints, zmq_ctx, std::forward...>>(adapters), std::forward>(args)); } + +class PortBuilderNamed; +class PortBuilderEndpoints; +class PortBuilderContext; +class PortBuilderWithContext; + +template class PortBuilderAdapters; +template class PortBuilderArgs; + +class PortBuilderNamed { +public: + explicit PortBuilderNamed(port_types_e pt) : m_pt_(pt) {} + PortBuilderEndpoints withName(const std::string &name); + +private: + port_types_e m_pt_; +}; + +// Fluent builder to assemble a Port from type/name/endpoints/context/adapters/args. +class PortBuilder { +public: + inline PortBuilderNamed withType(port_types_e pt) { return PortBuilderNamed(pt); } +}; + +class PortBuilderWithContext { +public: + PortBuilderWithContext(port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx) + : m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx) {} + + template inline PortBuilderAdapters> withAdapters(std::tuple...> &&adapters) { + return PortBuilderAdapters>(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(adapters)); + } + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; + zmq::context_t *m_ctx_; +}; + +class PortBuilderContext { +public: + PortBuilderContext(port_types_e pt, const std::string &name, const std::map &endpoints) : m_pt_(pt), m_name_(name), m_endpoints_(endpoints) {} + PortBuilderWithContext withContext(zmq::context_t &zmq_ctx); + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; +}; + +class PortBuilderEndpoints { +public: + PortBuilderEndpoints(port_types_e pt, const std::string &name) : m_pt_(pt), m_name_(name) {} + PortBuilderContext withEndpoints(const std::map &endpoints); + +private: + port_types_e m_pt_; + std::string m_name_; +}; + +template class PortBuilderAdapters> { +public: + PortBuilderAdapters(port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters) + : m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx), m_adapters_(&adapters) {} + + template PortBuilderArgs, std::tuple> withArgs(std::tuple &&args) { + return PortBuilderArgs, std::tuple>(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(*m_adapters_), + std::forward>(args)); + } + + auto finalize() { return makePort(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(*m_adapters_)); } + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; + zmq::context_t *m_ctx_; + std::tuple...> *m_adapters_; +}; + +template class PortBuilderArgs, std::tuple> { +public: + PortBuilderArgs(port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters, std::tuple &&args) + : m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx), m_adapters_(&adapters), m_args_(std::forward>(args)) {} + + inline auto finalize() { + return makePort(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(*m_adapters_), std::forward>(m_args_)); + } + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; + zmq::context_t *m_ctx_; + std::tuple...> *m_adapters_; + std::tuple m_args_; +}; + +inline PortBuilderEndpoints PortBuilderNamed::withName(const std::string &name) { return PortBuilderEndpoints(m_pt_, name); } +inline PortBuilderWithContext PortBuilderContext::withContext(zmq::context_t &zmq_ctx) { return PortBuilderWithContext(m_pt_, m_name_, m_endpoints_, zmq_ctx); } +inline PortBuilderContext PortBuilderEndpoints::withEndpoints(const std::map &endpoints) { return PortBuilderContext(m_pt_, m_name_, endpoints); } diff --git a/src/port_impl_base.hpp b/src/port_impl_base.hpp index 7fc9f12..96d88ab 100644 --- a/src/port_impl_base.hpp +++ b/src/port_impl_base.hpp @@ -16,7 +16,7 @@ template class PortImplBase { public: // Wire payload: type hash + batch of encoded values. struct port_payload_s { - size_t typehash; + uint64_t typehash; std::vector data; MSGPACK_DEFINE(typehash, data); }; @@ -27,14 +27,7 @@ public: virtual void send(const msgpack::sbuffer &data, const std::string &addr = "") const = 0; void close() { - // Close socket first to break any pending waits. - try { - m_sock__.close(); - } catch (...) { - // Ignore close errors during shutdown. - } - - // Join listener thread if one was started. + // Join listener thread before closing the socket to avoid polling on a closed fd. if (m_listener_thread__.valid()) { try { m_listener_thread__.get(); @@ -42,6 +35,12 @@ public: // Ignore listener exceptions during shutdown. } } + + try { + m_sock__.close(); + } catch (...) { + // Ignore close errors during shutdown. + } }; inline auto &stop_source() { return m_ss_; } diff --git a/src/port_pair_impl.hpp b/src/port_pair_impl.hpp index bb84405..4831ae8 100644 --- a/src/port_pair_impl.hpp +++ b/src/port_pair_impl.hpp @@ -2,14 +2,87 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include +#include #define FMT_HEADER_ONLY #include #include -// Pair transport placeholder; currently no full send/recv support. -template class PortImpl : public PortImplBase { +// Pair transport (client): connects and listens for incoming payloads. +template class PortImpl : public PortImplBase { public: + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); + } + + listen__(this->stop_source().get_token()); + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; + +private: + void listen__(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + for (const auto &data : batch) { + this->mc_cbk__(data, typehash); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); + } +}; + +// Pair transport (server): binds and listens for incoming payloads. +template class PortImpl : public PortImplBase { +public: + using base_t = PortImplBase; PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair); @@ -17,15 +90,60 @@ public: for (const auto &[_, ep] : this->mc_endpoints__) { this->m_sock__.bind(ep); } + + listen__(this->stop_source().get_token()); } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; private: void listen__(std::stop_token st) const override { - while (!st.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100u)); - } + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + for (const auto &data : batch) { + this->mc_cbk__(data, typehash); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); } }; diff --git a/src/port_pull_impl.hpp b/src/port_pull_impl.hpp index d925ff3..b9d8d04 100644 --- a/src/port_pull_impl.hpp +++ b/src/port_pull_impl.hpp @@ -3,6 +3,7 @@ #include "port_impl_base.hpp" #include "port_types.hpp" #include +#include #include #define FMT_HEADER_ONLY diff --git a/src/port_radio_impl.hpp b/src/port_radio_impl.hpp new file mode 100644 index 0000000..950d6c3 --- /dev/null +++ b/src/port_radio_impl.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" +#include + +#define FMT_HEADER_ONLY +#include +#include + +// Radio transport: binds endpoints and sends group-tagged payloads. +template class PortImpl final : public PortImplBase { +public: + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::radio); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + zmq::message_t msg(data.data(), data.size()); + if (!addr.empty()) { + msg.set_group(addr.c_str()); + } + + this->m_sock__.send(msg, zmq::send_flags::none); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; + +private: + void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on RADIO pattern socket"); } +}; diff --git a/src/port_rep_impl.hpp b/src/port_rep_impl.hpp index 026dc60..5512a1c 100644 --- a/src/port_rep_impl.hpp +++ b/src/port_rep_impl.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -12,6 +13,7 @@ #include "port_impl_base.hpp" #include "port_types.hpp" #include "tuple.hpp" +#include "utils.hpp" // Reply transport: listens for requests and sends reply payloads. template class PortImpl : public PortImplBase { @@ -24,6 +26,8 @@ public: this->m_sock__.bind(ep); } + this->m_sock__.set(zmq::sockopt::sndtimeo, static_cast(base_t::sc_recv_timeout_ms__)); + // Start async listener loop. listen__(this->stop_source().get_token()); } @@ -42,7 +46,7 @@ private: while (!st.stop_requested()) { std::vector> events(1u); - size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + uint64_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); for (int32_t i = 0; i < num_events; ++i) { zmq::message_t msg; @@ -59,13 +63,13 @@ private: if (batch.size()) { auto reply_data = this->mc_cbk__(batch.front(), typehash); typename base_t::port_payload_s reply_payload = { - .typehash = typeid(typename decltype(reply_data)::value_type).hash_code(), + .typehash = type_hash>(), }; for (const auto &d : reply_data) { using adapter_in_type_t = std::remove_cvref_t; - size_t typehash = typeid(adapter_in_type_t).hash_code(); + uint64_t typehash = type_hash(); // Find matching encoder by type to build reply payload. tp::for_each(this->mc_port__->adapters(), [&](const auto &e) { diff --git a/src/port_req_impl.hpp b/src/port_req_impl.hpp index 9ea6c8a..7b06de4 100644 --- a/src/port_req_impl.hpp +++ b/src/port_req_impl.hpp @@ -2,6 +2,8 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include #include #define FMT_HEADER_ONLY @@ -21,7 +23,7 @@ public: } // Avoid blocking forever if no REP is available. - this->m_sock__.set(zmq::sockopt::rcvtimeo, static_cast(base_t::sc_recv_timeout_ms__)); + this->m_sock__.set(zmq::sockopt::rcvtimeo, static_cast(base_t::sc_recv_timeout_ms__)); } void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { @@ -44,6 +46,7 @@ public: }); } catch (const zmq::error_t &err) { fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + throw; } }; diff --git a/src/port_router_impl.hpp b/src/port_router_impl.hpp index 98f5c0e..aa4293f 100644 --- a/src/port_router_impl.hpp +++ b/src/port_router_impl.hpp @@ -2,6 +2,8 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include #define FMT_HEADER_ONLY #include diff --git a/src/port_subscriber_impl.hpp b/src/port_subscriber_impl.hpp index 2322cfc..2b8824d 100644 --- a/src/port_subscriber_impl.hpp +++ b/src/port_subscriber_impl.hpp @@ -3,6 +3,7 @@ #include "port_impl_base.hpp" #include "port_types.hpp" #include +#include #include #include #include diff --git a/src/port_types.hpp b/src/port_types.hpp index 2b4f506..2fb6cf2 100644 --- a/src/port_types.hpp +++ b/src/port_types.hpp @@ -11,7 +11,8 @@ enum class port_types_e : uint32_t { REP, PUSH, PULL, - PAIR, + PAIR_CLIENT, + PAIR_SERVER, RADIO, DISH, @@ -31,7 +32,8 @@ template<> struct endpoints_s : endpoints_base_s {}; template<> struct endpoints_s : endpoints_base_s {}; template<> struct endpoints_s : endpoints_base_s {}; template<> struct endpoints_s : endpoints_base_s {}; -template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; template<> struct endpoints_s : endpoints_base_s {}; template<> struct endpoints_s : endpoints_base_s {}; template<> struct endpoints_s : endpoints_base_s {}; diff --git a/src/ports.hpp b/src/ports.hpp index 05baa0a..573ff81 100644 --- a/src/ports.hpp +++ b/src/ports.hpp @@ -10,4 +10,5 @@ #include "port_router_impl.hpp" #include "port_dealer_impl.hpp" #include "port_pair_impl.hpp" - +#include "port_radio_impl.hpp" +#include "port_dish_impl.hpp" diff --git a/src/templates.cpp b/src/templates.cpp index 9bcbc43..e070aa0 100644 --- a/src/templates.cpp +++ b/src/templates.cpp @@ -2,9 +2,11 @@ #include #include #include +#include #include #include #include +#include #define FMT_HEADER_ONLY #include @@ -126,212 +128,449 @@ void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map< }); } +void pair_server_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &pair = *ports.at("pair_server_port"); + + auto &int_cbk = pair.callback("int-vector-int"); + auto &string_cbk = pair.callback("string-vector-string"); + auto &double_cbk = pair.callback("double-vector-double"); + + int_cbk.connect([](const int32_t &i) { fmt::print("PAIR server: got data: {} of {} type\r\n", i, type_name>()); }); + string_cbk.connect([](const std::string &s) { fmt::print("PAIR server: got data: {} of {} type\r\n", s, type_name>()); }); + double_cbk.connect([](const double &d) { fmt::print("PAIR server: got data: {} of {} type\r\n", d, type_name>()); }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + pair << 4 << 5 << double{6.f} << std::string("server->client"); +} + +void pair_client_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &pair = *ports.at("pair_client_port"); + + auto &int_cbk = pair.callback("int-vector-int"); + auto &string_cbk = pair.callback("string-vector-string"); + auto &double_cbk = pair.callback("double-vector-double"); + + int_cbk.connect([](const int32_t &i) { fmt::print("PAIR client: got data: {} of {} type\r\n", i, type_name>()); }); + string_cbk.connect([](const std::string &s) { fmt::print("PAIR client: got data: {} of {} type\r\n", s, type_name>()); }); + double_cbk.connect([](const double &d) { fmt::print("PAIR client: got data: {} of {} type\r\n", d, type_name>()); }); + + pair << 1 << 2 << double{3.f} << std::string("test"); +} + +void dish_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &dish = *ports.at("dish_port"); + + auto &int_cbk = dish.callback("int-vector-int"); + auto &string_cbk = dish.callback("string-vector-string"); + auto &double_cbk = dish.callback("double-vector-double"); + + int_cbk.connect([](const int32_t &i, const std::string &group) { + fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", i, type_name>(), group); + }); + + string_cbk.connect([](const std::string &s, const std::string &group) { + fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", s, type_name>(), group); + }); + + double_cbk.connect([](const double &d, const std::string &group) { + fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", d, type_name>(), group); + }); +} + +void radio_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &radio = *ports.at("radio_port"); + + radio["grp0"] << 1 << 2 << double{3.f}; + radio["grp1"] << std::string("test"); +} + int main(int argc, char *argv[], char *envp[]) { using enum port_types_e; codecs_s codecs; // Shared ZMQ context for in-process transports. zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads - auto a = AdapterBuilder(); - auto b = AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_int) - .decodeDataBy(&codecs.decoders.to_int) - .withCallbackSignature() - .withName("string-vector-string"); // Make module that contains only 1 port working on PUBLISHER pattern - // TODO: merge all builders to one complex "Module" builder - auto publisher_module = makeModule(argc, argv, envp, "publisher_module", zmq_ctx, - std::tuple{ - makePort(PUB, "publisher_port", - { - {"test", "inproc://PUB-SUB"}, - } /* This port will publish messages here */, - zmq_ctx, - std::tuple{ - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_int) - .decodeDataBy(&codecs.decoders.to_int) - .withCallbackSignature() - .withName("int-vector-int") - .finalize(), + auto publisher_module = ModuleBuilder() + .withName("publisher_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PUB) + .withName("publisher_port") + .withEndpoints({ + {"test", "inproc://PUB-SUB"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_string) - .decodeDataBy(&codecs.decoders.to_string) - .withCallbackSignature() - .withName("string-vector-string") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_double) - .decodeDataBy(&codecs.decoders.to_double) - .withCallbackSignature() - .withName("double-vector-double") - .finalize(), - }), - }); + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); // Make module that contains only 1 port working on SUBSCRIBER pattern - auto subscriber_module = makeModule(argc, argv, envp, "subscriber_module", zmq_ctx, - std::tuple{ - makePort(SUB, "subscriber_port", - { - {"test", "inproc://PUB-SUB"}, - } /* this port will read data here */, - zmq_ctx, - std::tuple{ - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_int) - .decodeDataBy(&codecs.decoders.to_int) - .withCallbackSignature() - .withName("int-vector-int") - .finalize(), + auto subscriber_module = ModuleBuilder() + .withName("subscriber_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(SUB) + .withName("subscriber_port") + .withEndpoints({ + {"test", "inproc://PUB-SUB"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_string) - .decodeDataBy(&codecs.decoders.to_string) - .withCallbackSignature() - .withName("string-vector-string") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_double) - .decodeDataBy(&codecs.decoders.to_double) - .withCallbackSignature() - .withName("double-vector-double") - .finalize(), - }, + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .withArgs(std::tuple{ + std::list{"topic0", "topic1", "topic2", "topic3"}, + }) + .finalize(), + }) + .finalize(argc, argv, envp); - // This type of port requires arguments - topics to subscribe - std::tuple{ - // Topics to subscribe - std::list{"topic0", "topic1", "topic2", "topic3"}, - }), - }); + auto pusher_module = ModuleBuilder() + .withName("pusher_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PUSH) + .withName("push_port") + .withEndpoints({ + {"test", "inproc://PUSH-PULL"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - auto pusher_module = makeModule(argc, argv, envp, "pusher_module", zmq_ctx, - std::tuple{ - makePort(PUSH, "push_port", - { - {"test", "inproc://PUSH-PULL"}, - } /* This port will publish messages here */, - zmq_ctx, - std::tuple{ - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_int) - .decodeDataBy(&codecs.decoders.to_int) - .withCallbackSignature() - .withName("int-vector-int") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_string) - .decodeDataBy(&codecs.decoders.to_string) - .withCallbackSignature() - .withName("string-vector-string") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_double) - .decodeDataBy(&codecs.decoders.to_double) - .withCallbackSignature() - .withName("double-vector-double") - .finalize(), - }), - }); + auto puller_module = ModuleBuilder() + .withName("puller_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PULL) + .withName("pull_port") + .withEndpoints({ + {"test", "inproc://PUSH-PULL"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - auto puller_module = makeModule(argc, argv, envp, "puller_module", zmq_ctx, - std::tuple{ - makePort(PULL, "pull_port", - { - {"test", "inproc://PUSH-PULL"}, - } /* This port will publish messages here */, - zmq_ctx, - std::tuple{ - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_int) - .decodeDataBy(&codecs.decoders.to_int) - .withCallbackSignature() - .withName("int-vector-int") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_string) - .decodeDataBy(&codecs.decoders.to_string) - .withCallbackSignature() - .withName("string-vector-string") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_double) - .decodeDataBy(&codecs.decoders.to_double) - .withCallbackSignature() - .withName("double-vector-double") - .finalize(), - }), - }); + auto req_module = ModuleBuilder() + .withName("req_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(REQ) + .withName("req_port") + .withEndpoints({ + {"test", "inproc://REQ-REP"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - auto req_module = makeModule(argc, argv, envp, "req_module", zmq_ctx, - std::tuple{ - makePort(REQ, "req_port", - { - {"test", "inproc://REQ-REP"}, - } /* This port will publish messages here */, - zmq_ctx, - std::tuple{ - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_int) - .decodeDataBy(&codecs.decoders.to_int) - .withCallbackSignature() - .withName("int-vector-int") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_string) - .decodeDataBy(&codecs.decoders.to_string) - .withCallbackSignature() - .withName("string-vector-string") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_double) - .decodeDataBy(&codecs.decoders.to_double) - .withCallbackSignature() - .withName("double-vector-double") - .finalize(), - }), - }); + auto rep_module = ModuleBuilder() + .withName("rep_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(REP) + .withName("rep_port") + .withEndpoints({ + {"test", "inproc://REQ-REP"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - auto rep_module = makeModule(argc, argv, envp, "rep_module", zmq_ctx, - std::tuple{ - makePort(REP, "rep_port", - { - {"test", "inproc://REQ-REP"}, - } /* This port will publish messages here */, - zmq_ctx, - std::tuple{ - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_int) - .decodeDataBy(&codecs.decoders.to_int) - .withCallbackSignature() - .withName("int-vector-int") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_string) - .decodeDataBy(&codecs.decoders.to_string) - .withCallbackSignature() - .withName("string-vector-string") - .finalize(), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); - AdapterBuilder() - .encodeDataBy(&codecs.encoders.from_double) - .decodeDataBy(&codecs.decoders.to_double) - .withCallbackSignature() - .withName("double-vector-double") - .finalize(), - }), - }); + auto pair_server_module = ModuleBuilder() + .withName("pair_server_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PAIR_SERVER) + .withName("pair_server_port") + .withEndpoints({ + {"test", "inproc://PAIR"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto pair_client_module = ModuleBuilder() + .withName("pair_client_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PAIR_CLIENT) + .withName("pair_client_port") + .withEndpoints({ + {"test", "inproc://PAIR"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto dish_module = ModuleBuilder() + .withName("dish_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(DISH) + .withName("dish_port") + .withEndpoints({ + {"test", "inproc://RADIO-DISH"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .withArgs(std::tuple{ + std::list{"grp0", "grp1"}, + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto radio_module = ModuleBuilder() + .withName("radio_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(RADIO) + .withName("radio_port") + .withEndpoints({ + {"test", "inproc://RADIO-DISH"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); fmt::print("\r\nPUB-SUB test:\r\n"); subscriber_module->run(subscriber_entry); // Subscribe and get data @@ -347,6 +586,16 @@ int main(int argc, char *argv[], char *envp[]) { rep_module->run(rep_entry); req_module->run(req_entry); std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + fmt::print("\r\nPAIR test:\r\n"); + pair_server_module->run(pair_server_entry); + pair_client_module->run(pair_client_entry); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + fmt::print("\r\nRADIO-DISH test:\r\n"); + dish_module->run(dish_entry); + radio_module->run(radio_entry); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); fmt::print("DONE!\r\n"); return 0; diff --git a/src/tuple.hpp b/src/tuple.hpp index bfb4042..060ef9a 100644 --- a/src/tuple.hpp +++ b/src/tuple.hpp @@ -24,7 +24,7 @@ template constexpr Func for_ea } template auto transform_impl(std::tuple const &inputs, Function function, std::index_sequence is) { - return std::tuple...>{function(std::get(inputs))...}; + return std::tuple...>{function(std::get(inputs))...}; } template auto subtuple(const std::tuple &t, std::index_sequence) { return std::make_tuple(std::get(t)...); } diff --git a/src/utils.hpp b/src/utils.hpp index 8b93ebf..9995e19 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include // Trait to detect template specializations. @@ -25,6 +26,19 @@ template constexpr std::string_view type_name() { #endif } +// Compile-time FNV-1a hash for stable type IDs across builds. +constexpr uint64_t fnv1a_64(std::string_view sv) { + uint64_t hash = 14695981039346656037ull; + for (const char &c : sv) { + hash ^= static_cast(c); + hash *= 1099511628211ull; + } + + return hash; +} + +template constexpr uint64_t type_hash() { return fnv1a_64(type_name()); } + // Type tag wrapper used to pass types through templates. template struct tag_s { using type = T;