diff --git a/.clangd b/.clangd index b4dd5e5..e9d1d6b 100644 --- a/.clangd +++ b/.clangd @@ -1,2 +1,2 @@ CompileFlags: - Add: ['-std=gnu++23'] + Add: ['-std=gnu++23', -I/opt/cling/include] diff --git a/CMakeLists.txt b/CMakeLists.txt index aad2ac1..bcb0d19 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,6 +19,14 @@ FetchContent_Declare( ) FetchContent_MakeAvailable(inja) +# FetchContent_Declare( +# json +# GIT_REPOSITORY https://github.com/nlohmann/json +# GIT_PROGRESS TRUE +# GIT_TAG main +# ) + +# FetchContent_MakeAvailable(json) FetchContent_Declare( libzmq diff --git a/src/adapter_base.hpp b/src/adapter_base.hpp index de64803..eb02a43 100644 --- a/src/adapter_base.hpp +++ b/src/adapter_base.hpp @@ -37,7 +37,6 @@ public: virtual callback_type_t &callback() const = 0; inline const auto &name() const { return mc_name_; } -protected: private: const std::string mc_name_; }; diff --git a/src/port.hpp b/src/port.hpp index 74504d1..91e88d0 100644 --- a/src/port.hpp +++ b/src/port.hpp @@ -32,9 +32,9 @@ public: using port_data_type_t = std::tuple_element_t<0, std::tuple...>>; using callback_aargs_t = tp::tuple_tail>::callback_type_t::signature_type>>::type; - Port(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, - std::tuple &&args) - : PortBase(pt, name, endpoint, zmq_ctx), + Port(enum port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters, std::tuple &&args) + : PortBase(pt, name, endpoints, zmq_ctx), // Init adapters mc_adapters_([&](std::index_sequence) { @@ -50,7 +50,7 @@ public: typeid(adapter_callback_type_t).hash_code(), std::forward>(std::get(adapters))); }.template operator()()...); }(std::make_index_sequence{})) { - std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoint), std::forward(args)...); }, std::forward(args)); + std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoints), std::forward(args)...); }, std::forward(args)); } ~Port() override { stop(); } diff --git a/src/port_base.hpp b/src/port_base.hpp index 98812dc..5faa537 100644 --- a/src/port_base.hpp +++ b/src/port_base.hpp @@ -34,16 +34,15 @@ template class PortBase { }; public: - PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx) - : mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash()(name)), mc_endpoint_hash_(std::hash()(endpoint)) {} + PortBase(enum port_types_e port_type, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx) + : mc_type_(port_type), mc_name_(name), mc_endpoints_(endpoints), mc_name_hash_(std::hash()(name)) {} virtual ~PortBase() = default; inline const auto &type() const { return mc_type_; } inline const auto &name() const { return mc_name_; } - inline const auto &endpoint() const { return mc_endpoint_; } + inline const auto &endpoints() const { return mc_endpoints_; } inline const auto &name_hash() const { return mc_name_hash_; } - inline const auto &endpoint_hash() const { return mc_endpoint_hash_; } const auto operator[](const std::string &address) const { return AddressedPort_(this, address); } @@ -52,9 +51,7 @@ public: return *this; } - // void listen() const { listen__(m_ss_.get_token()); }; virtual void stop() const = 0; - template auto &callback(const std::string &name) const { return GetCallbackHelper_>::type>(this).template operator()(name); } @@ -66,8 +63,9 @@ protected: private: const enum port_types_e mc_type_; - const std::string mc_name_, mc_endpoint_; - const size_t mc_name_hash_, mc_endpoint_hash_; + const std::string mc_name_; + const std::map mc_endpoints_; + const size_t mc_name_hash_; template class GetCallbackHelper_; template class GetCallbackHelper_> { diff --git a/src/port_dealer_impl.hpp b/src/port_dealer_impl.hpp index 5c44d47..4653b7d 100644 --- a/src/port_dealer_impl.hpp +++ b/src/port_dealer_impl.hpp @@ -9,8 +9,8 @@ template class PortImpl : public PortImplBase { public: - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) {} + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) {} // Send to socket depending on implementation void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; diff --git a/src/port_factory.hpp b/src/port_factory.hpp index 8d155fd..5872961 100644 --- a/src/port_factory.hpp +++ b/src/port_factory.hpp @@ -3,8 +3,8 @@ #include "port.hpp" template -auto makePort(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, - std::tuple &&args = {}) { - return std::make_unique, std::tuple>>(pt, name, endpoint, zmq_ctx, std::forward...>>(adapters), +auto makePort(enum port_types_e pt, const std::string &name, const std::map &endpoints, zmq::context_t &zmq_ctx, + std::tuple...> &&adapters, std::tuple &&args = {}) { + return std::make_unique, std::tuple>>(pt, name, endpoints, zmq_ctx, std::forward...>>(adapters), std::forward>(args)); } diff --git a/src/port_impl_base.hpp b/src/port_impl_base.hpp index 92aa41f..67c842e 100644 --- a/src/port_impl_base.hpp +++ b/src/port_impl_base.hpp @@ -18,8 +18,8 @@ public: MSGPACK_DEFINE(typehash, data); }; - PortImplBase(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : mc_endpoint__(endpoint), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {} + PortImplBase(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : mc_endpoints__(endpoints), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {} virtual ~PortImplBase() = default; virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0; @@ -38,7 +38,7 @@ protected: mutable std::future m_listener_thread__; mutable zmq::socket_t m_sock__; zmq::context_t &m_ctx__; - const std::string mc_endpoint__; + const std::map mc_endpoints__; const Callback mc_cbk__; const Port *mc_port__; virtual void listen__(std::stop_token st) const = 0; diff --git a/src/port_pair_impl.hpp b/src/port_pair_impl.hpp index b49b224..9c94c53 100644 --- a/src/port_pair_impl.hpp +++ b/src/port_pair_impl.hpp @@ -9,8 +9,14 @@ template class PortImpl : public PortImplBase { public: - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) {} + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::push); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } + } // Send to socket depending on implementation void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; diff --git a/src/port_publisher_impl.hpp b/src/port_publisher_impl.hpp index 9b29c81..7e99a11 100644 --- a/src/port_publisher_impl.hpp +++ b/src/port_publisher_impl.hpp @@ -9,10 +9,13 @@ template class PortImpl final : public PortImplBase { public: - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub); - this->m_sock__.bind(this->mc_endpoint__); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } } ~PortImpl() override {} diff --git a/src/port_pull_impl.hpp b/src/port_pull_impl.hpp index a2a391f..d047dc5 100644 --- a/src/port_pull_impl.hpp +++ b/src/port_pull_impl.hpp @@ -12,10 +12,13 @@ template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pull); - this->m_sock__.connect(this->mc_endpoint__); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); + } listen__(this->stop_source().get_token()); } diff --git a/src/port_push_impl.hpp b/src/port_push_impl.hpp index 61c2f89..9954efe 100644 --- a/src/port_push_impl.hpp +++ b/src/port_push_impl.hpp @@ -9,10 +9,13 @@ template class PortImpl : public PortImplBase { public: - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::push); - this->m_sock__.bind(this->mc_endpoint__); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } } // Send to socket depending on implementation diff --git a/src/port_rep_impl.hpp b/src/port_rep_impl.hpp index 303d5f6..a0612c4 100644 --- a/src/port_rep_impl.hpp +++ b/src/port_rep_impl.hpp @@ -16,10 +16,12 @@ template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::rep); - this->m_sock__.bind(this->mc_endpoint__); + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.bind(ep); + } listen__(this->stop_source().get_token()); } diff --git a/src/port_req_impl.hpp b/src/port_req_impl.hpp index 3ed2651..a2096de 100644 --- a/src/port_req_impl.hpp +++ b/src/port_req_impl.hpp @@ -2,6 +2,7 @@ #include "port_impl_base.hpp" #include "port_types.hpp" +#include #define FMT_HEADER_ONLY #include @@ -10,10 +11,13 @@ template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::req); - this->m_sock__.connect(this->mc_endpoint__); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); + } } void send(const std::string &addr, const msgpack::sbuffer &data) const override { diff --git a/src/port_router_impl.hpp b/src/port_router_impl.hpp index 54471d8..6f3831f 100644 --- a/src/port_router_impl.hpp +++ b/src/port_router_impl.hpp @@ -9,8 +9,8 @@ template class PortImpl : public PortImplBase { public: - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)) {} + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)) {} // Send to socket depending on implementation void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; diff --git a/src/port_subscriber_impl.hpp b/src/port_subscriber_impl.hpp index fe201f9..498d9b0 100644 --- a/src/port_subscriber_impl.hpp +++ b/src/port_subscriber_impl.hpp @@ -16,10 +16,13 @@ template class PortImpl : public PortImplBase { public: using base_t = PortImplBase; - PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list &topics, Callback &&callback) - : PortImplBase(port, zmq_ctx, endpoint, std::forward(callback)), mc_topics_(topics) { + PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map &endpoints, const std::list &topics, Callback &&callback) + : PortImplBase(port, zmq_ctx, endpoints, std::forward(callback)), mc_topics_(topics) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::sub); - this->m_sock__.connect(this->mc_endpoint__); + + for (const auto &[_, ep] : this->mc_endpoints__) { + this->m_sock__.connect(ep); + } for (const auto &topic : mc_topics_) { this->m_sock__.set(zmq::sockopt::subscribe, topic); diff --git a/src/port_types.hpp b/src/port_types.hpp index d7d1ae1..f389489 100644 --- a/src/port_types.hpp +++ b/src/port_types.hpp @@ -8,11 +8,29 @@ enum class port_types_e : uint32_t { SUB, REQ, REP, - ROUTER, - DEALER, PUSH, PULL, PAIR, + RADIO, + DISH, + + ROUTER, + DEALER, }; template class PortImpl; + +struct endpoints_base_s {}; +template struct endpoints_s; + +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; +template<> struct endpoints_s : endpoints_base_s {}; diff --git a/src/templates.cpp b/src/templates.cpp index f252aa1..317f3d3 100644 --- a/src/templates.cpp +++ b/src/templates.cpp @@ -119,7 +119,11 @@ int main(int argc, char *argv[], char *envp[]) { // TODO: merge all builders to one complex "Module" builder auto publisher_module = makeModule(argc, argv, envp, "publisher_module", zmq_ctx, std::tuple{ - makePort(PUB, "publisher_port", "inproc://PUB-SUB" /* This port will publish messages here */, zmq_ctx, + makePort(PUB, "publisher_port", + { + {"test", "inproc://PUB-SUB"}, + } /* This port will publish messages here */, + zmq_ctx, std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) @@ -147,7 +151,11 @@ int main(int argc, char *argv[], char *envp[]) { // Make module that contains only 1 port working on SUBSCRIBER pattern auto subscriber_module = makeModule(argc, argv, envp, "subscriber_module", zmq_ctx, std::tuple{ - makePort(SUB, "subscriber_port", "inproc://PUB-SUB" /* this port will read data here */, zmq_ctx, + makePort(SUB, "subscriber_port", + { + {"test", "inproc://PUB-SUB"}, + } /* this port will read data here */, + zmq_ctx, std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) @@ -180,7 +188,11 @@ int main(int argc, char *argv[], char *envp[]) { auto pusher_module = makeModule(argc, argv, envp, "pusher_module", zmq_ctx, std::tuple{ - makePort(PUSH, "push_port", "inproc://PUSH-PULL" /* This port will publish messages here */, zmq_ctx, + makePort(PUSH, "push_port", + { + {"test", "inproc://PUSH-PULL"}, + } /* This port will publish messages here */, + zmq_ctx, std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) @@ -207,7 +219,11 @@ int main(int argc, char *argv[], char *envp[]) { auto puller_module = makeModule(argc, argv, envp, "puller_module", zmq_ctx, std::tuple{ - makePort(PULL, "pull_port", "inproc://PUSH-PULL" /* This port will publish messages here */, zmq_ctx, + makePort(PULL, "pull_port", + { + {"test", "inproc://PUSH-PULL"}, + } /* This port will publish messages here */, + zmq_ctx, std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) @@ -234,7 +250,11 @@ int main(int argc, char *argv[], char *envp[]) { auto req_module = makeModule(argc, argv, envp, "req_module", zmq_ctx, std::tuple{ - makePort(REQ, "req_port", "inproc://REQ-REP" /* This port will publish messages here */, zmq_ctx, + makePort(REQ, "req_port", + { + {"test", "inproc://REQ-REP"}, + } /* This port will publish messages here */, + zmq_ctx, std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) @@ -261,7 +281,11 @@ int main(int argc, char *argv[], char *envp[]) { auto rep_module = makeModule(argc, argv, envp, "rep_module", zmq_ctx, std::tuple{ - makePort(REP, "rep_port", "inproc://REQ-REP" /* This port will publish messages here */, zmq_ctx, + makePort(REP, "rep_port", + { + {"test", "inproc://REQ-REP"}, + } /* This port will publish messages here */, + zmq_ctx, std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int)