From 83b8a98f1c3cadbb515ffd6cdc10fb81a65688d9 Mon Sep 17 00:00:00 2001 From: Oleg Shishlyannikov Date: Sun, 26 Oct 2025 12:03:37 +0300 Subject: [PATCH] init --- .clang-format | 3 + .clang_complete | 6 ++ .clangd | 2 + CMakeLists.txt | 63 ++++++++++++++++ README.md | 0 src/adapter.hpp | 46 ++++++++++++ src/adapter_base.hpp | 18 +++++ src/adapter_factory.hpp | 7 ++ src/module.hpp | 87 +++++++++++++++++++++++ src/module_factory.hpp | 9 +++ src/port.hpp | 134 +++++++++++++++++++++++++++++++++++ src/port_base.hpp | 68 ++++++++++++++++++ src/port_dealer_impl.hpp | 22 ++++++ src/port_factory.hpp | 10 +++ src/port_impl_base.hpp | 40 +++++++++++ src/port_pair_impl.hpp | 21 ++++++ src/port_publisher_impl.hpp | 36 ++++++++++ src/port_pull_impl.hpp | 21 ++++++ src/port_push_impl.hpp | 21 ++++++ src/port_rep_impl.hpp | 22 ++++++ src/port_req_impl.hpp | 22 ++++++ src/port_router_impl.hpp | 22 ++++++ src/port_subscriber_impl.hpp | 55 ++++++++++++++ src/port_types.hpp | 18 +++++ src/ports.hpp | 13 ++++ src/templates.cpp | 119 +++++++++++++++++++++++++++++++ 26 files changed, 885 insertions(+) create mode 100644 .clang-format create mode 100644 .clang_complete create mode 100644 .clangd create mode 100644 CMakeLists.txt create mode 100644 README.md create mode 100644 src/adapter.hpp create mode 100644 src/adapter_base.hpp create mode 100644 src/adapter_factory.hpp create mode 100644 src/module.hpp create mode 100644 src/module_factory.hpp create mode 100644 src/port.hpp create mode 100644 src/port_base.hpp create mode 100644 src/port_dealer_impl.hpp create mode 100644 src/port_factory.hpp create mode 100644 src/port_impl_base.hpp create mode 100644 src/port_pair_impl.hpp create mode 100644 src/port_publisher_impl.hpp create mode 100644 src/port_pull_impl.hpp create mode 100644 src/port_push_impl.hpp create mode 100644 src/port_rep_impl.hpp create mode 100644 src/port_req_impl.hpp create mode 100644 src/port_router_impl.hpp create mode 100644 src/port_subscriber_impl.hpp create mode 100644 src/port_types.hpp create mode 100644 src/ports.hpp create mode 100644 src/templates.cpp diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..8af2b2a --- /dev/null +++ b/.clang-format @@ -0,0 +1,3 @@ +Language: Cpp +ColumnLimit: 180 +IndentPPDirectives: AfterHash diff --git a/.clang_complete b/.clang_complete new file mode 100644 index 0000000..53e1e1c --- /dev/null +++ b/.clang_complete @@ -0,0 +1,6 @@ +-I/usr/include/c++/14 +-I/usr/include/x86_64-linux-gnu/c++/14 +-I../build/_deps/tpl-src +-I./thirdparty + +-std=gnu++23 diff --git a/.clangd b/.clangd new file mode 100644 index 0000000..b4dd5e5 --- /dev/null +++ b/.clangd @@ -0,0 +1,2 @@ +CompileFlags: + Add: ['-std=gnu++23'] diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..d8a2d55 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,63 @@ +cmake_minimum_required(VERSION 3.14) + +include(FetchContent) +set(FETCHCONTENT_QUIET FALSE) +set(FTXUI_BUILD_EXAMPLES ON) + +FetchContent_Declare( + fmt + GIT_REPOSITORY https://github.com/fmtlib/fmt + GIT_PROGRESS TRUE +) + +FetchContent_MakeAvailable(fmt) +FetchContent_Declare( + inja + GIT_REPOSITORY https://github.com/pantor/inja + GIT_PROGRESS TRUE + GIT_TAG main +) + +FetchContent_MakeAvailable(inja) + +FetchContent_Declare( + libzmq + GIT_REPOSITORY https://github.com/zeromq/libzmq + GIT_PROGRESS TRUE +) + +FetchContent_MakeAvailable(libzmq) + +FetchContent_Declare( + cppzmq + GIT_REPOSITORY https://github.com/zeromq/cppzmq + GIT_PROGRESS TRUE +) + +FetchContent_MakeAvailable(cppzmq) + +project(module_arch_POC) +set(CMAKE_CXX_STANDARD 23) +set(CMAKE_BUILD_TYPE Debug) +set(CMAKE_MESSAGE_LOG_LEVEL TRACE) +set(CMAKE_VERBOSE_MAKEFILE OFF) +set(CMAKE_CXX_COMPILER /usr/bin/clang++) + +file(GLOB_RECURSE SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp ${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp) +add_executable(${CMAKE_PROJECT_NAME} ${SOURCES}) + +target_include_directories(${CMAKE_PROJECT_NAME} PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${fmt_SOURCE_DIR}/include + ${inja_SOURCE_DIR}/include + ${libzmq_SOURCE_DIR}/include + ${cppzmq_SOURCE_DIR}/include +) + +target_link_directories(${CMAKE_PROJECT_NAME} + PRIVATE + ${fmt_BINARY_DIR} + ${inja_BINARY_DIR} + ${libzmq_BINARY_DIR} + ${cppzmq_BINARY_DIR} +) diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/src/adapter.hpp b/src/adapter.hpp new file mode 100644 index 0000000..5a86e17 --- /dev/null +++ b/src/adapter.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#define FMT_HEADER_ONLY +#include +#include + +#include "adapter_base.hpp" + +using namespace boost::callable_traits; + +template + requires( + // Encoder return type is same as Decoder argument type, encoder and decoder have only 1 arg + std::tuple_size_v> == 1 && std::is_same_v, std::remove_cvref_t>>> && + std::tuple_size_v> == 1 && std::is_same_v, std::remove_cvref_t>>>) + +class Adapter : public AdapterBase>> { +public: + using encoder_type_t = Encoder; + using decoder_type_t = Decoder; + using adapter_type_t = Adapter; + using callback_arg_type_t = std::tuple_element_t<0, boost::callable_traits::args_t>; + using base_t = AdapterBase; + using callback_type_t = base_t::callback_type_t; + + Adapter(const std::string &name, std::pair &&fns) + : AdapterBase(name), mc_enc_(std::forward(fns.first)), mc_dec_(std::forward(fns.second)) {} + + inline const auto &encoder() const { return mc_enc_; } + inline const auto &decoder() const { return mc_dec_; } + + typename base_t::callback_type_t &callback() const override final { return m_callback_; }; + +private: + const Encoder mc_enc_; + const Decoder mc_dec_; + mutable callback_type_t m_callback_; +}; diff --git a/src/adapter_base.hpp b/src/adapter_base.hpp new file mode 100644 index 0000000..57b02e4 --- /dev/null +++ b/src/adapter_base.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include + +template class AdapterBase { +public: + using callback_type_t = boost::signals2::signal; + + AdapterBase(const std::string &name) : mc_name_(name) {} + + virtual callback_type_t &callback() const = 0; + inline const auto &name() const { return mc_name_; } + +protected: +private: + const std::string mc_name_; +}; + diff --git a/src/adapter_factory.hpp b/src/adapter_factory.hpp new file mode 100644 index 0000000..f78e49f --- /dev/null +++ b/src/adapter_factory.hpp @@ -0,0 +1,7 @@ +#pragma once + +#include "adapter.hpp" + +template auto makeAdapter(const std::string &name, std::pair &&fns) { + return std::make_unique>(name, std::forward>(fns)); +} diff --git a/src/module.hpp b/src/module.hpp new file mode 100644 index 0000000..c20823b --- /dev/null +++ b/src/module.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "port_base.hpp" +#include "tuple.hpp" + +enum class module_type_e : uint32_t { + STANDALONE, + INCOMPOSITION, +}; + +template class ModuleBase { +public: + ModuleBase(int32_t argc, char **argv, char **envp, const std::string &name) + : mc_name_(name), mc_cli_args_({ + .argc = argc, + .argv = argv, + .envp = envp, + }) {} + + inline const auto &name() const { return mc_name_; } + + virtual const PortBase &port(const std::string &name) const = 0; + virtual void run(void (*entry)(int32_t, char **, char **, const std::unordered_map *> &)) const = 0; + +protected: + const auto &cli__() const { return mc_cli_args_; } + +private: + const std::string mc_name_; + + const struct { + int32_t argc; + char **argv, **envp; + } mc_cli_args_; +}; + +template class Module : public ModuleBase>> { +public: + using port_data_type_t = std::tuple_element_t<0, std::tuple>; + + Module(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) + : ModuleBase(argc, argv, envp, name), mc_ports_([&](std::index_sequence) { + return std::make_tuple([&]() { + using port_type_t = std::remove_cvref_t(ports))>; + auto &port = std::get(ports); + + return std::make_tuple(port->name(), std::hash()(port->name()), std::forward(port)); + }.template operator()()...); + }(std::make_index_sequence{})) {} + + const PortBase &port(const std::string &name) const override { + const PortBase *ret = nullptr; + size_t hash = std::hash()(name); + + tp::for_each(mc_ports_, [&](const auto &p) { + if (!ret) { + const auto &[port_name, name_hash, port] = p; + if (name == port_name) { + ret = static_cast *>(port.get()); + } + } + }); + + if (!ret) { + throw std::runtime_error(fmt::format("Port with name '{}' not found in '{}' module\r\n", name, this->name())); + } + + return *ret; + } + + void run(void (*entry)(int32_t, char **, char **, const std::unordered_map *> &)) const override { + entry(this->cli__().argc, this->cli__().argv, this->cli__().envp, + [&](std::index_sequence) -> std::unordered_map *> { + return {std::make_pair(std::get<0u>(std::get(ports_())), std::get<2u>(std::get(ports_())).get())...}; + }(std::make_index_sequence>>{})); + } + +private: + inline const auto &ports_() const { return mc_ports_; } + const std::tuple>...> mc_ports_; +}; diff --git a/src/module_factory.hpp b/src/module_factory.hpp new file mode 100644 index 0000000..c9bee3c --- /dev/null +++ b/src/module_factory.hpp @@ -0,0 +1,9 @@ +#pragma once + +#include "module.hpp" +#include + +template +auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple...> &&ports) { + return std::make_unique>(argc, argv, envp, name, zmq_ctx, std::forward...>>(ports)); +} diff --git a/src/port.hpp b/src/port.hpp new file mode 100644 index 0000000..eebe74f --- /dev/null +++ b/src/port.hpp @@ -0,0 +1,134 @@ +#pragma once + +#include +#include +#include +#include + +#include "port_base.hpp" +#include "ports.hpp" +#include "tuple.hpp" + +using namespace boost::callable_traits; + +template class Port; +template +class Port, std::tuple> : public PortBase...>>> { + +public: + using port_data_type_t = std::tuple_element_t<0, std::tuple...>>; + + Port(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, + std::tuple &&args) + : PortBase(pt, name, endpoint, zmq_ctx), + + // Init adapters + mc_adapters_([&](std::index_sequence) { + return std::make_tuple([&]() { + using adapter_type_t = std::remove_cvref_t(adapters))>; + using adapter_input_type_t = return_type_t; + + return std::make_tuple(std::get(adapters)->name(), std::hash()(std::get(adapters)->name()), + typeid(std::remove_cvref_t).hash_code(), std::forward>(std::get(adapters))); + }.template operator()()...); + }(std::make_index_sequence{})) { + std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoint), std::forward(args)...); }, std::forward(args)); + } + + void stop() const override { + this->stop__(); + mc_impl_->close(); + } + +protected: + void send__(const void *data, size_t size, size_t hash) const override { + tp::for_each(mc_adapters_, [&](const auto &e) { + const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e; + + if (adapter_typehash == hash) { + using adapter_in_type_t = std::remove_cvref_tdecoder())>>; + + typename PortImplBase::port_payload_s payload = { + .typehash = hash, + .data = adapter->encoder()(*reinterpret_cast(data)), + }; + + msgpack::sbuffer buf; + msgpack::pack(buf, payload); + mc_impl_->send(buf); + } + }); + } + + void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const override final { + void *ret = nullptr; + + tp::for_each(mc_adapters_, [&](auto &a) { + if (!ret) { + auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = a; + if (adapter_typehash == typehash && adapter_namehash == namehash) { + ret = reinterpret_cast(adapter.get()); + } + } + }); + + if (!ret) { + throw std::runtime_error(fmt::format("No adapter '{}' in port '{}'\r\n", name, this->name())); + } + + return ret; + }; + +private: + using base_t_ = PortBase; + using cbk_type_t_ = std::function; + + mutable std::tuple>...> mc_adapters_; + mutable std::unique_ptr> mc_impl_; + + void listen__(std::stop_token st) const override { mc_impl_->listen(st); } + + template void init_impl_(enum port_types_e pt, ImplArgs &&...args) const { + using enum port_types_e; + static constexpr auto make_null_impl_pair = []() consteval { + if constexpr (port_type == UNKNOWN) { + return std::make_pair(port_type, static_cast *>(nullptr)); + } else { + return std::make_pair(port_type, static_cast *>(nullptr)); + } + }; + + static constexpr auto impl_map = + std::make_tuple(make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), + make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), + make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), make_null_impl_pair.template operator()(), + make_null_impl_pair.template operator()()); + + tp::for_each(impl_map, [&](const auto &p) { + const auto &[type, null_pimpl] = p; + if (type == pt) { + using impl_type_t = std::remove_pointer_t; + + if constexpr (std::is_constructible_v) { + mc_impl_ = std::make_unique( + // Args + std::forward(args)..., + + // Callback + [this](const port_data_type_t &data, size_t hash) { + tp::for_each(mc_adapters_, [&](const auto &e) { + const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e; + if (adapter_typehash == hash) { + adapter->callback()(adapter->decoder()(data)); + } + }); + }); + } + } + }); + } +}; + +template +Port(enum port_types_e, const std::string &, const std::string &, zmq::context_t &, std::tuple...> &&, std::tuple &&) + -> Port, std::tuple>; diff --git a/src/port_base.hpp b/src/port_base.hpp new file mode 100644 index 0000000..0309fae --- /dev/null +++ b/src/port_base.hpp @@ -0,0 +1,68 @@ +#pragma once + +#include +#include +#include +#include + +#define FMT_HEADER_ONLY +#include +#include + +#include "port_types.hpp" +#include "adapter_base.hpp" + +template constexpr std::string_view type_name() { + using namespace std; +#ifdef __clang__ + string_view p = __PRETTY_FUNCTION__; + return string_view(p.data() + 34, p.size() - 34 - 1); +#elif defined(__GNUC__) + string_view p = __PRETTY_FUNCTION__; +# if __cplusplus < 201402 + return string_view(p.data() + 36, p.size() - 36 - 1); +# else + return string_view(p.data() + 49, p.find(';', 49) - 49); +# endif +#elif defined(_MSC_VER) + string_view p = __FUNCSIG__; + return string_view(p.data() + 84, p.size() - 84 - 7); +#endif +} + +template class PortBase { +public: + PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx) + : mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash()(name)), mc_endpoint_hash_(std::hash()(endpoint)) {} + + inline const auto &type() const { return mc_type_; } + inline const auto &name() const { return mc_name_; } + inline const auto &endpoint() const { return mc_endpoint_; } + inline const auto &name_hash() const { return mc_name_hash_; } + inline const auto &endpoint_hash() const { return mc_endpoint_hash_; } + + template const PortBase &operator<<(const InType &in) const { + send__(&in, sizeof(InType), typeid(InType).hash_code()); + return *this; + } + + void listen() const { listen__(m_ss_.get_token()); }; + virtual void stop() const = 0; + + template auto &callback(const std::string &name) const { + return (*static_cast *>(get_adapter__(name, std::hash()(name), typeid(std::remove_cvref_t).hash_code()))).callback(); + } + +protected: + void stop__() const { m_ss_.request_stop(); } + + virtual void send__(const void *data, size_t size, size_t type_hash) const = 0; + virtual void listen__(std::stop_token) const = 0; + virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const = 0; + +private: + const enum port_types_e mc_type_; + const std::string mc_name_, mc_endpoint_; + const size_t mc_name_hash_, mc_endpoint_hash_; + mutable std::stop_source m_ss_; +}; diff --git a/src/port_dealer_impl.hpp b/src/port_dealer_impl.hpp new file mode 100644 index 0000000..e11ca04 --- /dev/null +++ b/src/port_dealer_impl.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} + void listen(std::stop_token st) const override { + while (!st.stop_requested()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override {}; +}; + diff --git a/src/port_factory.hpp b/src/port_factory.hpp new file mode 100644 index 0000000..8fa1f38 --- /dev/null +++ b/src/port_factory.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include "port.hpp" + +template +auto makePort(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple...> &&adapters, + std::tuple &&args) { + return std::make_unique, std::tuple>>(pt, name, endpoint, zmq_ctx, std::forward...>>(adapters), + std::forward>(args)); +} diff --git a/src/port_impl_base.hpp b/src/port_impl_base.hpp new file mode 100644 index 0000000..576705a --- /dev/null +++ b/src/port_impl_base.hpp @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include +#include +#include + +template class PortImplBase { + using port_data_type_t_ = std::remove_cvref_t>>; + +public: + struct port_payload_s { + size_t typehash; + port_data_type_t_ data; + MSGPACK_DEFINE(typehash, data); + }; + + PortImplBase(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : mc_endpoint__(endpoint), m_ctx__(zmq_ctx), mc_cbk__(callback) {} + + virtual ~PortImplBase() = default; + + virtual void listen(std::stop_token st) const = 0; + virtual void send(const msgpack::sbuffer &data) const = 0; + void close() { + m_sock__.close(); + + if (m_listener_thread__.valid()) { + m_listener_thread__.get(); + } + }; + +protected: + mutable std::future m_listener_thread__; + mutable zmq::socket_t m_sock__; + zmq::context_t &m_ctx__; + const std::string mc_endpoint__; + const Callback mc_cbk__; + +}; diff --git a/src/port_pair_impl.hpp b/src/port_pair_impl.hpp new file mode 100644 index 0000000..a6fb58a --- /dev/null +++ b/src/port_pair_impl.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} + void listen(std::stop_token st) const override { + while (!st.stop_requested()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override {}; +}; diff --git a/src/port_publisher_impl.hpp b/src/port_publisher_impl.hpp new file mode 100644 index 0000000..7e7edaa --- /dev/null +++ b/src/port_publisher_impl.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl final : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list &topics, Callback &&callback) + : PortImplBase(zmq_ctx, endpoint, std::forward(callback)), mc_topics_(topics) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub); + this->m_sock__.bind(this->mc_endpoint__); + } + + ~PortImpl() override {} + + void listen(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override { + try { + for (const auto &topic : mc_topics_) { + this->m_sock__.send(zmq::message_t(topic), zmq::send_flags::sndmore); + this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait); + } + } catch (const zmq::error_t &err) { + fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); + } + }; + +private: + const std::list mc_topics_; +}; diff --git a/src/port_pull_impl.hpp b/src/port_pull_impl.hpp new file mode 100644 index 0000000..314d1ab --- /dev/null +++ b/src/port_pull_impl.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} + void listen(std::stop_token st) const override { + while (!st.stop_requested()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override {}; +}; diff --git a/src/port_push_impl.hpp b/src/port_push_impl.hpp new file mode 100644 index 0000000..bb84d25 --- /dev/null +++ b/src/port_push_impl.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} + void listen(std::stop_token st) const override { + while (!st.stop_requested()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override {}; +}; diff --git a/src/port_rep_impl.hpp b/src/port_rep_impl.hpp new file mode 100644 index 0000000..2c914f6 --- /dev/null +++ b/src/port_rep_impl.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} + void listen(std::stop_token st) const override { + while (!st.stop_requested()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override {}; +}; + diff --git a/src/port_req_impl.hpp b/src/port_req_impl.hpp new file mode 100644 index 0000000..f98a755 --- /dev/null +++ b/src/port_req_impl.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} + void listen(std::stop_token st) const override { + while (!st.stop_requested()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override {}; +}; + diff --git a/src/port_router_impl.hpp b/src/port_router_impl.hpp new file mode 100644 index 0000000..46d07e7 --- /dev/null +++ b/src/port_router_impl.hpp @@ -0,0 +1,22 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase(zmq_ctx, endpoint, std::forward(callback)) {} + void listen(std::stop_token st) const override { + while (!st.stop_requested()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100u)); + } + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override {}; +}; + diff --git a/src/port_subscriber_impl.hpp b/src/port_subscriber_impl.hpp new file mode 100644 index 0000000..1e07785 --- /dev/null +++ b/src/port_subscriber_impl.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include "port_impl_base.hpp" +#include "port_types.hpp" + +#define FMT_HEADER_ONLY +#include +#include + +template class PortImpl : public PortImplBase { +public: + using base_t = PortImplBase; + PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list &topics, Callback &&callback) + : PortImplBase(zmq_ctx, endpoint, std::forward(callback)), mc_topics_(topics) { + this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::sub); + this->m_sock__.connect(this->mc_endpoint__); + + for (const auto &topic : mc_topics_) { + this->m_sock__.set(zmq::sockopt::subscribe, topic); + } + } + + void listen(std::stop_token st) const override { + this->m_listener_thread__ = std::async( + std::launch::async, + [this](std::stop_token st) { + while (!st.stop_requested()) { + zmq::message_t msg; + + this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) { + fmt::print("Received envelope: {}\r\n", std::string(static_cast(msg.data()), msg.size())); + this->m_sock__.recv(msg).and_then([&](const auto &res) { + typename base_t::port_payload_s payload; + + msgpack::sbuffer buf; + buf.write(reinterpret_cast(msg.data()), msg.size()); + const auto [typehash, data] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload); + + this->mc_cbk__(data, typehash); + return std::optional(res); + }); + + return std::optional(res); + }); + } + }, + st); + } + + // Send to socket depending on implementation + void send(const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; + +private: + const std::list mc_topics_; +}; diff --git a/src/port_types.hpp b/src/port_types.hpp new file mode 100644 index 0000000..d7d1ae1 --- /dev/null +++ b/src/port_types.hpp @@ -0,0 +1,18 @@ +#pragma once + +#include + +enum class port_types_e : uint32_t { + UNKNOWN = 0, + PUB, + SUB, + REQ, + REP, + ROUTER, + DEALER, + PUSH, + PULL, + PAIR, +}; + +template class PortImpl; diff --git a/src/ports.hpp b/src/ports.hpp new file mode 100644 index 0000000..05baa0a --- /dev/null +++ b/src/ports.hpp @@ -0,0 +1,13 @@ +#pragma once + +#include "port_base.hpp" +#include "port_publisher_impl.hpp" +#include "port_subscriber_impl.hpp" +#include "port_push_impl.hpp" +#include "port_pull_impl.hpp" +#include "port_req_impl.hpp" +#include "port_rep_impl.hpp" +#include "port_router_impl.hpp" +#include "port_dealer_impl.hpp" +#include "port_pair_impl.hpp" + diff --git a/src/templates.cpp b/src/templates.cpp new file mode 100644 index 0000000..26dc32e --- /dev/null +++ b/src/templates.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include +#include +#include + +#include + +#define FMT_HEADER_ONLY +#include +#include + +#include "adapter_factory.hpp" +#include "module_factory.hpp" +#include "port_factory.hpp" + +void entry(int32_t argc, char **argv, char **envp, const std::unordered_map> *> &ports) { + const auto &subscriber = *ports.at("subscriber"); + const auto &publisher = *ports.at("publisher"); + + auto &a = subscriber.callback("int-string-int"); + auto &b = subscriber.callback("string-string-string"); + + a.connect([](const int32_t &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name()); }); + b.connect([](const std::string &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name()); }); + + subscriber.listen(); + publisher << 1 << 2 << double{3.f} << std::string("test"); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000u)); + subscriber.stop(); +} + +int main(int argc, char *argv[], char *envp[]) { + using enum port_types_e; + zmq::context_t zmq_ctx; + + auto mod = makeModule(argc, argv, envp, "test_mod", zmq_ctx, + std::tuple{ + makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx, + std::tuple{ + makeAdapter("int-string-int", std::pair{ + [](const int32_t &i) -> std::vector { + auto str = std::to_string(i + 5); + return {str.begin(), str.end()}; + }, + + [](const std::vector &s) -> int32_t { return 5; }, + }), + + makeAdapter("string-string-string", std::pair{ + [](const std::string &i) -> std::vector { + auto str = i + "_test"; + return {str.begin(), str.end()}; + }, + + [](const std::vector &i) -> std::string { return "works!"; }, + }), + + makeAdapter("double-string-double", std::pair{ + [](const double &i) -> std::vector { + auto str = std::to_string(i / 2.f); + return {str.begin(), str.end()}; + }, + + [](const std::vector &s) -> double { return .1f; }, + }), + }, + std::tuple{ + std::list{ + "topic0", + "topic1", + "topic2", + }, + }), + + makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx, + std::tuple{ + makeAdapter("int-string-int", std::pair{ + [](const int32_t &i) -> std::vector { + auto str = std::to_string(i + 5); + return {str.begin(), str.end()}; + }, + + [](const std::vector &s) -> int32_t { return 5; }, + }), + + makeAdapter("string-string-string", std::pair{ + [](const std::string &i) -> std::vector { + auto str = i + "_test"; + return {str.begin(), str.end()}; + }, + + [](const std::vector &i) -> std::string { return "works!"; }, + }), + + makeAdapter("double-string-double", std::pair{ + [](const double &i) -> std::vector { + auto str = std::to_string(i / 2.f); + return {str.begin(), str.end()}; + }, + + [](const std::vector &s) -> double { return .1f; }, + }), + }, + + std::tuple{ + std::list{ + "topic0", + "topic1", + "topic2", + }, + }), + }); + + mod.run(entry); + return 0; +}