diff --git a/CMakeLists.txt b/CMakeLists.txt index d8a2d55..aad2ac1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -61,3 +61,5 @@ target_link_directories(${CMAKE_PROJECT_NAME} ${libzmq_BINARY_DIR} ${cppzmq_BINARY_DIR} ) + +target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE zmq) diff --git a/src/module_factory.hpp b/src/module_factory.hpp index c9bee3c..a783d0e 100644 --- a/src/module_factory.hpp +++ b/src/module_factory.hpp @@ -3,7 +3,7 @@ #include "module.hpp" #include -template +template auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) { return std::make_unique>(argc, argv, envp, name, zmq_ctx, std::forward...>>(ports)); } diff --git a/src/port.hpp b/src/port.hpp index eebe74f..2ace523 100644 --- a/src/port.hpp +++ b/src/port.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "port_base.hpp" #include "ports.hpp" @@ -14,7 +15,6 @@ using namespace boost::callable_traits; template class Port; template class Port, std::tuple> : public PortBase...>>> { - public: using port_data_type_t = std::tuple_element_t<0, std::tuple...>>; @@ -34,19 +34,22 @@ public: }(std::make_index_sequence{})) { std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoint), std::forward(args)...); }, std::forward(args)); } - + void stop() const override { this->stop__(); mc_impl_->close(); } protected: - void send__(const void *data, size_t size, size_t hash) const override { + void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override { + const void *adapter_ptr = nullptr; + tp::for_each(mc_adapters_, [&](const auto &e) { const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e; if (adapter_typehash == hash) { using adapter_in_type_t = std::remove_cvref_tdecoder())>>; + adapter_ptr = &adapter; typename PortImplBase::port_payload_s payload = { .typehash = hash, @@ -55,9 +58,13 @@ protected: msgpack::sbuffer buf; msgpack::pack(buf, payload); - mc_impl_->send(buf); + mc_impl_->send(addr, buf); } }); + + if (!adapter_ptr) { + throw std::runtime_error(fmt::format("No suitable adapter found for type #{}\r\n", hash)); + } } void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const override final { diff --git a/src/port_base.hpp b/src/port_base.hpp index 0309fae..760c909 100644 --- a/src/port_base.hpp +++ b/src/port_base.hpp @@ -9,28 +9,24 @@ #include #include -#include "port_types.hpp" #include "adapter_base.hpp" - -template constexpr std::string_view type_name() { - using namespace std; -#ifdef __clang__ - string_view p = __PRETTY_FUNCTION__; - return string_view(p.data() + 34, p.size() - 34 - 1); -#elif defined(__GNUC__) - string_view p = __PRETTY_FUNCTION__; -# if __cplusplus < 201402 - return string_view(p.data() + 36, p.size() - 36 - 1); -# else - return string_view(p.data() + 49, p.find(';', 49) - 49); -# endif -#elif defined(_MSC_VER) - string_view p = __FUNCSIG__; - return string_view(p.data() + 84, p.size() - 84 - 7); -#endif -} +#include "port_types.hpp" +#include "utils.hpp" template class PortBase { + class AddressedPort_ { + public: + AddressedPort_(const PortBase *port, const std::string &address) : mc_addr_(address), mc_port_(port) {} + template const auto &operator<<(const InType &in) const { + mc_port_->send__(mc_addr_, &in, sizeof(InType), typeid(InType).hash_code()); + return *this; + } + + private: + const std::string mc_addr_; + const PortBase *mc_port_; + }; + public: PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx) : mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash()(name)), mc_endpoint_hash_(std::hash()(endpoint)) {} @@ -41,8 +37,10 @@ public: inline const auto &name_hash() const { return mc_name_hash_; } inline const auto &endpoint_hash() const { return mc_endpoint_hash_; } + const auto operator[](const std::string &address) const { return AddressedPort_(this, address); } + template const PortBase &operator<<(const InType &in) const { - send__(&in, sizeof(InType), typeid(InType).hash_code()); + send__("", &in, sizeof(InType), typeid(InType).hash_code()); return *this; } @@ -56,7 +54,7 @@ public: protected: void stop__() const { m_ss_.request_stop(); } - virtual void send__(const void *data, size_t size, size_t type_hash) const = 0; + virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0; virtual void listen__(std::stop_token) const = 0; virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const = 0; diff --git a/src/port_dealer_impl.hpp b/src/port_dealer_impl.hpp index e11ca04..012632a 100644 --- a/src/port_dealer_impl.hpp +++ b/src/port_dealer_impl.hpp @@ -17,6 +17,6 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override {}; + void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; diff --git a/src/port_factory.hpp b/src/port_factory.hpp index 8fa1f38..8d155fd 100644 --- a/src/port_factory.hpp +++ b/src/port_factory.hpp @@ -4,7 +4,7 @@ template auto makePort(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, - std::tuple &&args) { + std::tuple &&args = {}) { return std::make_unique, std::tuple>>(pt, name, endpoint, zmq_ctx, std::forward...>>(adapters), std::forward>(args)); } diff --git a/src/port_impl_base.hpp b/src/port_impl_base.hpp index 576705a..375c964 100644 --- a/src/port_impl_base.hpp +++ b/src/port_impl_base.hpp @@ -21,7 +21,7 @@ public: virtual ~PortImplBase() = default; virtual void listen(std::stop_token st) const = 0; - virtual void send(const msgpack::sbuffer &data) const = 0; + virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0; void close() { m_sock__.close(); diff --git a/src/port_pair_impl.hpp b/src/port_pair_impl.hpp index a6fb58a..be1e06b 100644 --- a/src/port_pair_impl.hpp +++ b/src/port_pair_impl.hpp @@ -17,5 +17,5 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override {}; + void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; diff --git a/src/port_publisher_impl.hpp b/src/port_publisher_impl.hpp index 7e7edaa..9e40d5b 100644 --- a/src/port_publisher_impl.hpp +++ b/src/port_publisher_impl.hpp @@ -9,8 +9,7 @@ template class PortImpl final : public PortImplBase { public: - PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list &topics, Callback &&callback) - : PortImplBase(zmq_ctx, endpoint, std::forward(callback)), mc_topics_(topics) { + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) { this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub); this->m_sock__.bind(this->mc_endpoint__); } @@ -20,17 +19,12 @@ public: void listen(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override { + void send(const std::string &addr, const msgpack::sbuffer &data) const override { try { - for (const auto &topic : mc_topics_) { - this->m_sock__.send(zmq::message_t(topic), zmq::send_flags::sndmore); - this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait); - } + this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore); + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait); } catch (const zmq::error_t &err) { fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); } }; - -private: - const std::list mc_topics_; }; diff --git a/src/port_pull_impl.hpp b/src/port_pull_impl.hpp index 314d1ab..cec7730 100644 --- a/src/port_pull_impl.hpp +++ b/src/port_pull_impl.hpp @@ -17,5 +17,5 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override {}; + void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; diff --git a/src/port_push_impl.hpp b/src/port_push_impl.hpp index bb84d25..dd0c794 100644 --- a/src/port_push_impl.hpp +++ b/src/port_push_impl.hpp @@ -17,5 +17,5 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override {}; + void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; diff --git a/src/port_rep_impl.hpp b/src/port_rep_impl.hpp index 2c914f6..60a018f 100644 --- a/src/port_rep_impl.hpp +++ b/src/port_rep_impl.hpp @@ -17,6 +17,6 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override {}; + void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; diff --git a/src/port_req_impl.hpp b/src/port_req_impl.hpp index f98a755..2a505d1 100644 --- a/src/port_req_impl.hpp +++ b/src/port_req_impl.hpp @@ -17,6 +17,6 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override {}; + void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; diff --git a/src/port_router_impl.hpp b/src/port_router_impl.hpp index 46d07e7..4c5e431 100644 --- a/src/port_router_impl.hpp +++ b/src/port_router_impl.hpp @@ -17,6 +17,6 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override {}; + void send(const std::string &addr, const msgpack::sbuffer &data) const override {}; }; diff --git a/src/port_subscriber_impl.hpp b/src/port_subscriber_impl.hpp index 1e07785..8e4007a 100644 --- a/src/port_subscriber_impl.hpp +++ b/src/port_subscriber_impl.hpp @@ -48,7 +48,7 @@ public: } // Send to socket depending on implementation - void send(const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; + void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; private: const std::list mc_topics_; diff --git a/src/templates.cpp b/src/templates.cpp index 26dc32e..9f26017 100644 --- a/src/templates.cpp +++ b/src/templates.cpp @@ -26,7 +26,10 @@ void entry(int32_t argc, char **argv, char **envp, const std::unordered_map void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name()); }); subscriber.listen(); - publisher << 1 << 2 << double{3.f} << std::string("test"); + publisher["topic0"] << 1 << 2 << double{3.f} << std::string("test"); + publisher["topic1"] << 1 << 2 << double{3.f} << std::string("test"); + publisher["topic2"] << 1 << 2 << double{3.f} << std::string("test"); + publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test"); std::this_thread::sleep_for(std::chrono::milliseconds(1000u)); subscriber.stop(); @@ -37,83 +40,77 @@ int main(int argc, char *argv[], char *envp[]) { zmq::context_t zmq_ctx; auto mod = makeModule(argc, argv, envp, "test_mod", zmq_ctx, - std::tuple{ - makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx, - std::tuple{ - makeAdapter("int-string-int", std::pair{ - [](const int32_t &i) -> std::vector { - auto str = std::to_string(i + 5); - return {str.begin(), str.end()}; - }, + std::tuple{ + makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx, + std::tuple{ + makeAdapter("int-string-int", std::pair{ + [](const int32_t &i) -> std::vector { + auto str = std::to_string(i + 5); + return {str.begin(), str.end()}; + }, - [](const std::vector &s) -> int32_t { return 5; }, - }), + [](const std::vector &s) -> int32_t { return 5; }, + }), - makeAdapter("string-string-string", std::pair{ - [](const std::string &i) -> std::vector { - auto str = i + "_test"; - return {str.begin(), str.end()}; - }, + makeAdapter("string-string-string", std::pair{ + [](const std::string &i) -> std::vector { + auto str = i + "_test"; + return {str.begin(), str.end()}; + }, - [](const std::vector &i) -> std::string { return "works!"; }, - }), + [](const std::vector &i) -> std::string { return "works!"; }, + }), - makeAdapter("double-string-double", std::pair{ - [](const double &i) -> std::vector { - auto str = std::to_string(i / 2.f); - return {str.begin(), str.end()}; - }, + makeAdapter("double-string-double", std::pair{ + [](const double &i) -> std::vector { + auto str = std::to_string(i / 2.f); + return {str.begin(), str.end()}; + }, - [](const std::vector &s) -> double { return .1f; }, - }), - }, - std::tuple{ - std::list{ - "topic0", - "topic1", - "topic2", + [](const std::vector &s) -> double { return .1f; }, + }), + }), + + makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx, + std::tuple{ + makeAdapter("int-string-int", std::pair{ + [](const int32_t &i) -> std::vector { + auto str = std::to_string(i + 5); + return {str.begin(), str.end()}; + }, + + [](const std::vector &s) -> int32_t { return 5; }, + }), + + makeAdapter("string-string-string", std::pair{ + [](const std::string &i) -> std::vector { + auto str = i + "_test"; + return {str.begin(), str.end()}; + }, + + [](const std::vector &i) -> std::string { return "works!"; }, + }), + + makeAdapter("double-string-double", std::pair{ + [](const double &i) -> std::vector { + auto str = std::to_string(i / 2.f); + return {str.begin(), str.end()}; + }, + + [](const std::vector &s) -> double { return .1f; }, + }), }, - }), - makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx, - std::tuple{ - makeAdapter("int-string-int", std::pair{ - [](const int32_t &i) -> std::vector { - auto str = std::to_string(i + 5); - return {str.begin(), str.end()}; - }, + std::tuple{ + std::list{ + "topic0", + "topic1", + "topic2", + "topic3", + }, + }), + }); - [](const std::vector &s) -> int32_t { return 5; }, - }), - - makeAdapter("string-string-string", std::pair{ - [](const std::string &i) -> std::vector { - auto str = i + "_test"; - return {str.begin(), str.end()}; - }, - - [](const std::vector &i) -> std::string { return "works!"; }, - }), - - makeAdapter("double-string-double", std::pair{ - [](const double &i) -> std::vector { - auto str = std::to_string(i / 2.f); - return {str.begin(), str.end()}; - }, - - [](const std::vector &s) -> double { return .1f; }, - }), - }, - - std::tuple{ - std::list{ - "topic0", - "topic1", - "topic2", - }, - }), - }); - - mod.run(entry); + mod->run(entry); return 0; }