diff --git a/.clangd b/.clangd index b4dd5e5..e9d1d6b 100644 --- a/.clangd +++ b/.clangd @@ -1,2 +1,2 @@ CompileFlags: - Add: ['-std=gnu++23'] + Add: ['-std=gnu++23', -I/opt/cling/include] diff --git a/CMakeLists.txt b/CMakeLists.txt index aad2ac1..bcb0d19 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,6 +19,14 @@ FetchContent_Declare( ) FetchContent_MakeAvailable(inja) +# FetchContent_Declare( +# json +# GIT_REPOSITORY https://github.com/nlohmann/json +# GIT_PROGRESS TRUE +# GIT_TAG main +# ) + +# FetchContent_MakeAvailable(json) FetchContent_Declare( libzmq diff --git a/src/adapter.hpp b/src/adapter.hpp index 5a86e17..ef1aa6e 100644 --- a/src/adapter.hpp +++ b/src/adapter.hpp @@ -1,5 +1,6 @@ #pragma once +#include "src/tuple.hpp" #include #include #include @@ -16,23 +17,26 @@ using namespace boost::callable_traits; -template - requires( - // Encoder return type is same as Decoder argument type, encoder and decoder have only 1 arg - std::tuple_size_v> == 1 && std::is_same_v, std::remove_cvref_t>>> && - std::tuple_size_v> == 1 && std::is_same_v, std::remove_cvref_t>>>) +// Adapter that pairs an encoder/decoder with a callback signal. +template class Adapter; +template + requires(std::tuple_size_v> == 1 && // Decoder has only 1 argument + std::is_same_v, std::remove_cvref_t>>> && // Encoder return type is same with decoder argument + std::tuple_size_v> == 1 && // Encoder accepts only one argument + std::is_same_v, std::remove_cvref_t>>> // Decoder return type is same with encoder argument + ) -class Adapter : public AdapterBase>> { +class Adapter> : public AdapterBase>, CallbackRetTypeTag, CbkAargs...> { public: using encoder_type_t = Encoder; using decoder_type_t = Decoder; - using adapter_type_t = Adapter; - using callback_arg_type_t = std::tuple_element_t<0, boost::callable_traits::args_t>; - using base_t = AdapterBase; - using callback_type_t = base_t::callback_type_t; - Adapter(const std::string &name, std::pair &&fns) - : AdapterBase(name), mc_enc_(std::forward(fns.first)), mc_dec_(std::forward(fns.second)) {} + using callback_arg_type_t = std::tuple_element_t<0, args_t>; + using base_t = AdapterBase; + + Adapter(const std::string &name, std::tuple &&fns) + : AdapterBase(name), mc_enc_(std::forward(std::get<0u>(fns))), + mc_dec_(std::forward(std::get<1u>(fns))) {} inline const auto &encoder() const { return mc_enc_; } inline const auto &decoder() const { return mc_dec_; } @@ -42,5 +46,5 @@ public: private: const Encoder mc_enc_; const Decoder mc_dec_; - mutable callback_type_t m_callback_; + mutable typename base_t::callback_type_t m_callback_; }; diff --git a/src/adapter_base.hpp b/src/adapter_base.hpp index 57b02e4..0633114 100644 --- a/src/adapter_base.hpp +++ b/src/adapter_base.hpp @@ -1,18 +1,40 @@ #pragma once #include +#include +#include + +// Base adapter interface exposing a typed callback signal and name. +template class AdapterBase { + using cbk_ret_type_t_ = typename CallbackRetTypeTag::type; + + // If return type is not void we combine all callback invocation results. + class CollectAllCombiner_ { + public: + using result_type = std::conditional_t, std::vector, std::false_type>; + + template result_type operator()(InputIterator first, InputIterator last) const { + result_type results; + if constexpr (!std::is_same_v) { + for (; first != last; ++first) { + results.push_back(*first); + } + } + + return results; + } + }; -template class AdapterBase { public: - using callback_type_t = boost::signals2::signal; + // Use combiner if return type is not void + using signature_t = std::conditional_t, void(const InType &, CbkAargs &&...), cbk_ret_type_t_(const InType &, CbkAargs &&...)>; + using callback_type_t = std::conditional_t, boost::signals2::signal, boost::signals2::signal>; AdapterBase(const std::string &name) : mc_name_(name) {} virtual callback_type_t &callback() const = 0; inline const auto &name() const { return mc_name_; } -protected: private: const std::string mc_name_; }; - diff --git a/src/adapter_factory.hpp b/src/adapter_factory.hpp index f78e49f..74f4b19 100644 --- a/src/adapter_factory.hpp +++ b/src/adapter_factory.hpp @@ -1,7 +1,74 @@ #pragma once -#include "adapter.hpp" +#include "utils.hpp" +#include +#include +#include -template auto makeAdapter(const std::string &name, std::pair &&fns) { - return std::make_unique>(name, std::forward>(fns)); -} +#include "adapter.hpp" +#include "src/tuple.hpp" +using namespace boost::callable_traits; + +template class AdapterBuilder; +template class AdapterBuilderNamed; + +// Fluent builder to assemble an Adapter from encoder/decoder/signature/name. +template <> class AdapterBuilder<> { +public: + template AdapterBuilder encodeDataBy(Encoder &&encoder) { return AdapterBuilder(std::forward(encoder)); } +}; + +template class AdapterBuilder { +public: + AdapterBuilder(Encoder &&encoder) : m_encoder_(std::forward(encoder)) {} + template AdapterBuilder decodeDataBy(Decoder &&decoder) { + // Advance builder with decoder selected. + return AdapterBuilder(std::forward(m_encoder_), std::forward(decoder)); + } + +private: + std::decay_t m_encoder_; +}; + +template class AdapterBuilder { +public: + AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward(encoder)), m_decoder_(std::forward(decoder)) {} + template AdapterBuilder withCallbackSignature() { + // Advance builder with callback signature selected. + return AdapterBuilder(std::forward(m_encoder_), std::forward(m_decoder_)); + } + +private: + std::decay_t m_encoder_; + std::decay_t m_decoder_; +}; + +template class AdapterBuilder { +public: + AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward(encoder)), m_decoder_(std::forward(decoder)) {} + AdapterBuilderNamed withName(const std::string &name) { + // Advance builder with adapter name selected. + return AdapterBuilderNamed(std::forward(m_encoder_), std::forward(m_decoder_), name); + } + +private: + std::decay_t m_encoder_; + std::decay_t m_decoder_; +}; + +template class AdapterBuilderNamed { +public: + AdapterBuilderNamed(Encoder &&encoder, Decoder &&decoder, const std::string &name) + : m_encoder_(std::forward(encoder)), m_decoder_(std::forward(decoder)), mc_name_(name) {} + + auto finalize() { + // Produce a concrete Adapter instance. + return std::make_unique>, typename tp::tuple_tail>::type>>( + mc_name_, std::make_tuple(std::forward(m_encoder_), std::forward(m_decoder_), tag_s>{})); + } + +private: + const std::string mc_name_; + std::decay_t m_encoder_; + std::decay_t m_decoder_; +}; diff --git a/src/codecs.cpp b/src/codecs.cpp new file mode 100644 index 0000000..42762c9 --- /dev/null +++ b/src/codecs.cpp @@ -0,0 +1,34 @@ +#include "codecs.hpp" + +template <> std::vector codecs_s>::encoders_s::from_int(const int32_t &i) { + auto str = std::to_string(i); // Convert to string first + return {str.begin(), str.end()}; // String to byte array +} + +template <> auto codecs_s>::encoders_s::from_string(const std::string &s) -> std::vector { return {s.begin(), s.end()}; }; + +template <> auto codecs_s>::encoders_s::from_double(const double &d) -> std::vector { + auto str = std::to_string(d); + return {str.begin(), str.end()}; +}; + +template <> auto codecs_s>::decoders_s::to_int(const std::vector &s) -> int32_t { + int32_t ret; + auto str = std::string(s.begin(), s.end()); + if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) { + return ret; + } + + throw std::runtime_error(fmt::format("Invalid convert from {} to integer type", str)); +}; + +template <> auto codecs_s>::decoders_s::to_string(const std::vector &i) -> std::string { return std::string(i.begin(), i.end()); }; +template <> auto codecs_s>::decoders_s::to_double(const std::vector &s) -> double { + double ret; + auto str = std::string(s.begin(), s.end()); + if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) { + return ret; + } + + throw std::runtime_error(fmt::format("Invalid convert from {} to double type", str)); +}; diff --git a/src/codecs.hpp b/src/codecs.hpp new file mode 100644 index 0000000..1b67cc1 --- /dev/null +++ b/src/codecs.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include +#include +#include + +#define FMT_HEADER_ONLY +#include +#include + +// Simple encoder/decoder set for an environment payload type. +template struct codecs_s { + struct encoders_s { + static auto from_int(const int32_t &) -> EnvDataType; + static auto from_string(const std::string &s) -> EnvDataType; + static auto from_double(const double &d) -> EnvDataType; + } encoders; + + struct decoders_s { + static auto to_int(const EnvDataType &s) -> int32_t; + static auto to_string(const EnvDataType &i) -> std::string; + static auto to_double(const EnvDataType &s) -> double; + } decoders; +}; diff --git a/src/module.hpp b/src/module.hpp index c20823b..8bc0408 100644 --- a/src/module.hpp +++ b/src/module.hpp @@ -4,16 +4,17 @@ #include #include #include -#include #include "port_base.hpp" #include "tuple.hpp" +// Module type hint (currently unused). enum class module_type_e : uint32_t { STANDALONE, INCOMPOSITION, }; +// Base interface for a named module; exposes ports and run entrypoint. template class ModuleBase { public: ModuleBase(int32_t argc, char **argv, char **envp, const std::string &name) @@ -40,6 +41,7 @@ private: } mc_cli_args_; }; +// Concrete module that owns typed ports and dispatches entry with a port map. template class Module : public ModuleBase>> { public: using port_data_type_t = std::tuple_element_t<0, std::tuple>; diff --git a/src/module_factory.hpp b/src/module_factory.hpp index a783d0e..3ef7431 100644 --- a/src/module_factory.hpp +++ b/src/module_factory.hpp @@ -2,8 +2,59 @@ #include "module.hpp" #include +#include +#include +#include +// Factory for building a Module from owned ports. template auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) { return std::make_unique>(argc, argv, envp, name, zmq_ctx, std::forward...>>(ports)); } + +class ModuleBuilderNamed; +class ModuleBuilderContext; +template class ModuleBuilderPorts; + +// Fluent builder to assemble a Module from name/context/ports. +class ModuleBuilder { +public: + ModuleBuilderNamed withName(std::string name); +}; + +class ModuleBuilderNamed { +public: + explicit ModuleBuilderNamed(std::string name) : name_(std::move(name)) {} + ModuleBuilderContext withContext(zmq::context_t &zmq_ctx); + +private: + std::string name_; +}; + +class ModuleBuilderContext { +public: + ModuleBuilderContext(std::string name, zmq::context_t &zmq_ctx) : name_(std::move(name)), ctx_(&zmq_ctx) {} + + template ModuleBuilderPorts withPorts(std::tuple...> &&ports) { + return ModuleBuilderPorts(name_, *ctx_, std::forward...>>(ports)); + } + +private: + std::string name_; + zmq::context_t *ctx_; +}; + +template class ModuleBuilderPorts { +public: + ModuleBuilderPorts(std::string name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) : name_(std::move(name)), ctx_(&zmq_ctx), ports_(std::move(ports)) {} + + auto finalize(int32_t argc, char **argv, char **envp) { return makeModule(argc, argv, envp, name_, *ctx_, std::move(ports_)); } + +private: + std::string name_; + zmq::context_t *ctx_; + std::tuple...> ports_; +}; + +inline ModuleBuilderNamed ModuleBuilder::withName(std::string name) { return ModuleBuilderNamed(std::move(name)); } +inline ModuleBuilderContext ModuleBuilderNamed::withContext(zmq::context_t &zmq_ctx) { return ModuleBuilderContext(std::move(name_), zmq_ctx); } diff --git a/src/port.hpp b/src/port.hpp index 2ace523..9a910f0 100644 --- a/src/port.hpp +++ b/src/port.hpp @@ -5,60 +5,100 @@ #include #include #include +#include +#include +#include #include "port_base.hpp" #include "ports.hpp" +#include "src/port_types.hpp" #include "tuple.hpp" using namespace boost::callable_traits; +// Typed port implementation owning adapters and a ZMQ transport. template class Port; template + requires( + // Check return types + (std::is_same_v>::base_t::callback_type_t::result_type> && + ...) && + + // Check number of args + ((std::tuple_size_v> == + std::tuple_size_v>::callback_type_t::signature_type>>) && + ...) && + + // All adapters are unique + ([]() consteval { + return [](std::index_sequence) consteval { + using tuple_t = std::tuple; + return ((tp::tuple_index, tuple_t>::value == Is) && ...); + }(std::index_sequence_for{}); + }.template operator()>...>())) + class Port, std::tuple> : public PortBase...>>> { public: + static constexpr auto callback_args_num = std::tuple_size_v>::callback_type_t::signature_type>>; + using this_t = Port, std::tuple>; using port_data_type_t = std::tuple_element_t<0, std::tuple...>>; + using callback_aargs_t = tp::tuple_tail>::callback_type_t::signature_type>>::type; - Port(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, - std::tuple &&args) - : PortBase(pt, name, endpoint, zmq_ctx), + Port(enum port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters, std::tuple &&args) + : PortBase(pt, name, endpoints, zmq_ctx), // Init adapters mc_adapters_([&](std::index_sequence) { return std::make_tuple([&]() { using adapter_type_t = std::remove_cvref_t(adapters))>; - using adapter_input_type_t = return_type_t; + using adapter_input_type_t = std::remove_cvref_t>; + using adapter_callback_type_t = std::remove_cvref_t; - return std::make_tuple(std::get(adapters)->name(), std::hash()(std::get(adapters)->name()), - typeid(std::remove_cvref_t).hash_code(), std::forward>(std::get(adapters))); + // Cache name, name hash, input type hash, callback type hash, and adapter pointer. + return std::make_tuple(std::get(adapters)->name(), std::hash()(std::get(adapters)->name()), type_hash(), + type_hash(), std::forward>(std::get(adapters))); }.template operator()()...); }(std::make_index_sequence{})) { - std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoint), std::forward(args)...); }, std::forward(args)); + // Instantiate the port implementation with any extra args (e.g., SUB topics). + std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoints), std::forward(args)...); }, std::forward(args)); } + ~Port() override { stop(); } + void stop() const override { - this->stop__(); - mc_impl_->close(); + stop__(); + m_impl__->close(); } + inline const auto &adapters() const { return mc_adapters_; } + protected: - void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override { + void stop__() const override { m_impl__->stop_source().request_stop(); } + void send__(const void *data, size_t size, uint64_t hash, const std::string &addr = "") const override { + if (!addr.empty() && this->type() != port_types_e::PUB && this->type() != port_types_e::RADIO) { + throw std::runtime_error("Addressed send is only supported for PUB/RADIO ports"); + } + const void *adapter_ptr = nullptr; tp::for_each(mc_adapters_, [&](const auto &e) { - const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e; + const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; if (adapter_typehash == hash) { using adapter_in_type_t = std::remove_cvref_tdecoder())>>; adapter_ptr = &adapter; - typename PortImplBase::port_payload_s payload = { + // Encode to the environment type and pack payload. + typename PortImplBase>::port_payload_s payload = { .typehash = hash, - .data = adapter->encoder()(*reinterpret_cast(data)), + .data = {adapter->encoder()(*reinterpret_cast(data))}, }; msgpack::sbuffer buf; msgpack::pack(buf, payload); - mc_impl_->send(addr, buf); + // Send encoded payload via the transport. + m_impl__->send(buf, addr); } }); @@ -67,20 +107,22 @@ protected: } } - void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const override final { + void *get_adapter__(const std::string &name, uint64_t namehash, uint64_t typehash, uint64_t cbk_typehash) const override final { void *ret = nullptr; tp::for_each(mc_adapters_, [&](auto &a) { if (!ret) { - auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = a; - if (adapter_typehash == typehash && adapter_namehash == namehash) { + auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = a; + // Match by adapter name/type and callback signature. + if (adapter_typehash == typehash && adapter_namehash == namehash && adapter_cbk_typehash == cbk_typehash) { ret = reinterpret_cast(adapter.get()); } } }); if (!ret) { - throw std::runtime_error(fmt::format("No adapter '{}' in port '{}'\r\n", name, this->name())); + throw std::runtime_error( + fmt::format("No such callback in adapter '{}' in port '{}' (namehash: #{}, typehash: #{}, cbk_typehash: #{})\r\n", name, this->name(), namehash, typehash, cbk_typehash)); } return ret; @@ -88,20 +130,57 @@ protected: private: using base_t_ = PortBase; - using cbk_type_t_ = std::function; + using cbk_return_type_t_ = typename std::tuple_element_t<0, std::tuple>::base_t::callback_type_t::result_type; - mutable std::tuple>...> mc_adapters_; - mutable std::unique_ptr> mc_impl_; + template class PortImplCallback_; + template class PortImplCallback_> { + public: + PortImplCallback_(const Port *port) : mc_port_(port) {} + using type_t = std::function; - void listen__(std::stop_token st) const override { mc_impl_->listen(st); } + cbk_return_type_t_ operator()(const port_data_type_t &data, uint64_t hash, Aargs &&...callback_args) const { + std::conditional_t, cbk_return_type_t_, std::false_type> ret{}; + bool found = false; + + tp::for_each(mc_port_->mc_adapters_, [&](const auto &e) { + if (!found) { + const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; + if (adapter_typehash == hash) { + // Decode payload and dispatch to the adapter's signal. + if constexpr (std::is_void_vcallback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) { + adapter->callback()(adapter->decoder()(data), std::forward(callback_args)...); + } else { + ret = adapter->callback()(adapter->decoder()(data), std::forward(callback_args)...); + } + + found = true; + } + } + }); + + if (!found) { + throw std::runtime_error(fmt::format("Adapter with type hash: {} not found", hash)); + } + + if constexpr (!std::is_void_v) { + return ret; + } + } + + private: + const Port *mc_port_; + }; + + mutable std::unique_ptr>> m_impl__{nullptr}; + mutable std::tuple>...> mc_adapters_; template void init_impl_(enum port_types_e pt, ImplArgs &&...args) const { using enum port_types_e; static constexpr auto make_null_impl_pair = []() consteval { if constexpr (port_type == UNKNOWN) { - return std::make_pair(port_type, static_cast *>(nullptr)); + return std::make_pair(port_type, static_cast> *>(nullptr)); } else { - return std::make_pair(port_type, static_cast *>(nullptr)); + return std::make_pair(port_type, static_cast> *>(nullptr)); } }; @@ -109,30 +188,24 @@ private: std::make_tuple(make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), - make_null_impl_pair.template operator()()); + make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), + make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()()); tp::for_each(impl_map, [&](const auto &p) { const auto &[type, null_pimpl] = p; if (type == pt) { using impl_type_t = std::remove_pointer_t; - if constexpr (std::is_constructible_v) { - mc_impl_ = std::make_unique( - // Args - std::forward(args)..., - - // Callback - [this](const port_data_type_t &data, size_t hash) { - tp::for_each(mc_adapters_, [&](const auto &e) { - const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e; - if (adapter_typehash == hash) { - adapter->callback()(adapter->decoder()(data)); - } - }); - }); + // Construct the selected PortImpl if its constructor matches the args. + if constexpr (std::is_constructible_v &&>) { + m_impl__ = std::make_unique(this, std::forward(args)..., PortImplCallback_(this)); } } }); + + if (!m_impl__) { + throw std::runtime_error("No PortImpl for port type ..."); + } } }; diff --git a/src/port_base.hpp b/src/port_base.hpp index 760c909..622ae57 100644 --- a/src/port_base.hpp +++ b/src/port_base.hpp @@ -1,8 +1,14 @@ #pragma once -#include -#include +#include "src/tuple.hpp" +#include +#include +#include #include +#include +#include + +#define ZMQ_BUILD_DRAFT_API #include #define FMT_HEADER_ONLY @@ -13,12 +19,16 @@ #include "port_types.hpp" #include "utils.hpp" +using namespace boost::callable_traits; + +// Abstract port API for sending encoded messages and accessing callbacks. template class PortBase { + // Helper binding an address/topic to a port for chained sends. class AddressedPort_ { public: AddressedPort_(const PortBase *port, const std::string &address) : mc_addr_(address), mc_port_(port) {} template const auto &operator<<(const InType &in) const { - mc_port_->send__(mc_addr_, &in, sizeof(InType), typeid(InType).hash_code()); + mc_port_->send__(&in, sizeof(InType), type_hash>(), mc_addr_); return *this; } @@ -28,39 +38,59 @@ template class PortBase { }; public: - PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx) - : mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash()(name)), mc_endpoint_hash_(std::hash()(endpoint)) {} + PortBase(enum port_types_e port_type, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx) + : mc_type_(port_type), mc_name_(name), mc_endpoints_(endpoints), mc_name_hash_(std::hash()(name)) {} + + virtual ~PortBase() = default; inline const auto &type() const { return mc_type_; } inline const auto &name() const { return mc_name_; } - inline const auto &endpoint() const { return mc_endpoint_; } + inline const auto &endpoints() const { return mc_endpoints_; } inline const auto &name_hash() const { return mc_name_hash_; } - inline const auto &endpoint_hash() const { return mc_endpoint_hash_; } const auto operator[](const std::string &address) const { return AddressedPort_(this, address); } template const PortBase &operator<<(const InType &in) const { - send__("", &in, sizeof(InType), typeid(InType).hash_code()); + // Use empty address for non-addressed sends. + send__(&in, sizeof(InType), type_hash>()); return *this; } - void listen() const { listen__(m_ss_.get_token()); }; virtual void stop() const = 0; - - template auto &callback(const std::string &name) const { - return (*static_cast *>(get_adapter__(name, std::hash()(name), typeid(std::remove_cvref_t).hash_code()))).callback(); + template auto &callback(const std::string &name) const { + // Lookup adapter by callback signature and name. + return GetCallbackHelper_>::type>(this).template operator()(name); } protected: - void stop__() const { m_ss_.request_stop(); } - - virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0; - virtual void listen__(std::stop_token) const = 0; - virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const = 0; + virtual void stop__() const = 0; + virtual void send__(const void *data, size_t size, uint64_t type_hash, const std::string &addr = "") const = 0; + virtual void *get_adapter__(const std::string &name, uint64_t namehash, uint64_t typehash, uint64_t cbk_typehash) const = 0; private: const enum port_types_e mc_type_; - const std::string mc_name_, mc_endpoint_; - const size_t mc_name_hash_, mc_endpoint_hash_; - mutable std::stop_source m_ss_; + const std::string mc_name_; + const std::map mc_endpoints_; + const uint64_t mc_name_hash_; + + // Type-safe callback lookup helper. + template class GetCallbackHelper_; + template class GetCallbackHelper_> { + public: + GetCallbackHelper_(const PortBase *port) : mc_port_(port) {} + + template auto &operator()(const std::string &name) const { + using ret_type_t = std::remove_cvref_t>>; + using arg_type_t = std::remove_cvref_t>>>; + using cbk_type_t = typename AdapterBase, Aargs...>::callback_type_t::signature_type; + + return (*static_cast, Aargs...> *>( + mc_port_->get_adapter__(name, std::hash()(name), type_hash(), type_hash>()))) + .callback(); + // typeid(typename AdapterBase, Aargs...>::callback_type_t).hash_code() + } + + private: + const PortBase *mc_port_; + }; }; diff --git a/src/port_dealer_impl.hpp b/src/port_dealer_impl.hpp index 012632a..e0b2ce8 100644 --- a/src/port_dealer_impl.hpp +++ b/src/port_dealer_impl.hpp @@ -2,21 +2,26 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include #define FMT_HEADER_ONLY #include #include -template class PortImpl : public PortImplBase { +// Dealer transport placeholder (no send/recv logic yet). +template class PortImpl : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} - void listen(std::stop_token st) const override { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) {} + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {}; + +private: + void listen__(std::stop_token st) const override { while (!st.stop_requested()) { std::this_thread::sleep_for(std::chrono::milliseconds(100u)); } } - - // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; - diff --git a/src/port_dish_impl.hpp b/src/port_dish_impl.hpp new file mode 100644 index 0000000..ab2d096 --- /dev/null +++ b/src/port_dish_impl.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" +#include +#include +#include + +#define FMT_HEADER_ONLY +#include +#include + +// Dish transport: connects, joins groups, and dispatches payloads with group name. +template class PortImpl : public PortImplBase { +public: + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, const std::list &groups, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)), mc_groups_(groups) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::dish); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); + } + + // Join each group. + for (const auto &group : mc_groups_) { + this->m_sock__.join(group.c_str()); + } + + // Start async listener loop. + listen__(this->stop_source().get_token()); + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on DISH pattern socket"); }; + +private: + const std::list mc_groups_; + + void listen__(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + std::string group = std::string(msg.group()); + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + // Dispatch each decoded item. + for (const auto &data : batch) { + this->mc_cbk__(data, typehash, group); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); + } +}; diff --git a/src/port_factory.hpp b/src/port_factory.hpp index 8d155fd..ab1b850 100644 --- a/src/port_factory.hpp +++ b/src/port_factory.hpp @@ -1,10 +1,119 @@ #pragma once #include "port.hpp" +#include +#include +#include +#include +// Factory for building a Port from adapters and extra args. template -auto makePort(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, - std::tuple &&args = {}) { - return std::make_unique, std::tuple>>(pt, name, endpoint, zmq_ctx, std::forward...>>(adapters), +auto makePort(enum port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters, std::tuple &&args = {}) { + return std::make_unique, std::tuple>>(pt, name, endpoints, zmq_ctx, std::forward...>>(adapters), std::forward>(args)); } + +class PortBuilderNamed; +class PortBuilderEndpoints; +class PortBuilderContext; +class PortBuilderWithContext; + +template class PortBuilderAdapters; +template class PortBuilderArgs; + +class PortBuilderNamed { +public: + explicit PortBuilderNamed(port_types_e pt) : m_pt_(pt) {} + PortBuilderEndpoints withName(const std::string &name); + +private: + port_types_e m_pt_; +}; + +// Fluent builder to assemble a Port from type/name/endpoints/context/adapters/args. +class PortBuilder { +public: + inline PortBuilderNamed withType(port_types_e pt) { return PortBuilderNamed(pt); } +}; + +class PortBuilderWithContext { +public: + PortBuilderWithContext(port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx) + : m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx) {} + + template inline PortBuilderAdapters> withAdapters(std::tuple...> &&adapters) { + return PortBuilderAdapters>(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(adapters)); + } + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; + zmq::context_t *m_ctx_; +}; + +class PortBuilderContext { +public: + PortBuilderContext(port_types_e pt, const std::string &name, const std::map &endpoints) : m_pt_(pt), m_name_(name), m_endpoints_(endpoints) {} + PortBuilderWithContext withContext(zmq::context_t &zmq_ctx); + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; +}; + +class PortBuilderEndpoints { +public: + PortBuilderEndpoints(port_types_e pt, const std::string &name) : m_pt_(pt), m_name_(name) {} + PortBuilderContext withEndpoints(const std::map &endpoints); + +private: + port_types_e m_pt_; + std::string m_name_; +}; + +template class PortBuilderAdapters> { +public: + PortBuilderAdapters(port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters) + : m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx), m_adapters_(&adapters) {} + + template PortBuilderArgs, std::tuple> withArgs(std::tuple &&args) { + return PortBuilderArgs, std::tuple>(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(*m_adapters_), + std::forward>(args)); + } + + auto finalize() { return makePort(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(*m_adapters_)); } + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; + zmq::context_t *m_ctx_; + std::tuple...> *m_adapters_; +}; + +template class PortBuilderArgs, std::tuple> { +public: + PortBuilderArgs(port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters, std::tuple &&args) + : m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx), m_adapters_(&adapters), m_args_(std::forward>(args)) {} + + inline auto finalize() { + return makePort(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward...>>(*m_adapters_), std::forward>(m_args_)); + } + +private: + port_types_e m_pt_; + std::string m_name_; + std::map m_endpoints_; + zmq::context_t *m_ctx_; + std::tuple...> *m_adapters_; + std::tuple m_args_; +}; + +inline PortBuilderEndpoints PortBuilderNamed::withName(const std::string &name) { return PortBuilderEndpoints(m_pt_, name); } +inline PortBuilderWithContext PortBuilderContext::withContext(zmq::context_t &zmq_ctx) { return PortBuilderWithContext(m_pt_, m_name_, m_endpoints_, zmq_ctx); } +inline PortBuilderContext PortBuilderEndpoints::withEndpoints(const std::map &endpoints) { return PortBuilderContext(m_pt_, m_name_, endpoints); } diff --git a/src/port_impl_base.hpp b/src/port_impl_base.hpp index 375c964..96d88ab 100644 --- a/src/port_impl_base.hpp +++ b/src/port_impl_base.hpp @@ -4,37 +4,57 @@ #include #include #include +#include + +#define ZMQ_BUILD_DRAFT_API #include -template class PortImplBase { +// Base ZMQ-backed transport for a port; manages socket and listener thread. +template class PortImplBase { using port_data_type_t_ = std::remove_cvref_t>>; public: + // Wire payload: type hash + batch of encoded values. struct port_payload_s { - size_t typehash; - port_data_type_t_ data; + uint64_t typehash; + std::vector data; MSGPACK_DEFINE(typehash, data); }; - PortImplBase(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : mc_endpoint__(endpoint), m_ctx__(zmq_ctx), mc_cbk__(callback) {} - + PortImplBase(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : mc_endpoints__(endpoints), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {} virtual ~PortImplBase() = default; - virtual void listen(std::stop_token st) const = 0; - virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0; + virtual void send(const msgpack::sbuffer &data, const std::string &addr = "") const = 0; void close() { - m_sock__.close(); - + // Join listener thread before closing the socket to avoid polling on a closed fd. if (m_listener_thread__.valid()) { - m_listener_thread__.get(); + try { + m_listener_thread__.get(); + } catch (...) { + // Ignore listener exceptions during shutdown. + } + } + + try { + m_sock__.close(); + } catch (...) { + // Ignore close errors during shutdown. } }; + inline auto &stop_source() { return m_ss_; } + protected: + static constexpr auto sc_recv_timeout_ms__ = 1'000u; mutable std::future m_listener_thread__; mutable zmq::socket_t m_sock__; zmq::context_t &m_ctx__; - const std::string mc_endpoint__; + const std::map mc_endpoints__; const Callback mc_cbk__; + const Port *mc_port__; + virtual void listen__(std::stop_token st) const = 0; +private: + mutable std::stop_source m_ss_; }; diff --git a/src/port_pair_impl.hpp b/src/port_pair_impl.hpp index be1e06b..4831ae8 100644 --- a/src/port_pair_impl.hpp +++ b/src/port_pair_impl.hpp @@ -2,20 +2,148 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include +#include #define FMT_HEADER_ONLY #include #include -template class PortImpl : public PortImplBase { +// Pair transport (client): connects and listens for incoming payloads. +template class PortImpl : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} - void listen(std::stop_token st) const override { - while (!st.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); } + + listen__(this->stop_source().get_token()); } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; + +private: + void listen__(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + for (const auto &data : batch) { + this->mc_cbk__(data, typehash); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); + } +}; + +// Pair transport (server): binds and listens for incoming payloads. +template class PortImpl : public PortImplBase { +public: + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } + + listen__(this->stop_source().get_token()); + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; + +private: + void listen__(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + for (const auto &data : batch) { + this->mc_cbk__(data, typehash); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); + } }; diff --git a/src/port_publisher_impl.hpp b/src/port_publisher_impl.hpp index 9e40d5b..644f200 100644 --- a/src/port_publisher_impl.hpp +++ b/src/port_publisher_impl.hpp @@ -7,24 +7,30 @@ #include #include -template class PortImpl final : public PortImplBase { +// Publisher transport: binds endpoints and sends topic + payload frames. +template class PortImpl final : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub); - this->m_sock__.bind(this->mc_endpoint__); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } } ~PortImpl() override {} - void listen(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); } - // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override { + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { try { this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore); - this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait); + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); } catch (const zmq::error_t &err) { fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); } }; + +private: + void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); } }; diff --git a/src/port_pull_impl.hpp b/src/port_pull_impl.hpp index cec7730..b9d8d04 100644 --- a/src/port_pull_impl.hpp +++ b/src/port_pull_impl.hpp @@ -2,20 +2,73 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include +#include #define FMT_HEADER_ONLY #include #include -template class PortImpl : public PortImplBase { +// Pull transport: connects and listens for incoming payloads. +template class PortImpl : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} - void listen(std::stop_token st) const override { - while (!st.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pull); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); } + + listen__(this->stop_source().get_token()); } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on PULL pattern socket"); }; + +private: + void listen__(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + for (const auto &data : batch) { + this->mc_cbk__(data, typehash); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); + } }; diff --git a/src/port_push_impl.hpp b/src/port_push_impl.hpp index dd0c794..e8ac9aa 100644 --- a/src/port_push_impl.hpp +++ b/src/port_push_impl.hpp @@ -7,15 +7,27 @@ #include #include -template class PortImpl : public PortImplBase { +// Push transport: binds endpoints and sends fire-and-forget payloads. +template class PortImpl : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} - void listen(std::stop_token st) const override { - while (!st.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::push); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); } } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; + +private: + void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUSH pattern socket"); } }; diff --git a/src/port_radio_impl.hpp b/src/port_radio_impl.hpp new file mode 100644 index 0000000..950d6c3 --- /dev/null +++ b/src/port_radio_impl.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" +#include + +#define FMT_HEADER_ONLY +#include +#include + +// Radio transport: binds endpoints and sends group-tagged payloads. +template class PortImpl final : public PortImplBase { +public: + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::radio); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + zmq::message_t msg(data.data(), data.size()); + if (!addr.empty()) { + msg.set_group(addr.c_str()); + } + + this->m_sock__.send(msg, zmq::send_flags::none); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; + +private: + void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on RADIO pattern socket"); } +}; diff --git a/src/port_rep_impl.hpp b/src/port_rep_impl.hpp index 60a018f..5512a1c 100644 --- a/src/port_rep_impl.hpp +++ b/src/port_rep_impl.hpp @@ -1,22 +1,112 @@ #pragma once -#include "port_impl_base.hpp" -#include "port_types.hpp" +#include +#include +#include +#include +#include #define FMT_HEADER_ONLY #include #include -template class PortImpl : public PortImplBase { +#include "port_impl_base.hpp" +#include "port_types.hpp" +#include "tuple.hpp" +#include "utils.hpp" + +// Reply transport: listens for requests and sends reply payloads. +template class PortImpl : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} - void listen(std::stop_token st) const override { - while (!st.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::rep); + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); } + + this->m_sock__.set(zmq::sockopt::sndtimeo, static_cast(base_t::sc_recv_timeout_ms__)); + + // Start async listener loop. + listen__(this->stop_source().get_token()); } // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; -}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on REPLY pattern socket"); }; +private: + void listen__(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); + + while (!st.stop_requested()) { + std::vector> events(1u); + uint64_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); + + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + // Callback should return reply data for REP sockets. + if constexpr (!std::is_void_vmc_cbk__)>>>) { + if (batch.size()) { + auto reply_data = this->mc_cbk__(batch.front(), typehash); + typename base_t::port_payload_s reply_payload = { + .typehash = type_hash>(), + }; + + for (const auto &d : reply_data) { + using adapter_in_type_t = std::remove_cvref_t; + + uint64_t typehash = type_hash(); + + // Find matching encoder by type to build reply payload. + tp::for_each(this->mc_port__->adapters(), [&](const auto &e) { + const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e; + if constexpr (std::is_same_vencoder())>>>, + adapter_in_type_t>) { + if (adapter_typehash == typehash) { + reply_payload.data.push_back(adapter->encoder()(d)); + } + } + }); + } + + msgpack::sbuffer buf; + msgpack::pack(buf, reply_payload); + + // Send reply for this item. + this->m_sock__.send(zmq::message_t(buf.data(), buf.size()), zmq::send_flags::none); + } + } else { + throw std::runtime_error("Callback of REPLY pattern socket should return non-void value"); + } + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } + } + }, + st); + } +}; diff --git a/src/port_req_impl.hpp b/src/port_req_impl.hpp index 2a505d1..7b06de4 100644 --- a/src/port_req_impl.hpp +++ b/src/port_req_impl.hpp @@ -2,21 +2,54 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include +#include #define FMT_HEADER_ONLY #include #include -template class PortImpl : public PortImplBase { +// Request transport: sends a payload and waits for a reply. +template class PortImpl : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} - void listen(std::stop_token st) const override { - while (!st.stop_requested()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::req); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); } + + // Avoid blocking forever if no REP is available. + this->m_sock__.set(zmq::sockopt::rcvtimeo, static_cast(base_t::sc_recv_timeout_ms__)); } - // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; -}; + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { + try { + zmq::message_t reply; + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none); + this->m_sock__.recv(reply, zmq::recv_flags::none).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(reply.data()), reply.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + for (const auto &data : batch) { + this->mc_cbk__(data, typehash); + } + + return std::optional(res); + }); + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + throw; + } + }; + +private: + void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on REQUEST pattern socket"); } +}; diff --git a/src/port_router_impl.hpp b/src/port_router_impl.hpp index 4c5e431..aa4293f 100644 --- a/src/port_router_impl.hpp +++ b/src/port_router_impl.hpp @@ -2,21 +2,26 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include #define FMT_HEADER_ONLY #include #include -template class PortImpl : public PortImplBase { +// Router transport placeholder (no send/recv logic yet). +template class PortImpl : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} - void listen(std::stop_token st) const override { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) {} + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {}; + +private: + void listen__(std::stop_token st) const override { while (!st.stop_requested()) { std::this_thread::sleep_for(std::chrono::milliseconds(100u)); } } - - // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; - diff --git a/src/port_subscriber_impl.hpp b/src/port_subscriber_impl.hpp index 8e4007a..2b8824d 100644 --- a/src/port_subscriber_impl.hpp +++ b/src/port_subscriber_impl.hpp @@ -2,54 +2,93 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include +#include +#include +#include +#include +#include +#include #define FMT_HEADER_ONLY #include #include -template class PortImpl : public PortImplBase { +// Subscriber transport: connects, subscribes to topics, and dispatches payloads. +template class PortImpl : public PortImplBase { public: - using base_t = PortImplBase; - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list &topics, Callback &&callback) - : PortImplBase(zmq_ctx, endpoint, std::forward(callback)), mc_topics_(topics) { + using base_t = PortImplBase; + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, const std::list &topics, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)), mc_topics_(topics) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::sub); - this->m_sock__.connect(this->mc_endpoint__); + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); + } + + // Subscribe to each topic prefix. for (const auto &topic : mc_topics_) { this->m_sock__.set(zmq::sockopt::subscribe, topic); } + + // Start async listener loop. + listen__(this->stop_source().get_token()); } - void listen(std::stop_token st) const override { + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; + +private: + const std::list mc_topics_; + + void listen__(std::stop_token st) const override { this->m_listener_thread__ = std::async( std::launch::async, [this](std::stop_token st) { - while (!st.stop_requested()) { - zmq::message_t msg; + try { + zmq::poller_t poller; + poller.add(this->m_sock__, zmq::event_flags::pollin); - this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { - fmt::print("Received envelope: {}\r\n", std::string(static_cast(msg.data()), msg.size())); - this->m_sock__.recv(msg).and_then([&](const auto &res) { - typename base_t::port_payload_s payload; + while (!st.stop_requested()) { + std::vector> events(1u); + size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__)); - msgpack::sbuffer buf; - buf.write(reinterpret_cast(msg.data()), msg.size()); - const auto [typehash, data] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + for (int32_t i = 0; i < num_events; ++i) { + zmq::message_t msg; - this->mc_cbk__(data, typehash); - return std::optional(res); - }); + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + // First frame is the topic envelope. + std::string envelope = std::string(static_cast(msg.data()), msg.size()); - return std::optional(res); - }); + this->m_sock__.recv(msg).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + // Dispatch each decoded item. + for (const auto &data : batch) { + this->mc_cbk__(data, typehash, envelope); + } + + return std::optional(res); + }); + + return std::optional(res); + }); + } + } + } catch (const zmq::error_t &) { + if (!st.stop_requested()) { + throw; + } + } catch (...) { + if (!st.stop_requested()) { + throw; + } } }, st); } - - // Send to socket depending on implementation - void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; - -private: - const std::list mc_topics_; }; diff --git a/src/port_types.hpp b/src/port_types.hpp index d7d1ae1..2fb6cf2 100644 --- a/src/port_types.hpp +++ b/src/port_types.hpp @@ -2,17 +2,39 @@ #include +// Supported ZMQ socket patterns for ports. enum class port_types_e : uint32_t { UNKNOWN = 0, PUB, SUB, REQ, REP, - ROUTER, - DEALER, PUSH, PULL, - PAIR, + PAIR_CLIENT, + PAIR_SERVER, + RADIO, + DISH, + + ROUTER, + DEALER, }; template class PortImpl; + +// Endpoint tag base type. +struct endpoints_base_s {}; +template struct endpoints_s; + +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; diff --git a/src/ports.hpp b/src/ports.hpp index 05baa0a..573ff81 100644 --- a/src/ports.hpp +++ b/src/ports.hpp @@ -10,4 +10,5 @@ #include "port_router_impl.hpp" #include "port_dealer_impl.hpp" #include "port_pair_impl.hpp" - +#include "port_radio_impl.hpp" +#include "port_dish_impl.hpp" diff --git a/src/templates.cpp b/src/templates.cpp index 9f26017..e070aa0 100644 --- a/src/templates.cpp +++ b/src/templates.cpp @@ -1,116 +1,602 @@ #include +#include #include #include +#include #include #include #include - -#include +#include #define FMT_HEADER_ONLY #include #include #include "adapter_factory.hpp" +#include "codecs.hpp" #include "module_factory.hpp" #include "port_factory.hpp" -void entry(int32_t argc, char **argv, char **envp, const std::unordered_map> *> &ports) { - const auto &subscriber = *ports.at("subscriber"); - const auto &publisher = *ports.at("publisher"); +using env_data_type_t = std::vector; - auto &a = subscriber.callback("int-string-int"); - auto &b = subscriber.callback("string-string-string"); +void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port - a.connect([](const int32_t &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name()); }); - b.connect([](const std::string &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name()); }); + // Fetch typed callbacks by adapter name and signature. + auto &int_cbk = subscriber.callback("int-vector-int"); + auto &string_cbk = subscriber.callback("string-vector-string"); + auto &double_cbk = subscriber.callback("double-vector-double"); - subscriber.listen(); + // Connect callbacks + int_cbk.connect([](const int32_t &i, const std::string &addr) { + fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", i, type_name>(), addr); + }); + + string_cbk.connect([](const std::string &s, const std::string &addr) { + fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", s, type_name>(), addr); + }); + + double_cbk.connect([](const double &d, const std::string &addr) { + fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", d, type_name>(), addr); + }); +} + +// Publisher module entrypoint +void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &publisher = *ports.at("publisher_port"); // Get publisher port + + // Publish data publisher["topic0"] << 1 << 2 << double{3.f} << std::string("test"); publisher["topic1"] << 1 << 2 << double{3.f} << std::string("test"); publisher["topic2"] << 1 << 2 << double{3.f} << std::string("test"); publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test"); +} - std::this_thread::sleep_for(std::chrono::milliseconds(1000u)); - subscriber.stop(); +// Publisher module entrypoint +void pusher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &publisher = *ports.at("push_port"); // Get publisher port + publisher << 1 << 2 << double{3.f} << std::string("test"); +} + +void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &puller = *ports.at("pull_port"); // Get subscriber port + + // Fetch typed callbacks by adapter name and signature. + auto &int_cbk = puller.callback("int-vector-int"); + auto &string_cbk = puller.callback("string-vector-string"); + auto &double_cbk = puller.callback("double-vector-double"); + + // Connect callbacks + int_cbk.connect([](const int32_t &i) { fmt::print("PULL socket: got data: {} of {} type\r\n", i, type_name>()); }); + string_cbk.connect([](const std::string &s) { fmt::print("PULL socket: got data: {} of {} type\r\n", s, type_name>()); }); + double_cbk.connect([](const double &d) { fmt::print("PULL socket: got data: {} of {} type\r\n", d, type_name>()); }); +} + +void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &req = *ports.at("req_port"); // Get publisher port + // static_assert(std::is_same_v, void>, ""); + // Fetch typed callbacks by adapter name and signature. + auto &int_cbk = req.callback("int-vector-int"); + auto &string_cbk = req.callback("string-vector-string"); + auto &double_cbk = req.callback("double-vector-double"); + + // Connect callbacks + int_cbk.connect([](const int32_t &i) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", i, type_name>()); }); + string_cbk.connect([](const std::string &s) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", s, type_name>()); }); + double_cbk.connect([](const double &d) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", d, type_name>()); }); + + req << 1 << 2 << double{3.f} << std::string("test"); +} + +void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &rep = *ports.at("rep_port"); // Get subscriber port + + // Fetch typed callbacks by adapter name and signature. + auto &int_cbk = rep.callback("int-vector-int"); + auto &string_cbk = rep.callback("string-vector-string"); + auto &double_cbk = rep.callback("double-vector-double"); + + // Connect callbacks + int_cbk.connect([](const int32_t &i) -> std::string { + fmt::print("REPLY socket: got data: {} of {} type\r\n", i, type_name>()); + // Handle data ... + return fmt::format("'Handle request: {} of type: {}'", i, type_name>()); + }); + + string_cbk.connect([](const std::string &s) -> std::string { + fmt::print("REPLY socket: got data: {} of {} type (FIRST callback)\r\n", s, type_name>()); + // Handle data ... + return fmt::format("'Handle request: {} of type: {}' (FIRST callback)", s, type_name>()); + }); + + string_cbk.connect([](const std::string &s) -> std::string { + fmt::print("REPLY socket: got data: {} of {} type (SECOND callback)\r\n", s, type_name>()); + // Handle data ... + return fmt::format("'Handle request: {} of type: {}' (SECOND callback)", s, type_name>()); + }); + + double_cbk.connect([](const double &d) -> std::string { + fmt::print("REPLY socket: got data: {} of {} type\r\n", d, type_name>()); + // Handle data ... + return fmt::format("'Handle request: {} of type: {}'", d, type_name>()); + }); +} + +void pair_server_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &pair = *ports.at("pair_server_port"); + + auto &int_cbk = pair.callback("int-vector-int"); + auto &string_cbk = pair.callback("string-vector-string"); + auto &double_cbk = pair.callback("double-vector-double"); + + int_cbk.connect([](const int32_t &i) { fmt::print("PAIR server: got data: {} of {} type\r\n", i, type_name>()); }); + string_cbk.connect([](const std::string &s) { fmt::print("PAIR server: got data: {} of {} type\r\n", s, type_name>()); }); + double_cbk.connect([](const double &d) { fmt::print("PAIR server: got data: {} of {} type\r\n", d, type_name>()); }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + pair << 4 << 5 << double{6.f} << std::string("server->client"); +} + +void pair_client_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &pair = *ports.at("pair_client_port"); + + auto &int_cbk = pair.callback("int-vector-int"); + auto &string_cbk = pair.callback("string-vector-string"); + auto &double_cbk = pair.callback("double-vector-double"); + + int_cbk.connect([](const int32_t &i) { fmt::print("PAIR client: got data: {} of {} type\r\n", i, type_name>()); }); + string_cbk.connect([](const std::string &s) { fmt::print("PAIR client: got data: {} of {} type\r\n", s, type_name>()); }); + double_cbk.connect([](const double &d) { fmt::print("PAIR client: got data: {} of {} type\r\n", d, type_name>()); }); + + pair << 1 << 2 << double{3.f} << std::string("test"); +} + +void dish_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &dish = *ports.at("dish_port"); + + auto &int_cbk = dish.callback("int-vector-int"); + auto &string_cbk = dish.callback("string-vector-string"); + auto &double_cbk = dish.callback("double-vector-double"); + + int_cbk.connect([](const int32_t &i, const std::string &group) { + fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", i, type_name>(), group); + }); + + string_cbk.connect([](const std::string &s, const std::string &group) { + fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", s, type_name>(), group); + }); + + double_cbk.connect([](const double &d, const std::string &group) { + fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", d, type_name>(), group); + }); +} + +void radio_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { + // Resolve port by name. + const auto &radio = *ports.at("radio_port"); + + radio["grp0"] << 1 << 2 << double{3.f}; + radio["grp1"] << std::string("test"); } int main(int argc, char *argv[], char *envp[]) { using enum port_types_e; - zmq::context_t zmq_ctx; + codecs_s codecs; + // Shared ZMQ context for in-process transports. + zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads - auto mod = makeModule(argc, argv, envp, "test_mod", zmq_ctx, - std::tuple{ - makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx, - std::tuple{ - makeAdapter("int-string-int", std::pair{ - [](const int32_t &i) -> std::vector { - auto str = std::to_string(i + 5); - return {str.begin(), str.end()}; - }, + // Make module that contains only 1 port working on PUBLISHER pattern + auto publisher_module = ModuleBuilder() + .withName("publisher_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PUB) + .withName("publisher_port") + .withEndpoints({ + {"test", "inproc://PUB-SUB"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - [](const std::vector &s) -> int32_t { return 5; }, - }), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - makeAdapter("string-string-string", std::pair{ - [](const std::string &i) -> std::vector { - auto str = i + "_test"; - return {str.begin(), str.end()}; - }, + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); - [](const std::vector &i) -> std::string { return "works!"; }, - }), + // Make module that contains only 1 port working on SUBSCRIBER pattern + auto subscriber_module = ModuleBuilder() + .withName("subscriber_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(SUB) + .withName("subscriber_port") + .withEndpoints({ + {"test", "inproc://PUB-SUB"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - makeAdapter("double-string-double", std::pair{ - [](const double &i) -> std::vector { - auto str = std::to_string(i / 2.f); - return {str.begin(), str.end()}; - }, + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - [](const std::vector &s) -> double { return .1f; }, - }), - }), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .withArgs(std::tuple{ + std::list{"topic0", "topic1", "topic2", "topic3"}, + }) + .finalize(), + }) + .finalize(argc, argv, envp); - makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx, - std::tuple{ - makeAdapter("int-string-int", std::pair{ - [](const int32_t &i) -> std::vector { - auto str = std::to_string(i + 5); - return {str.begin(), str.end()}; - }, + auto pusher_module = ModuleBuilder() + .withName("pusher_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PUSH) + .withName("push_port") + .withEndpoints({ + {"test", "inproc://PUSH-PULL"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - [](const std::vector &s) -> int32_t { return 5; }, - }), + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - makeAdapter("string-string-string", std::pair{ - [](const std::string &i) -> std::vector { - auto str = i + "_test"; - return {str.begin(), str.end()}; - }, + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); - [](const std::vector &i) -> std::string { return "works!"; }, - }), + auto puller_module = ModuleBuilder() + .withName("puller_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PULL) + .withName("pull_port") + .withEndpoints({ + {"test", "inproc://PUSH-PULL"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), - makeAdapter("double-string-double", std::pair{ - [](const double &i) -> std::vector { - auto str = std::to_string(i / 2.f); - return {str.begin(), str.end()}; - }, + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), - [](const std::vector &s) -> double { return .1f; }, - }), - }, + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); - std::tuple{ - std::list{ - "topic0", - "topic1", - "topic2", - "topic3", - }, - }), - }); + auto req_module = ModuleBuilder() + .withName("req_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(REQ) + .withName("req_port") + .withEndpoints({ + {"test", "inproc://REQ-REP"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto rep_module = ModuleBuilder() + .withName("rep_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(REP) + .withName("rep_port") + .withEndpoints({ + {"test", "inproc://REQ-REP"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto pair_server_module = ModuleBuilder() + .withName("pair_server_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PAIR_SERVER) + .withName("pair_server_port") + .withEndpoints({ + {"test", "inproc://PAIR"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto pair_client_module = ModuleBuilder() + .withName("pair_client_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(PAIR_CLIENT) + .withName("pair_client_port") + .withEndpoints({ + {"test", "inproc://PAIR"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto dish_module = ModuleBuilder() + .withName("dish_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(DISH) + .withName("dish_port") + .withEndpoints({ + {"test", "inproc://RADIO-DISH"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .withArgs(std::tuple{ + std::list{"grp0", "grp1"}, + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + auto radio_module = ModuleBuilder() + .withName("radio_module") + .withContext(zmq_ctx) + .withPorts(std::tuple{ + PortBuilder() + .withType(RADIO) + .withName("radio_port") + .withEndpoints({ + {"test", "inproc://RADIO-DISH"}, + }) + .withContext(zmq_ctx) + .withAdapters(std::tuple{ + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_int) + .decodeDataBy(&codecs.decoders.to_int) + .withCallbackSignature() + .withName("int-vector-int") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_string) + .decodeDataBy(&codecs.decoders.to_string) + .withCallbackSignature() + .withName("string-vector-string") + .finalize(), + + AdapterBuilder() + .encodeDataBy(&codecs.encoders.from_double) + .decodeDataBy(&codecs.decoders.to_double) + .withCallbackSignature() + .withName("double-vector-double") + .finalize(), + }) + .finalize(), + }) + .finalize(argc, argv, envp); + + fmt::print("\r\nPUB-SUB test:\r\n"); + subscriber_module->run(subscriber_entry); // Subscribe and get data + publisher_module->run(publisher_entry); // Publish data + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + fmt::print("\r\nPUSH-PULL test:\r\n"); + pusher_module->run(pusher_entry); + puller_module->run(puller_entry); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + fmt::print("\r\nREQ-REP test:\r\n"); + rep_module->run(rep_entry); + req_module->run(req_entry); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + fmt::print("\r\nPAIR test:\r\n"); + pair_server_module->run(pair_server_entry); + pair_client_module->run(pair_client_entry); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + fmt::print("\r\nRADIO-DISH test:\r\n"); + dish_module->run(dish_entry); + radio_module->run(radio_entry); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + fmt::print("DONE!\r\n"); - mod->run(entry); return 0; } diff --git a/src/tuple.hpp b/src/tuple.hpp new file mode 100644 index 0000000..060ef9a --- /dev/null +++ b/src/tuple.hpp @@ -0,0 +1,127 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +// Tuple utilities for compile-time iteration and transformations. +namespace tp { +namespace detail { + +template constexpr Func for_each_arg(Func f, Args &&...args) { + (f(std::forward(args)), ...); + return f; +} + +template constexpr Func for_each_impl(Tuple &&t, Func &&f, std::index_sequence is) { + (std::forward(f)(std::get(std::forward(t))), ...); + return f; +} + +template auto transform_impl(std::tuple const &inputs, Function function, std::index_sequence is) { + return std::tuple...>{function(std::get(inputs))...}; +} + +template auto subtuple(const std::tuple &t, std::index_sequence) { return std::make_tuple(std::get(t)...); } + +// ZIP utilities +template using zip_tuple_at_index_t = std::tuple>...>; +template zip_tuple_at_index_t zip_tuple_at_index(Tuples &&...tuples) { return {std::get(std::forward(tuples))...}; } +template std::tuple...> tuple_zip_impl(Tuples &&...tuples, std::index_sequence) { + return {zip_tuple_at_index(std::forward(tuples)...)...}; +} +}; // namespace detail + +template constexpr decltype(auto) for_each(Tuple &&tuple, F &&f) { + return [](Tuple &&tuple, F &&f, std::index_sequence) { + (f(std::get(tuple)), ...); + return f; + }(std::forward(tuple), std::forward(f), std::make_index_sequence>::value>{}); +} + +template auto transform(std::tuple const &inputs, Function function) { + return detail::transform_impl(inputs, function, std::make_index_sequence{}); +} + +template constexpr size_t find_if(Tuple &&tuple, Predicate pred) { + size_t index = std::tuple_size>::value; + size_t currentIndex = 0; + bool found = false; + + for_each(tuple, [&](auto &&value) { + if (!found && pred(value)) { + index = currentIndex; + found = true; + } + + ++currentIndex; + }); + + return index; +} + +template void perform(Tuple &&tuple, size_t index, Action action) { + size_t currentIndex = 0; + for_each(tuple, [&action, index, ¤tIndex](auto &&value) { + if (currentIndex == index) { + + action(std::forward(value)); + } + + ++currentIndex; + }); +} + +template bool all_of(Tuple &&tuple, Predicate pred) { + return find_if(tuple, std::not_fn(pred)) == std::tuple_size>::value; +} + +template bool none_of(Tuple &&tuple, Predicate pred) { return find_if(tuple, pred) == std::tuple_size>::value; } +template bool any_of(Tuple &&tuple, Predicate pred) { return !none_of(tuple, pred); } + +template Tuple &operator|(Tuple &&tuple, Function func) { + for_each(tuple, func); + return tuple; +} + +template auto subtuple(const std::tuple &t) { return detail::subtuple(t, std::make_index_sequence()); } + +template ())> struct sub_range; + +template struct sub_range, std::index_sequence> { + static_assert(elems <= sizeof...(Args) - starting, "sub range is out of bounds!"); + using tuple = std::tuple>...>; +}; + +template auto select_tuple(Tuple &&tuple, std::index_sequence) { + return std::tuple...>(std::get(std::forward(tuple))...); +} + +template struct tuple_index; + +template struct tuple_index> { + static const std::size_t value = 0; +}; + +template struct tuple_index> { + static const std::size_t value = 1 + tuple_index>::value; +}; + +// ZIP +template + requires((std::tuple_size_v> == std::tuple_size_v>) && ...) +auto tuple_zip(Head &&head, Tail &&...tail) { + return detail::tuple_zip_impl(std::forward(head), std::forward(tail)..., std::make_index_sequence>>()); +} + +// TAIL +template struct tuple_tail; +template struct tuple_tail> { + using type = std::tuple; +}; // +}; // namespace tp diff --git a/src/utils.hpp b/src/utils.hpp new file mode 100644 index 0000000..9995e19 --- /dev/null +++ b/src/utils.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +// Trait to detect template specializations. +template class Template> struct is_specialization : std::false_type {}; +template