add addresses

This commit is contained in:
Oleg Shishlyannikov 2025-10-26 14:40:44 +03:00
parent 83b8a98f1c
commit f72156dbd1
16 changed files with 115 additions and 117 deletions

View File

@ -61,3 +61,5 @@ target_link_directories(${CMAKE_PROJECT_NAME}
${libzmq_BINARY_DIR} ${libzmq_BINARY_DIR}
${cppzmq_BINARY_DIR} ${cppzmq_BINARY_DIR}
) )
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE zmq)

View File

@ -3,7 +3,7 @@
#include "module.hpp" #include "module.hpp"
#include <memory> #include <memory>
template <enum module_type_e mt, typename... Ports> template <typename... Ports>
auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Ports>...> &&ports) { auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Ports>...> &&ports) {
return std::make_unique<Module<Ports...>>(argc, argv, envp, name, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Ports>...>>(ports)); return std::make_unique<Module<Ports...>>(argc, argv, envp, name, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Ports>...>>(ports));
} }

View File

@ -4,6 +4,7 @@
#include <boost/callable_traits/args.hpp> #include <boost/callable_traits/args.hpp>
#include <boost/callable_traits/return_type.hpp> #include <boost/callable_traits/return_type.hpp>
#include <cstddef> #include <cstddef>
#include <stdexcept>
#include "port_base.hpp" #include "port_base.hpp"
#include "ports.hpp" #include "ports.hpp"
@ -14,7 +15,6 @@ using namespace boost::callable_traits;
template <typename...> class Port; template <typename...> class Port;
template <typename... Adapters, typename... Args> template <typename... Adapters, typename... Args>
class Port<std::tuple<Adapters...>, std::tuple<Args...>> : public PortBase<std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>> { class Port<std::tuple<Adapters...>, std::tuple<Args...>> : public PortBase<std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>> {
public: public:
using port_data_type_t = std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>; using port_data_type_t = std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>;
@ -41,12 +41,15 @@ public:
} }
protected: protected:
void send__(const void *data, size_t size, size_t hash) const override { void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override {
const void *adapter_ptr = nullptr;
tp::for_each(mc_adapters_, [&](const auto &e) { tp::for_each(mc_adapters_, [&](const auto &e) {
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e; const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e;
if (adapter_typehash == hash) { if (adapter_typehash == hash) {
using adapter_in_type_t = std::remove_cvref_t<return_type_t<decltype(adapter->decoder())>>; using adapter_in_type_t = std::remove_cvref_t<return_type_t<decltype(adapter->decoder())>>;
adapter_ptr = &adapter;
typename PortImplBase<cbk_type_t_>::port_payload_s payload = { typename PortImplBase<cbk_type_t_>::port_payload_s payload = {
.typehash = hash, .typehash = hash,
@ -55,9 +58,13 @@ protected:
msgpack::sbuffer buf; msgpack::sbuffer buf;
msgpack::pack(buf, payload); msgpack::pack(buf, payload);
mc_impl_->send(buf); mc_impl_->send(addr, buf);
} }
}); });
if (!adapter_ptr) {
throw std::runtime_error(fmt::format("No suitable adapter found for type #{}\r\n", hash));
}
} }
void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const override final { void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const override final {

View File

@ -9,28 +9,24 @@
#include <fmt/format.h> #include <fmt/format.h>
#include <fmt/ranges.h> #include <fmt/ranges.h>
#include "port_types.hpp"
#include "adapter_base.hpp" #include "adapter_base.hpp"
#include "port_types.hpp"
template <class T> constexpr std::string_view type_name() { #include "utils.hpp"
using namespace std;
#ifdef __clang__
string_view p = __PRETTY_FUNCTION__;
return string_view(p.data() + 34, p.size() - 34 - 1);
#elif defined(__GNUC__)
string_view p = __PRETTY_FUNCTION__;
# if __cplusplus < 201402
return string_view(p.data() + 36, p.size() - 36 - 1);
# else
return string_view(p.data() + 49, p.find(';', 49) - 49);
# endif
#elif defined(_MSC_VER)
string_view p = __FUNCSIG__;
return string_view(p.data() + 84, p.size() - 84 - 7);
#endif
}
template <typename EncodedType> class PortBase { template <typename EncodedType> class PortBase {
class AddressedPort_ {
public:
AddressedPort_(const PortBase<EncodedType> *port, const std::string &address) : mc_addr_(address), mc_port_(port) {}
template <typename InType> const auto &operator<<(const InType &in) const {
mc_port_->send__(mc_addr_, &in, sizeof(InType), typeid(InType).hash_code());
return *this;
}
private:
const std::string mc_addr_;
const PortBase<EncodedType> *mc_port_;
};
public: public:
PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx) PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx)
: mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash<std::string>()(name)), mc_endpoint_hash_(std::hash<std::string>()(endpoint)) {} : mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash<std::string>()(name)), mc_endpoint_hash_(std::hash<std::string>()(endpoint)) {}
@ -41,8 +37,10 @@ public:
inline const auto &name_hash() const { return mc_name_hash_; } inline const auto &name_hash() const { return mc_name_hash_; }
inline const auto &endpoint_hash() const { return mc_endpoint_hash_; } inline const auto &endpoint_hash() const { return mc_endpoint_hash_; }
const auto operator[](const std::string &address) const { return AddressedPort_(this, address); }
template <typename InType> const PortBase<EncodedType> &operator<<(const InType &in) const { template <typename InType> const PortBase<EncodedType> &operator<<(const InType &in) const {
send__(&in, sizeof(InType), typeid(InType).hash_code()); send__("", &in, sizeof(InType), typeid(InType).hash_code());
return *this; return *this;
} }
@ -56,7 +54,7 @@ public:
protected: protected:
void stop__() const { m_ss_.request_stop(); } void stop__() const { m_ss_.request_stop(); }
virtual void send__(const void *data, size_t size, size_t type_hash) const = 0; virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0;
virtual void listen__(std::stop_token) const = 0; virtual void listen__(std::stop_token) const = 0;
virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const = 0; virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const = 0;

View File

@ -17,6 +17,6 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override {}; void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
}; };

