This commit is contained in:
Oleg Shishlyannikov 2025-10-27 19:01:58 +03:00
parent 3c5becfc71
commit 81d15c8a1e
2 changed files with 89 additions and 83 deletions

View File

@ -28,7 +28,6 @@ public:
zmq::message_t msg;
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
fmt::print("Received envelope: {}\r\n", std::string(static_cast<const char *>(msg.data()), msg.size()));
this->m_sock__.recv(msg).and_then([&](const auto &res) {
typename base_t::port_payload_s payload;

View File

@ -1,4 +1,5 @@
#include <boost/signals2/signal_type.hpp>
#include <charconv>
#include <chrono>
#include <cstdint>
#include <string>
@ -15,102 +16,108 @@
#include "module_factory.hpp"
#include "port_factory.hpp"
void entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<std::vector<uint8_t>> *> &ports) {
const auto &subscriber = *ports.at("subscriber");
const auto &publisher = *ports.at("publisher");
using env_data_type_t = std::vector<uint8_t>;
auto &a = subscriber.callback<int32_t>("int-string-int");
auto &b = subscriber.callback<std::string>("string-string-string");
void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port
a.connect([](const int32_t &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name<decltype(i)>()); });
b.connect([](const std::string &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name<decltype(i)>()); });
auto &int_cbk = subscriber.callback<int32_t>("int-vector<uint8_t>-int");
auto &string_cbk = subscriber.callback<std::string>("string-vector<uint8_t>-string");
auto &double_cbk = subscriber.callback<double>("double-vector<uint8_t>-double");
// Connect callbacks
int_cbk.connect([](const int32_t &i) -> void { fmt::print("SUBSCRIBER: got data: {} of integer type\r\n", i); });
string_cbk.connect([](const std::string &s) -> void { fmt::print("SUBSCRIBER: got data: {} of string type\r\n", s); });
double_cbk.connect([](const double &d) -> void { fmt::print("SUBSCRIBER: got data: {} of double type\r\n", d); });
// Listen for 1 second
subscriber.listen();
publisher["topic0"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic1"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic2"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test");
std::this_thread::sleep_for(std::chrono::milliseconds(1000u));
subscriber.stop();
}
// Publisher module entrypoint
void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
const auto &publisher = *ports.at("publisher_port"); // Get publisher port
// Publish data
publisher["topic0"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic1"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic2"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test");
}
struct codecs_s {
struct encoders_s {
static inline const auto from_int = [](const int32_t &i) -> env_data_type_t {
auto str = std::to_string(i); // Convert to string first
return {str.begin(), str.end()}; // String to byte array
};
static inline const auto from_string = [](const std::string &s) -> env_data_type_t { return {s.begin(), s.end()}; };
static inline const auto from_double = [](const double &d) -> env_data_type_t {
auto str = std::to_string(d);
return {str.begin(), str.end()};
};
} encoders;
struct decoders_s {
static inline const auto to_int = [](const env_data_type_t &s) -> int32_t {
int32_t ret;
auto str = std::string(s.begin(), s.end());
if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) {
return ret;
}
throw std::runtime_error(fmt::format("Invalid convert from {} to integer type", str));
}; // Byte array to int
static inline const auto to_string = [](const env_data_type_t &i) -> std::string { return std::string(i.begin(), i.end()); };
static inline const auto to_double = [](const env_data_type_t &s) -> double {
double ret;
auto str = std::string(s.begin(), s.end());
if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) {
return ret;
}
throw std::runtime_error(fmt::format("Invalid convert from {} to double type", str));
};
} decoders;
} codecs;
int main(int argc, char *argv[], char *envp[]) {
using enum port_types_e;
zmq::context_t zmq_ctx;
zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads
auto mod = makeModule(argc, argv, envp, "test_mod", zmq_ctx,
std::tuple{
makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx,
// Make module that contains only 1 port working on PUBLISHER pattern
auto publisher_module = makeModule(argc, argv, envp, "publisher_module", zmq_ctx,
std::tuple{
makeAdapter("int-string-int", std::pair{
[](const int32_t &i) -> std::vector<uint8_t> {
auto str = std::to_string(i + 5);
return {str.begin(), str.end()};
},
makePort(PUB, "publisher_port", "inproc://point" /* This port will publish messages here */, zmq_ctx,
std::tuple{
makeAdapter("int-vector<uint8_t>-int", std::pair{codecs.encoders.from_int, codecs.decoders.to_int}),
makeAdapter("string-vector<uint8_t>-string", std::pair{codecs.encoders.from_string, codecs.decoders.to_string}),
makeAdapter("double-vector<uint8_t>-double", std::pair{codecs.encoders.from_double, codecs.decoders.to_double}),
}),
});
[](const std::vector<uint8_t> &s) -> int32_t { return 5; },
}),
// Make module that contains only 1 port working on SUBSCRIBER pattern
auto subscriber_module = makeModule(argc, argv, envp, "subscriber_module", zmq_ctx,
std::tuple{
makePort(SUB, "subscriber_port", "inproc://point" /* this port will read data here */, zmq_ctx,
std::tuple{
makeAdapter("int-vector<uint8_t>-int", std::pair{codecs.encoders.from_int, codecs.decoders.to_int}),
makeAdapter("string-vector<uint8_t>-string", std::pair{codecs.encoders.from_string, codecs.decoders.to_string}),
makeAdapter("double-vector<uint8_t>-double", std::pair{codecs.encoders.from_double, codecs.decoders.to_double}),
},
makeAdapter("string-string-string", std::pair{
[](const std::string &i) -> std::vector<uint8_t> {
auto str = i + "_test";
return {str.begin(), str.end()};
},
// This type of port requires arguments - topics to subscribe
std::tuple{
// Topics to subscribe
std::list<std::string>{"topic0", "topic1", "topic2", "topic3"},
}),
});
[](const std::vector<uint8_t> &i) -> std::string { return "works!"; },
}),
makeAdapter("double-string-double", std::pair{
[](const double &i) -> std::vector<uint8_t> {
auto str = std::to_string(i / 2.f);
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &s) -> double { return .1f; },
}),
}),
makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx,
std::tuple{
makeAdapter("int-string-int", std::pair{
[](const int32_t &i) -> std::vector<uint8_t> {
auto str = std::to_string(i + 5);
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &s) -> int32_t { return 5; },
}),
makeAdapter("string-string-string", std::pair{
[](const std::string &i) -> std::vector<uint8_t> {
auto str = i + "_test";
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &i) -> std::string { return "works!"; },
}),
makeAdapter("double-string-double", std::pair{
[](const double &i) -> std::vector<uint8_t> {
auto str = std::to_string(i / 2.f);
return {str.begin(), str.end()};
},
[](const std::vector<uint8_t> &s) -> double { return .1f; },
}),
},
std::tuple{
std::list<std::string>{
"topic0",
"topic1",
"topic2",
"topic3",
},
}),
});
mod->run(entry);
publisher_module->run(publisher_entry); // Publish data
subscriber_module->run(subscriber_entry); // Subscribe and get data
return 0;
}