// Demo program: builds ten single-port modules (PUB, SUB, PUSH, PULL, REQ, REP,
// PAIR_SERVER, PAIR_CLIENT, DISH, RADIO), all attached to "inproc://" endpoints on
// one shared zmq::context_t, and runs them pattern by pattern from main().
//
// NOTE(review): in this copy the #include targets and the template argument lists
// (after std::vector, std::unordered_map, callback, type_name, ModuleBuilder,
// PortBuilder, AdapterBuilder, withCallbackSignature, std::list) are missing, and
// the original line breaks are collapsed so `//` comments run into the code that
// followed them. Restore both from the original file before building.
//
// Entry points. Each takes argc/argv/envp (never read by any of them) and a map
// from port name to port pointer; the port is looked up with ports.at(name).
//   subscriber_entry  - "subscriber_port": connects int/string/double callbacks
//                       that print the value, its type name and the sender address.
//   publisher_entry   - "publisher_port": streams 1, 2, 3.0 and "test" to each of
//                       topic0..topic3.
//   pusher_entry      - "push_port": streams 1, 2, 3.0 and "test".
//   puller_entry      - "pull_port": connects int/string/double callbacks that
//                       print the value and its type name.
//   req_entry         - "req_port": connects printing callbacks first, then
//                       streams 1, 2, 3.0 and "test".
//   rep_entry         - "rep_port": connects callbacks that print the value and
//                       return a formatted std::string; the string adapter gets
//                       two handlers (FIRST and SECOND).
//   pair_server_entry - "pair_server_port": connects printing callbacks, sleeps
//                       50 ms, then streams 4, 5, 6.0 and "server->client".
//   pair_client_entry - "pair_client_port": connects printing callbacks, then
//                       streams 1, 2, 3.0 and "test".
//   dish_entry        - "dish_port": connects callbacks that print the value, its
//                       type name and the group it came from.
//   radio_entry       - "radio_port": streams 1, 2, 3.0 to "grp0" and "test" to
//                       "grp1".
//
// main() gives every port the same three adapters ("int-vector-int",
// "string-vector-string", "double-vector-double"), each built from the matching
// encoder/decoder in codecs. The SUB port additionally receives the list
// topic0..topic3 and the DISH port the list grp0/grp1 via withArgs (presumably
// the topics/groups to join - confirm against PortBuilder). Modules are then run
// in the order SUB, PUB, PUSH, PULL, REP, REQ, PAIR server, PAIR client, DISH,
// RADIO, with a 500 ms sleep after each pair, before printing "DONE!".
#include #include #include #include #include #include #include #include #include #define FMT_HEADER_ONLY #include #include #include "adapter_factory.hpp" #include "codecs.hpp" #include "module_factory.hpp" #include "port_factory.hpp" using env_data_type_t = std::vector; void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port // Fetch typed callbacks by adapter name and signature. auto &int_cbk = subscriber.callback("int-vector-int"); auto &string_cbk = subscriber.callback("string-vector-string"); auto &double_cbk = subscriber.callback("double-vector-double"); // Connect callbacks int_cbk.connect([](const int32_t &i, const std::string &addr) { fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", i, type_name>(), addr); }); string_cbk.connect([](const std::string &s, const std::string &addr) { fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", s, type_name>(), addr); }); double_cbk.connect([](const double &d, const std::string &addr) { fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", d, type_name>(), addr); }); } // Publisher module entrypoint void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. const auto &publisher = *ports.at("publisher_port"); // Get publisher port // Publish data publisher["topic0"] << 1 << 2 << double{3.f} << std::string("test"); publisher["topic1"] << 1 << 2 << double{3.f} << std::string("test"); publisher["topic2"] << 1 << 2 << double{3.f} << std::string("test"); publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test"); } // Pusher module entrypoint void pusher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. 
const auto &publisher = *ports.at("push_port"); // Get push port publisher << 1 << 2 << double{3.f} << std::string("test"); } void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. const auto &puller = *ports.at("pull_port"); // Get pull port // Fetch typed callbacks by adapter name and signature. auto &int_cbk = puller.callback("int-vector-int"); auto &string_cbk = puller.callback("string-vector-string"); auto &double_cbk = puller.callback("double-vector-double"); // Connect callbacks int_cbk.connect([](const int32_t &i) { fmt::print("PULL socket: got data: {} of {} type\r\n", i, type_name>()); }); string_cbk.connect([](const std::string &s) { fmt::print("PULL socket: got data: {} of {} type\r\n", s, type_name>()); }); double_cbk.connect([](const double &d) { fmt::print("PULL socket: got data: {} of {} type\r\n", d, type_name>()); }); } void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. const auto &req = *ports.at("req_port"); // Get request port // static_assert(std::is_same_v, void>, ""); // Fetch typed callbacks by adapter name and signature. auto &int_cbk = req.callback("int-vector-int"); auto &string_cbk = req.callback("string-vector-string"); auto &double_cbk = req.callback("double-vector-double"); // Connect callbacks int_cbk.connect([](const int32_t &i) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", i, type_name>()); }); string_cbk.connect([](const std::string &s) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", s, type_name>()); }); double_cbk.connect([](const double &d) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", d, type_name>()); }); req << 1 << 2 << double{3.f} << std::string("test"); } void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. 
const auto &rep = *ports.at("rep_port"); // Get reply port // Fetch typed callbacks by adapter name and signature. auto &int_cbk = rep.callback("int-vector-int"); auto &string_cbk = rep.callback("string-vector-string"); auto &double_cbk = rep.callback("double-vector-double"); // Connect callbacks int_cbk.connect([](const int32_t &i) -> std::string { fmt::print("REPLY socket: got data: {} of {} type\r\n", i, type_name>()); // Handle data ... return fmt::format("'Handle request: {} of type: {}'", i, type_name>()); }); string_cbk.connect([](const std::string &s) -> std::string { fmt::print("REPLY socket: got data: {} of {} type (FIRST callback)\r\n", s, type_name>()); // Handle data ... return fmt::format("'Handle request: {} of type: {}' (FIRST callback)", s, type_name>()); }); string_cbk.connect([](const std::string &s) -> std::string { fmt::print("REPLY socket: got data: {} of {} type (SECOND callback)\r\n", s, type_name>()); // Handle data ... return fmt::format("'Handle request: {} of type: {}' (SECOND callback)", s, type_name>()); }); double_cbk.connect([](const double &d) -> std::string { fmt::print("REPLY socket: got data: {} of {} type\r\n", d, type_name>()); // Handle data ... return fmt::format("'Handle request: {} of type: {}'", d, type_name>()); }); } void pair_server_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. 
const auto &pair = *ports.at("pair_server_port"); auto &int_cbk = pair.callback("int-vector-int"); auto &string_cbk = pair.callback("string-vector-string"); auto &double_cbk = pair.callback("double-vector-double"); int_cbk.connect([](const int32_t &i) { fmt::print("PAIR server: got data: {} of {} type\r\n", i, type_name>()); }); string_cbk.connect([](const std::string &s) { fmt::print("PAIR server: got data: {} of {} type\r\n", s, type_name>()); }); double_cbk.connect([](const double &d) { fmt::print("PAIR server: got data: {} of {} type\r\n", d, type_name>()); }); std::this_thread::sleep_for(std::chrono::milliseconds(50)); pair << 4 << 5 << double{6.f} << std::string("server->client"); } void pair_client_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. const auto &pair = *ports.at("pair_client_port"); auto &int_cbk = pair.callback("int-vector-int"); auto &string_cbk = pair.callback("string-vector-string"); auto &double_cbk = pair.callback("double-vector-double"); int_cbk.connect([](const int32_t &i) { fmt::print("PAIR client: got data: {} of {} type\r\n", i, type_name>()); }); string_cbk.connect([](const std::string &s) { fmt::print("PAIR client: got data: {} of {} type\r\n", s, type_name>()); }); double_cbk.connect([](const double &d) { fmt::print("PAIR client: got data: {} of {} type\r\n", d, type_name>()); }); pair << 1 << 2 << double{3.f} << std::string("test"); } void dish_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. 
const auto &dish = *ports.at("dish_port"); auto &int_cbk = dish.callback("int-vector-int"); auto &string_cbk = dish.callback("string-vector-string"); auto &double_cbk = dish.callback("double-vector-double"); int_cbk.connect([](const int32_t &i, const std::string &group) { fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", i, type_name>(), group); }); string_cbk.connect([](const std::string &s, const std::string &group) { fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", s, type_name>(), group); }); double_cbk.connect([](const double &d, const std::string &group) { fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", d, type_name>(), group); }); } void radio_entry(int32_t argc, char **argv, char **envp, const std::unordered_map *> &ports) { // Resolve port by name. const auto &radio = *ports.at("radio_port"); radio["grp0"] << 1 << 2 << double{3.f}; radio["grp1"] << std::string("test"); } int main(int argc, char *argv[], char *envp[]) { using enum port_types_e; codecs_s codecs; // Shared ZMQ context for in-process transports. 
zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads // Make module that contains only 1 port working on PUBLISHER pattern auto publisher_module = ModuleBuilder() .withName("publisher_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(PUB) .withName("publisher_port") .withEndpoints({ {"test", "inproc://PUB-SUB"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); // Make module that contains only 1 port working on SUBSCRIBER pattern auto subscriber_module = ModuleBuilder() .withName("subscriber_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(SUB) .withName("subscriber_port") .withEndpoints({ {"test", "inproc://PUB-SUB"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .withArgs(std::tuple{ std::list{"topic0", "topic1", "topic2", "topic3"}, }) .finalize(), }) .finalize(argc, argv, 
envp); auto pusher_module = ModuleBuilder() .withName("pusher_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(PUSH) .withName("push_port") .withEndpoints({ {"test", "inproc://PUSH-PULL"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); auto puller_module = ModuleBuilder() .withName("puller_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(PULL) .withName("pull_port") .withEndpoints({ {"test", "inproc://PUSH-PULL"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); auto req_module = ModuleBuilder() .withName("req_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(REQ) .withName("req_port") .withEndpoints({ {"test", "inproc://REQ-REP"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) 
.decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); auto rep_module = ModuleBuilder() .withName("rep_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(REP) .withName("rep_port") .withEndpoints({ {"test", "inproc://REQ-REP"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); auto pair_server_module = ModuleBuilder() .withName("pair_server_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(PAIR_SERVER) .withName("pair_server_port") .withEndpoints({ {"test", "inproc://PAIR"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) 
.decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); auto pair_client_module = ModuleBuilder() .withName("pair_client_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(PAIR_CLIENT) .withName("pair_client_port") .withEndpoints({ {"test", "inproc://PAIR"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); auto dish_module = ModuleBuilder() .withName("dish_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() .withType(DISH) .withName("dish_port") .withEndpoints({ {"test", "inproc://RADIO-DISH"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .withArgs(std::tuple{ std::list{"grp0", "grp1"}, }) .finalize(), }) .finalize(argc, argv, envp); auto radio_module = ModuleBuilder() .withName("radio_module") .withContext(zmq_ctx) .withPorts(std::tuple{ PortBuilder() 
.withType(RADIO) .withName("radio_port") .withEndpoints({ {"test", "inproc://RADIO-DISH"}, }) .withContext(zmq_ctx) .withAdapters(std::tuple{ AdapterBuilder() .encodeDataBy(&codecs.encoders.from_int) .decodeDataBy(&codecs.decoders.to_int) .withCallbackSignature() .withName("int-vector-int") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_string) .decodeDataBy(&codecs.decoders.to_string) .withCallbackSignature() .withName("string-vector-string") .finalize(), AdapterBuilder() .encodeDataBy(&codecs.encoders.from_double) .decodeDataBy(&codecs.decoders.to_double) .withCallbackSignature() .withName("double-vector-double") .finalize(), }) .finalize(), }) .finalize(argc, argv, envp); fmt::print("\r\nPUB-SUB test:\r\n"); subscriber_module->run(subscriber_entry); // Subscribe and get data publisher_module->run(publisher_entry); // Publish data std::this_thread::sleep_for(std::chrono::milliseconds(500)); fmt::print("\r\nPUSH-PULL test:\r\n"); pusher_module->run(pusher_entry); puller_module->run(puller_entry); std::this_thread::sleep_for(std::chrono::milliseconds(500)); fmt::print("\r\nREQ-REP test:\r\n"); rep_module->run(rep_entry); req_module->run(req_entry); std::this_thread::sleep_for(std::chrono::milliseconds(500)); fmt::print("\r\nPAIR test:\r\n"); pair_server_module->run(pair_server_entry); pair_client_module->run(pair_client_entry); std::this_thread::sleep_for(std::chrono::milliseconds(500)); fmt::print("\r\nRADIO-DISH test:\r\n"); dish_module->run(dish_entry); radio_module->run(radio_entry); std::this_thread::sleep_for(std::chrono::milliseconds(500)); fmt::print("DONE!\r\n"); return 0; }