View File

@ -4,7 +4,7 @@
template <typename... Adapters, typename... Args> template <typename... Adapters, typename... Args>
auto makePort(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Adapters>...> &&adapters, auto makePort(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Adapters>...> &&adapters,
std::tuple<Args...> &&args) { std::tuple<Args...> &&args = {}) {
return std::make_unique<Port<std::tuple<Adapters...>, std::tuple<Args...>>>(pt, name, endpoint, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(adapters), return std::make_unique<Port<std::tuple<Adapters...>, std::tuple<Args...>>>(pt, name, endpoint, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(adapters),
std::forward<std::tuple<Args...>>(args)); std::forward<std::tuple<Args...>>(args));
} }

View File

@ -21,7 +21,7 @@ public:
virtual ~PortImplBase() = default; virtual ~PortImplBase() = default;
virtual void listen(std::stop_token st) const = 0; virtual void listen(std::stop_token st) const = 0;
virtual void send(const msgpack::sbuffer &data) const = 0; virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0;
void close() { void close() {
m_sock__.close(); m_sock__.close();

View File

@ -17,5 +17,5 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override {}; void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
}; };

View File

@ -9,8 +9,7 @@
template <typename Callback> class PortImpl<port_types_e::PUB, Callback> final : public PortImplBase<Callback> { template <typename Callback> class PortImpl<port_types_e::PUB, Callback> final : public PortImplBase<Callback> {
public: public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list<std::string> &topics, Callback &&callback) PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {
: PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)), mc_topics_(topics) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub); this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub);
this->m_sock__.bind(this->mc_endpoint__); this->m_sock__.bind(this->mc_endpoint__);
} }
@ -20,17 +19,12 @@ public:
void listen(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); } void listen(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override { void send(const std::string &addr, const msgpack::sbuffer &data) const override {
try { try {
for (const auto &topic : mc_topics_) { this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore);
this->m_sock__.send(zmq::message_t(topic), zmq::send_flags::sndmore); this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait);
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait);
}
} catch (const zmq::error_t &err) { } catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what()); fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
} }
}; };
private:
const std::list<std::string> mc_topics_;
}; };

View File

@ -17,5 +17,5 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override {}; void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
}; };

View File

@ -17,5 +17,5 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override {}; void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
}; };

View File

@ -17,6 +17,6 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override {}; void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
}; };

View File

@ -17,6 +17,6 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override {}; void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
}; };

View File

@ -17,6 +17,6 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override {}; void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
}; };

View File

@ -48,7 +48,7 @@ public:
} }
// Send to socket depending on implementation // Send to socket depending on implementation
void send(const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); }; void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); };
private: private:
const std::list<std::string> mc_topics_; const std::list<std::string> mc_topics_;

View File

@ -26,7 +26,10 @@ void entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std:
b.connect([](const std::string &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name<decltype(i)>()); }); b.connect([](const std::string &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name<decltype(i)>()); });
subscriber.listen(); subscriber.listen();
publisher << 1 << 2 << double{3.f} << std::string("test"); publisher["topic0"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic1"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic2"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test");
std::this_thread::sleep_for(std::chrono::milliseconds(1000u)); std::this_thread::sleep_for(std::chrono::milliseconds(1000u));
subscriber.stop(); subscriber.stop();
@ -37,83 +40,77 @@ int main(int argc, char *argv[], char *envp[]) {
zmq::context_t zmq_ctx; zmq::context_t zmq_ctx;
auto mod = makeModule(argc, argv, envp, "test_mod", zmq_ctx, auto mod = makeModule(argc, argv, envp, "test_mod", zmq_ctx,
std::tuple{ std::tuple{
makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx, makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx,
std::tuple{ std::tuple{
makeAdapter("int-string-int", std::pair{ makeAdapter("int-string-int", std::pair{
[](const int32_t &i) -> std::vector<uint8_t> { [](const int32_t &i) -> std::vector<uint8_t> {
auto str = std::to_string(i + 5); auto str = std::to_string(i + 5);
return {str.begin(), str.end()}; return {str.begin(), str.end()};
}, },
[](const std::vector<uint8_t> &s) -> int32_t { return 5; }, [](const std::vector<uint8_t> &s) -> int32_t { return 5; },
}), }),
makeAdapter("string-string-string", std::pair{ makeAdapter("string-string-string", std::pair{
[](const std::string &i) -> std::vector<uint8_t> { [](const std::string &i) -> std::vector<uint8_t> {
auto str = i + "_test"; auto str = i + "_test";
return {str.begin(), str.end()}; return {str.begin(), str.end()};
}, },
[](const std::vector<uint8_t> &i) -> std::string { return "works!"; }, [](const std::vector<uint8_t> &i) -> std::string { return "works!"; },
}), }),
makeAdapter("double-string-double", std::pair{ makeAdapter("double-string-double", std::pair{
[](const double &i) -> std::vector<uint8_t> { [](const double &i) -> std::vector<uint8_t> {
auto str = std::to_string(i / 2.f); auto str = std::to_string(i / 2.f);
return {str.begin(), str.end()}; return {str.begin(), str.end()};
}, },
[](const std::vector<uint8_t> &s) -> double { return .1f; }, [](const std::vector<uint8_t> &s) -> double { return .1f; },
}), }),
}, }),
std::tuple{
std::list<std::string>{ makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx,
"topic0", std::tuple{
"topic1", makeAdapter("int-string-int", std::pair{
"topic2", [](const int32_t &i) -> std::vector<uint8_t> {
auto str = std::to_string(i + 5);
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &s) -> int32_t { return 5; },
}),
makeAdapter("string-string-string", std::pair{
[](const std::string &i) -> std::vector<uint8_t> {
auto str = i + "_test";
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &i) -> std::string { return "works!"; },
}),
makeAdapter("double-string-double", std::pair{
[](const double &i) -> std::vector<uint8_t> {
auto str = std::to_string(i / 2.f);
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &s) -> double { return .1f; },
}),
}, },
}),
makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx, std::tuple{
std::tuple{ std::list<std::string>{
makeAdapter("int-string-int", std::pair{ "topic0",
[](const int32_t &i) -> std::vector<uint8_t> { "topic1",
auto str = std::to_string(i + 5); "topic2",
return {str.begin(), str.end()}; "topic3",
}, },
}),
});
[](const std::vector<uint8_t> &s) -> int32_t { return 5; }, mod->run(entry);
}),
makeAdapter("string-string-string", std::pair{
[](const std::string &i) -> std::vector<uint8_t> {
auto str = i + "_test";
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &i) -> std::string { return "works!"; },
}),
makeAdapter("double-string-double", std::pair{
[](const double &i) -> std::vector<uint8_t> {
auto str = std::to_string(i / 2.f);
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &s) -> double { return .1f; },
}),
},
std::tuple{
std::list<std::string>{
"topic0",
"topic1",
"topic2",
},
}),
});
mod.run(entry);
return 0; return 0;
} }