dev #2

Open
OlegShishlyannikov wants to merge 6 commits from dev into main
29 changed files with 1876 additions and 255 deletions

View File

@ -1,2 +1,2 @@
CompileFlags:
Add: ['-std=gnu++23']
Add: ['-std=gnu++23', -I/opt/cling/include]

View File

@ -19,6 +19,14 @@ FetchContent_Declare(
)
FetchContent_MakeAvailable(inja)
# FetchContent_Declare(
# json
# GIT_REPOSITORY https://github.com/nlohmann/json
# GIT_PROGRESS TRUE
# GIT_TAG main
# )
# FetchContent_MakeAvailable(json)
FetchContent_Declare(
libzmq

View File

@ -1,5 +1,6 @@
#pragma once
#include "src/tuple.hpp"
#include <boost/callable_traits.hpp>
#include <boost/callable_traits/args.hpp>
#include <boost/callable_traits/return_type.hpp>
@ -16,23 +17,26 @@
using namespace boost::callable_traits;
template <typename Encoder, typename Decoder>
requires(
// Encoder return type is same as Decoder argument type, encoder and decoder have only 1 arg
std::tuple_size_v<args_t<Decoder>> == 1 && std::is_same_v<return_type_t<Encoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Decoder>>>> &&
std::tuple_size_v<args_t<Encoder>> == 1 && std::is_same_v<return_type_t<Decoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Encoder>>>>)
// Adapter that pairs an encoder/decoder with a callback signal.
template<typename ...> class Adapter;
template <typename Encoder, typename Decoder, typename CallbackRetTypeTag, typename... CbkAargs>
requires(std::tuple_size_v<args_t<Decoder>> == 1 && // Decoder has only 1 argument
std::is_same_v<return_type_t<Encoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Decoder>>>> && // Encoder return type is same with decoder argument
std::tuple_size_v<args_t<Encoder>> == 1 && // Encoder accepts only one argument
std::is_same_v<return_type_t<Decoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Encoder>>>> // Decoder return type is same with encoder argument
)
class Adapter : public AdapterBase<std::tuple_element_t<0, boost::callable_traits::args_t<Encoder>>> {
class Adapter<Encoder, Decoder, CallbackRetTypeTag, std::tuple<CbkAargs...>> : public AdapterBase<std::tuple_element_t<0, args_t<Encoder>>, CallbackRetTypeTag, CbkAargs...> {
public:
using encoder_type_t = Encoder;
using decoder_type_t = Decoder;
using adapter_type_t = Adapter;
using callback_arg_type_t = std::tuple_element_t<0, boost::callable_traits::args_t<Encoder>>;
using base_t = AdapterBase<callback_arg_type_t>;
using callback_type_t = base_t::callback_type_t;
Adapter(const std::string &name, std::pair<Encoder, Decoder> &&fns)
: AdapterBase<callback_arg_type_t>(name), mc_enc_(std::forward<Encoder>(fns.first)), mc_dec_(std::forward<Decoder>(fns.second)) {}
using callback_arg_type_t = std::tuple_element_t<0, args_t<Encoder>>;
using base_t = AdapterBase<callback_arg_type_t, CallbackRetTypeTag, CbkAargs...>;
Adapter(const std::string &name, std::tuple<Encoder, Decoder, CallbackRetTypeTag> &&fns)
: AdapterBase<callback_arg_type_t, CallbackRetTypeTag, CbkAargs...>(name), mc_enc_(std::forward<Encoder>(std::get<0u>(fns))),
mc_dec_(std::forward<Decoder>(std::get<1u>(fns))) {}
inline const auto &encoder() const { return mc_enc_; }
inline const auto &decoder() const { return mc_dec_; }
@ -42,5 +46,5 @@ public:
private:
const Encoder mc_enc_;
const Decoder mc_dec_;
mutable callback_type_t m_callback_;
mutable typename base_t::callback_type_t m_callback_;
};

View File

@ -1,18 +1,40 @@
#pragma once
#include <boost/signals2.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <type_traits>
// Base adapter interface exposing a typed callback signal and name.
template <typename InType, typename CallbackRetTypeTag, typename... CbkAargs> class AdapterBase {
using cbk_ret_type_t_ = typename CallbackRetTypeTag::type;
// If return type is not void we combine all callback invocation results.
class CollectAllCombiner_ {
public:
using result_type = std::conditional_t<!std::is_void_v<cbk_ret_type_t_>, std::vector<cbk_ret_type_t_>, std::false_type>;
template <typename InputIterator> result_type operator()(InputIterator first, InputIterator last) const {
result_type results;
if constexpr (!std::is_same_v<result_type, std::false_type>) {
for (; first != last; ++first) {
results.push_back(*first);
}
}
return results;
}
};
template <typename InType> class AdapterBase {
public:
using callback_type_t = boost::signals2::signal<void(const InType &)>;
// Use combiner if return type is not void
using signature_t = std::conditional_t<std::is_void_v<cbk_ret_type_t_>, void(const InType &, CbkAargs &&...), cbk_ret_type_t_(const InType &, CbkAargs &&...)>;
using callback_type_t = std::conditional_t<!std::is_void_v<cbk_ret_type_t_>, boost::signals2::signal<signature_t, CollectAllCombiner_>, boost::signals2::signal<signature_t>>;
AdapterBase(const std::string &name) : mc_name_(name) {}
virtual callback_type_t &callback() const = 0;
inline const auto &name() const { return mc_name_; }
protected:
private:
const std::string mc_name_;
};

View File

@ -1,7 +1,74 @@
#pragma once
#include "adapter.hpp"
#include "utils.hpp"
#include <boost/callable_traits/args.hpp>
#include <boost/callable_traits/return_type.hpp>
#include <tuple>
template <typename Encoder, typename Decoder> auto makeAdapter(const std::string &name, std::pair<Encoder, Decoder> &&fns) {
return std::make_unique<Adapter<Encoder, Decoder>>(name, std::forward<std::pair<Encoder, Decoder>>(fns));
}
#include "adapter.hpp"
#include "src/tuple.hpp"
using namespace boost::callable_traits;
template <typename...> class AdapterBuilder;
template <typename...> class AdapterBuilderNamed;
// Fluent builder to assemble an Adapter from encoder/decoder/signature/name.
template <> class AdapterBuilder<> {
public:
template <typename Encoder> AdapterBuilder<Encoder> encodeDataBy(Encoder &&encoder) { return AdapterBuilder<Encoder>(std::forward<Encoder>(encoder)); }
};
template <typename Encoder> class AdapterBuilder<Encoder> {
public:
AdapterBuilder(Encoder &&encoder) : m_encoder_(std::forward<Encoder>(encoder)) {}
template <typename Decoder> AdapterBuilder<Encoder, Decoder> decodeDataBy(Decoder &&decoder) {
// Advance builder with decoder selected.
return AdapterBuilder<Encoder, Decoder>(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(decoder));
}
private:
std::decay_t<Encoder> m_encoder_;
};
template <typename Encoder, typename Decoder> class AdapterBuilder<Encoder, Decoder> {
public:
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward<Encoder>(encoder)), m_decoder_(std::forward<Decoder>(decoder)) {}
template <typename CallbackSignature> AdapterBuilder<Encoder, Decoder, CallbackSignature> withCallbackSignature() {
// Advance builder with callback signature selected.
return AdapterBuilder<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(m_decoder_));
}
private:
std::decay_t<Encoder> m_encoder_;
std::decay_t<Decoder> m_decoder_;
};
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilder<Encoder, Decoder, CallbackSignature> {
public:
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward<Encoder>(encoder)), m_decoder_(std::forward<Decoder>(decoder)) {}
AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> withName(const std::string &name) {
// Advance builder with adapter name selected.
return AdapterBuilderNamed<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(m_decoder_), name);
}
private:
std::decay_t<Encoder> m_encoder_;
std::decay_t<Decoder> m_decoder_;
};
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> {
public:
AdapterBuilderNamed(Encoder &&encoder, Decoder &&decoder, const std::string &name)
: m_encoder_(std::forward<Encoder>(encoder)), m_decoder_(std::forward<Decoder>(decoder)), mc_name_(name) {}
auto finalize() {
// Produce a concrete Adapter instance.
return std::make_unique<Adapter<Encoder, Decoder, tag_s<return_type_t<CallbackSignature>>, typename tp::tuple_tail<args_t<CallbackSignature>>::type>>(
mc_name_, std::make_tuple(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(m_decoder_), tag_s<return_type_t<CallbackSignature>>{}));
}
private:
const std::string mc_name_;
std::decay_t<Encoder> m_encoder_;
std::decay_t<Decoder> m_decoder_;
};

34
src/codecs.cpp Normal file
View File

@ -0,0 +1,34 @@
#include "codecs.hpp"
template <> std::vector<uint8_t> codecs_s<std::vector<uint8_t>>::encoders_s::from_int(const int32_t &i) {
auto str = std::to_string(i); // Convert to string first
return {str.begin(), str.end()}; // String to byte array
}
template <> auto codecs_s<std::vector<uint8_t>>::encoders_s::from_string(const std::string &s) -> std::vector<uint8_t> { return {s.begin(), s.end()}; };
template <> auto codecs_s<std::vector<uint8_t>>::encoders_s::from_double(const double &d) -> std::vector<uint8_t> {
auto str = std::to_string(d);
return {str.begin(), str.end()};
};
template <> auto codecs_s<std::vector<uint8_t>>::decoders_s::to_int(const std::vector<uint8_t> &s) -> int32_t {
int32_t ret;
auto str = std::string(s.begin(), s.end());
if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) {
return ret;
}
throw std::runtime_error(fmt::format("Invalid convert from {} to integer type", str));
};
template <> auto codecs_s<std::vector<uint8_t>>::decoders_s::to_string(const std::vector<uint8_t> &i) -> std::string { return std::string(i.begin(), i.end()); };
template <> auto codecs_s<std::vector<uint8_t>>::decoders_s::to_double(const std::vector<uint8_t> &s) -> double {
double ret;
auto str = std::string(s.begin(), s.end());
if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) {
return ret;
}
throw std::runtime_error(fmt::format("Invalid convert from {} to double type", str));
};

26
src/codecs.hpp Normal file
View File

@ -0,0 +1,26 @@
#pragma once
#include <cstdint>
#include <charconv>
#include <stdexcept>
#include <string>
#include <functional>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
// Simple encoder/decoder set for an environment payload type.
template<typename EnvDataType> struct codecs_s {
struct encoders_s {
static auto from_int(const int32_t &) -> EnvDataType;
static auto from_string(const std::string &s) -> EnvDataType;
static auto from_double(const double &d) -> EnvDataType;
} encoders;
struct decoders_s {
static auto to_int(const EnvDataType &s) -> int32_t;
static auto to_string(const EnvDataType &i) -> std::string;
static auto to_double(const EnvDataType &s) -> double;
} decoders;
};

View File

@ -4,16 +4,17 @@
#include <tuple>
#include <unordered_map>
#include <utility>
#include <zmq.hpp>
#include "port_base.hpp"
#include "tuple.hpp"
// Module type hint (currently unused).
enum class module_type_e : uint32_t {
STANDALONE,
INCOMPOSITION,
};
// Base interface for a named module; exposes ports and run entrypoint.
template <typename EnvDataType> class ModuleBase {
public:
ModuleBase(int32_t argc, char **argv, char **envp, const std::string &name)
@ -40,6 +41,7 @@ private:
} mc_cli_args_;
};
// Concrete module that owns typed ports and dispatches entry with a port map.
template <typename... Ports> class Module : public ModuleBase<std::tuple_element_t<0, std::tuple<typename Ports::port_data_type_t...>>> {
public:
using port_data_type_t = std::tuple_element_t<0, std::tuple<typename Ports::port_data_type_t...>>;

View File

@ -2,8 +2,59 @@
#include "module.hpp"
#include <memory>
#include <string>
#include <tuple>
#include <utility>
// Factory for building a Module from owned ports.
template <typename... Ports>
auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Ports>...> &&ports) {
return std::make_unique<Module<Ports...>>(argc, argv, envp, name, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Ports>...>>(ports));
}
class ModuleBuilderNamed;
class ModuleBuilderContext;
template <typename...> class ModuleBuilderPorts;
// Fluent builder to assemble a Module from name/context/ports.
class ModuleBuilder {
public:
ModuleBuilderNamed withName(std::string name);
};
class ModuleBuilderNamed {
public:
explicit ModuleBuilderNamed(std::string name) : name_(std::move(name)) {}
ModuleBuilderContext withContext(zmq::context_t &zmq_ctx);
private:
std::string name_;
};
class ModuleBuilderContext {
public:
ModuleBuilderContext(std::string name, zmq::context_t &zmq_ctx) : name_(std::move(name)), ctx_(&zmq_ctx) {}
template <typename... Ports> ModuleBuilderPorts<Ports...> withPorts(std::tuple<std::unique_ptr<Ports>...> &&ports) {
return ModuleBuilderPorts<Ports...>(name_, *ctx_, std::forward<std::tuple<std::unique_ptr<Ports>...>>(ports));
}
private:
std::string name_;
zmq::context_t *ctx_;
};
template <typename... Ports> class ModuleBuilderPorts {
public:
ModuleBuilderPorts(std::string name, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Ports>...> &&ports) : name_(std::move(name)), ctx_(&zmq_ctx), ports_(std::move(ports)) {}
auto finalize(int32_t argc, char **argv, char **envp) { return makeModule(argc, argv, envp, name_, *ctx_, std::move(ports_)); }
private:
std::string name_;
zmq::context_t *ctx_;
std::tuple<std::unique_ptr<Ports>...> ports_;
};
inline ModuleBuilderNamed ModuleBuilder::withName(std::string name) { return ModuleBuilderNamed(std::move(name)); }
inline ModuleBuilderContext ModuleBuilderNamed::withContext(zmq::context_t &zmq_ctx) { return ModuleBuilderContext(std::move(name_), zmq_ctx); }

View File

@ -5,60 +5,100 @@
#include <boost/callable_traits/return_type.hpp>
#include <cstddef>
#include <stdexcept>
#include <tuple>
#include <type_traits>
#include <utility>
#include "port_base.hpp"
#include "ports.hpp"
#include "src/port_types.hpp"
#include "tuple.hpp"
using namespace boost::callable_traits;
// Typed port implementation owning adapters and a ZMQ transport.
template <typename...> class Port;
template <typename... Adapters, typename... Args>
requires(
// Check return types
(std::is_same_v<typename Adapters::base_t::callback_type_t::result_type, typename std::tuple_element_t<0, std::tuple<Adapters...>>::base_t::callback_type_t::result_type> &&
...) &&
// Check number of args
((std::tuple_size_v<args_t<typename Adapters::base_t::callback_type_t::signature_type>> ==
std::tuple_size_v<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>) &&
...) &&
// All adapters are unique
([]<typename... Ts>() consteval {
return []<size_t... Is>(std::index_sequence<Is...>) consteval {
using tuple_t = std::tuple<Ts...>;
return ((tp::tuple_index<std::tuple_element_t<Is, tuple_t>, tuple_t>::value == Is) && ...);
}(std::index_sequence_for<Ts...>{});
}.template operator()<std::remove_cvref_t<return_type_t<typename Adapters::decoder_type_t>>...>()))
class Port<std::tuple<Adapters...>, std::tuple<Args...>> : public PortBase<std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>> {
public:
static constexpr auto callback_args_num = std::tuple_size_v<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>;
using this_t = Port<std::tuple<Adapters...>, std::tuple<Args...>>;
using port_data_type_t = std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>;
using callback_aargs_t = tp::tuple_tail<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>::type;
Port(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Adapters>...> &&adapters,
std::tuple<Args...> &&args)
: PortBase<port_data_type_t>(pt, name, endpoint, zmq_ctx),
Port(enum port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx,
std::tuple<std::unique_ptr<Adapters>...> &&adapters, std::tuple<Args...> &&args)
: PortBase<port_data_type_t>(pt, name, endpoints, zmq_ctx),
// Init adapters
mc_adapters_([&]<size_t... Ids>(std::index_sequence<Ids...>) {
return std::make_tuple([&]<size_t Idx>() {
using adapter_type_t = std::remove_cvref_t<decltype(*std::get<Idx>(adapters))>;
using adapter_input_type_t = return_type_t<typename adapter_type_t::decoder_type_t>;
using adapter_input_type_t = std::remove_cvref_t<return_type_t<typename adapter_type_t::decoder_type_t>>;
using adapter_callback_type_t = std::remove_cvref_t<typename adapter_type_t::base_t::callback_type_t::signature_type>;
return std::make_tuple(std::get<Idx>(adapters)->name(), std::hash<std::string>()(std::get<Idx>(adapters)->name()),
typeid(std::remove_cvref_t<adapter_input_type_t>).hash_code(), std::forward<std::unique_ptr<adapter_type_t>>(std::get<Idx>(adapters)));
// Cache name, name hash, input type hash, callback type hash, and adapter pointer.
return std::make_tuple(std::get<Idx>(adapters)->name(), std::hash<std::string>()(std::get<Idx>(adapters)->name()), type_hash<adapter_input_type_t>(),
type_hash<adapter_callback_type_t>(), std::forward<std::unique_ptr<adapter_type_t>>(std::get<Idx>(adapters)));
}.template operator()<Ids>()...);
}(std::make_index_sequence<sizeof...(Adapters)>{})) {
std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoint), std::forward<Args>(args)...); }, std::forward<decltype(args)>(args));
// Instantiate the port implementation with any extra args (e.g., SUB topics).
std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoints), std::forward<Args>(args)...); }, std::forward<decltype(args)>(args));
}
~Port() override { stop(); }
void stop() const override {
this->stop__();
mc_impl_->close();
stop__();
m_impl__->close();
}
inline const auto &adapters() const { return mc_adapters_; }
protected:
void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override {
void stop__() const override { m_impl__->stop_source().request_stop(); }
void send__(const void *data, size_t size, uint64_t hash, const std::string &addr = "") const override {
if (!addr.empty() && this->type() != port_types_e::PUB && this->type() != port_types_e::RADIO) {
throw std::runtime_error("Addressed send is only supported for PUB/RADIO ports");
}
const void *adapter_ptr = nullptr;
tp::for_each(mc_adapters_, [&](const auto &e) {
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e;
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
if (adapter_typehash == hash) {
using adapter_in_type_t = std::remove_cvref_t<return_type_t<decltype(adapter->decoder())>>;
adapter_ptr = &adapter;
typename PortImplBase<cbk_type_t_>::port_payload_s payload = {
// Encode to the environment type and pack payload.
typename PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>::port_payload_s payload = {
.typehash = hash,
.data = adapter->encoder()(*reinterpret_cast<const adapter_in_type_t *>(data)),
.data = {adapter->encoder()(*reinterpret_cast<const adapter_in_type_t *>(data))},
};
msgpack::sbuffer buf;
msgpack::pack(buf, payload);
mc_impl_->send(addr, buf);
// Send encoded payload via the transport.
m_impl__->send(buf, addr);
}
});
@ -67,20 +107,22 @@ protected:
}
}
void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const override final {
void *get_adapter__(const std::string &name, uint64_t namehash, uint64_t typehash, uint64_t cbk_typehash) const override final {
void *ret = nullptr;
tp::for_each(mc_adapters_, [&](auto &a) {
if (!ret) {
auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = a;
if (adapter_typehash == typehash && adapter_namehash == namehash) {
auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = a;
// Match by adapter name/type and callback signature.
if (adapter_typehash == typehash && adapter_namehash == namehash && adapter_cbk_typehash == cbk_typehash) {
ret = reinterpret_cast<void *>(adapter.get());
}
}
});
if (!ret) {
throw std::runtime_error(fmt::format("No adapter '{}' in port '{}'\r\n", name, this->name()));
throw std::runtime_error(
fmt::format("No such callback in adapter '{}' in port '{}' (namehash: #{}, typehash: #{}, cbk_typehash: #{})\r\n", name, this->name(), namehash, typehash, cbk_typehash));
}
return ret;
@ -88,20 +130,57 @@ protected:
private:
using base_t_ = PortBase<port_data_type_t>;
using cbk_type_t_ = std::function<void(const port_data_type_t &, size_t)>;
using cbk_return_type_t_ = typename std::tuple_element_t<0, std::tuple<Adapters...>>::base_t::callback_type_t::result_type;
mutable std::tuple<std::tuple<std::string, size_t, size_t, std::unique_ptr<Adapters>>...> mc_adapters_;
mutable std::unique_ptr<PortImplBase<cbk_type_t_>> mc_impl_;
template <typename...> class PortImplCallback_;
template <typename... Aargs> class PortImplCallback_<std::tuple<Aargs...>> {
public:
PortImplCallback_(const Port *port) : mc_port_(port) {}
using type_t = std::function<cbk_return_type_t_(const port_data_type_t &, uint64_t, Aargs &&...)>;
void listen__(std::stop_token st) const override { mc_impl_->listen(st); }
cbk_return_type_t_ operator()(const port_data_type_t &data, uint64_t hash, Aargs &&...callback_args) const {
std::conditional_t<!std::is_void_v<cbk_return_type_t_>, cbk_return_type_t_, std::false_type> ret{};
bool found = false;
tp::for_each(mc_port_->mc_adapters_, [&](const auto &e) {
if (!found) {
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
if (adapter_typehash == hash) {
// Decode payload and dispatch to the adapter's signal.
if constexpr (std::is_void_v<std::invoke_result_t<std::remove_cvref_t<decltype(adapter->callback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) {
adapter->callback()(adapter->decoder()(data), std::forward<Aargs>(callback_args)...);
} else {
ret = adapter->callback()(adapter->decoder()(data), std::forward<Aargs>(callback_args)...);
}
found = true;
}
}
});
if (!found) {
throw std::runtime_error(fmt::format("Adapter with type hash: {} not found", hash));
}
if constexpr (!std::is_void_v<cbk_return_type_t_>) {
return ret;
}
}
private:
const Port *mc_port_;
};
mutable std::unique_ptr<PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>> m_impl__{nullptr};
mutable std::tuple<std::tuple<std::string, uint64_t, uint64_t, uint64_t, std::unique_ptr<Adapters>>...> mc_adapters_;
template <typename... ImplArgs> void init_impl_(enum port_types_e pt, ImplArgs &&...args) const {
using enum port_types_e;
static constexpr auto make_null_impl_pair = []<enum port_types_e port_type>() consteval {
if constexpr (port_type == UNKNOWN) {
return std::make_pair(port_type, static_cast<PortImplBase<cbk_type_t_> *>(nullptr));
return std::make_pair(port_type, static_cast<PortImplBase<this_t, PortImplCallback_<callback_aargs_t>> *>(nullptr));
} else {
return std::make_pair(port_type, static_cast<PortImpl<port_type, cbk_type_t_> *>(nullptr));
return std::make_pair(port_type, static_cast<PortImpl<port_type, this_t, PortImplCallback_<callback_aargs_t>> *>(nullptr));
}
};
@ -109,30 +188,24 @@ private:
std::make_tuple(make_null_impl_pair.template operator()<UNKNOWN>(), make_null_impl_pair.template operator()<PUB>(), make_null_impl_pair.template operator()<SUB>(),
make_null_impl_pair.template operator()<REQ>(), make_null_impl_pair.template operator()<REP>(), make_null_impl_pair.template operator()<ROUTER>(),
make_null_impl_pair.template operator()<DEALER>(), make_null_impl_pair.template operator()<PUSH>(), make_null_impl_pair.template operator()<PULL>(),
make_null_impl_pair.template operator()<PAIR>());
make_null_impl_pair.template operator()<PAIR_CLIENT>(), make_null_impl_pair.template operator()<PAIR_SERVER>(),
make_null_impl_pair.template operator()<RADIO>(), make_null_impl_pair.template operator()<DISH>());
tp::for_each(impl_map, [&](const auto &p) {
const auto &[type, null_pimpl] = p;
if (type == pt) {
using impl_type_t = std::remove_pointer_t<decltype(null_pimpl)>;
if constexpr (std::is_constructible_v<impl_type_t, const ImplArgs &..., cbk_type_t_ &&>) {
mc_impl_ = std::make_unique<impl_type_t>(
// Args
std::forward<ImplArgs>(args)...,
// Construct the selected PortImpl if its constructor matches the args.
if constexpr (std::is_constructible_v<impl_type_t, const this_t *, const ImplArgs &..., PortImplCallback_<callback_aargs_t> &&>) {
m_impl__ = std::make_unique<impl_type_t>(this, std::forward<ImplArgs>(args)..., PortImplCallback_<callback_aargs_t>(this));
}
}
});
// Callback
[this](const port_data_type_t &data, size_t hash) {
tp::for_each(mc_adapters_, [&](const auto &e) {
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e;
if (adapter_typehash == hash) {
adapter->callback()(adapter->decoder()(data));
if (!m_impl__) {
throw std::runtime_error("No PortImpl for port type ...");
}
});
});
}
}
});
}
};

View File

@ -1,8 +1,14 @@
#pragma once
#include <cstdint>
#include <future>
#include "src/tuple.hpp"
#include <boost/callable_traits/args.hpp>
#include <boost/callable_traits/return_type.hpp>
#include <boost/signals2/signal.hpp>
#include <msgpack.hpp>
#include <tuple>
#include <type_traits>
#define ZMQ_BUILD_DRAFT_API
#include <zmq.hpp>
#define FMT_HEADER_ONLY
@ -13,12 +19,16 @@
#include "port_types.hpp"
#include "utils.hpp"
using namespace boost::callable_traits;
// Abstract port API for sending encoded messages and accessing callbacks.
template <typename EncodedType> class PortBase {
// Helper binding an address/topic to a port for chained sends.
class AddressedPort_ {
public:
AddressedPort_(const PortBase<EncodedType> *port, const std::string &address) : mc_addr_(address), mc_port_(port) {}
template <typename InType> const auto &operator<<(const InType &in) const {
mc_port_->send__(mc_addr_, &in, sizeof(InType), typeid(InType).hash_code());
mc_port_->send__(&in, sizeof(InType), type_hash<std::remove_cvref_t<InType>>(), mc_addr_);
return *this;
}
@ -28,39 +38,59 @@ template <typename EncodedType> class PortBase {
};
public:
PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx)
: mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash<std::string>()(name)), mc_endpoint_hash_(std::hash<std::string>()(endpoint)) {}
PortBase(enum port_types_e port_type, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx)
: mc_type_(port_type), mc_name_(name), mc_endpoints_(endpoints), mc_name_hash_(std::hash<std::string>()(name)) {}
virtual ~PortBase() = default;
inline const auto &type() const { return mc_type_; }
inline const auto &name() const { return mc_name_; }
inline const auto &endpoint() const { return mc_endpoint_; }
inline const auto &endpoints() const { return mc_endpoints_; }
inline const auto &name_hash() const { return mc_name_hash_; }
inline const auto &endpoint_hash() const { return mc_endpoint_hash_; }
const auto operator[](const std::string &address) const { return AddressedPort_(this, address); }
template <typename InType> const PortBase<EncodedType> &operator<<(const InType &in) const {
send__("", &in, sizeof(InType), typeid(InType).hash_code());
// Use empty address for non-addressed sends.
send__(&in, sizeof(InType), type_hash<std::remove_cvref_t<InType>>());
return *this;
}
void listen() const { listen__(m_ss_.get_token()); };
virtual void stop() const = 0;
template <typename AdapterInType> auto &callback(const std::string &name) const {
return (*static_cast<AdapterBase<AdapterInType> *>(get_adapter__(name, std::hash<std::string>()(name), typeid(std::remove_cvref_t<AdapterInType>).hash_code()))).callback();
template <typename Signature> auto &callback(const std::string &name) const {
// Lookup adapter by callback signature and name.
return GetCallbackHelper_<typename tp::tuple_tail<args_t<Signature>>::type>(this).template operator()<Signature>(name);
}
protected:
void stop__() const { m_ss_.request_stop(); }
virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0;
virtual void listen__(std::stop_token) const = 0;
virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const = 0;
virtual void stop__() const = 0;
virtual void send__(const void *data, size_t size, uint64_t type_hash, const std::string &addr = "") const = 0;
virtual void *get_adapter__(const std::string &name, uint64_t namehash, uint64_t typehash, uint64_t cbk_typehash) const = 0;
private:
const enum port_types_e mc_type_;
const std::string mc_name_, mc_endpoint_;
const size_t mc_name_hash_, mc_endpoint_hash_;
mutable std::stop_source m_ss_;
const std::string mc_name_;
const std::map<std::string, std::string> mc_endpoints_;
const uint64_t mc_name_hash_;
// Type-safe callback lookup helper.
template <typename...> class GetCallbackHelper_;
template <typename... Aargs> class GetCallbackHelper_<std::tuple<Aargs...>> {
public:
GetCallbackHelper_(const PortBase *port) : mc_port_(port) {}
template <typename Signature> auto &operator()(const std::string &name) const {
using ret_type_t = std::remove_cvref_t<return_type_t<std::function<Signature>>>;
using arg_type_t = std::remove_cvref_t<std::tuple_element_t<0, args_t<std::function<Signature>>>>;
using cbk_type_t = typename AdapterBase<arg_type_t, tag_s<ret_type_t>, Aargs...>::callback_type_t::signature_type;
return (*static_cast<AdapterBase<arg_type_t, tag_s<ret_type_t>, Aargs...> *>(
mc_port_->get_adapter__(name, std::hash<std::string>()(name), type_hash<arg_type_t>(), type_hash<std::remove_cvref_t<cbk_type_t>>())))
.callback();
// typeid(typename AdapterBase<arg_type_t, tag_s<ret_type_t>, Aargs...>::callback_type_t).hash_code()
}
private:
const PortBase *mc_port_;
};
};

View File

@ -2,21 +2,26 @@
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <chrono>
#include <thread>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::DEALER, Callback> : public PortImplBase<Callback> {
// Dealer transport placeholder (no send/recv logic yet).
template <typename Port, typename Callback> class PortImpl<port_types_e::DEALER, Port, Callback> : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
void listen(std::stop_token st) const override {
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {}
// Send to socket depending on implementation
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {};
private:
void listen__(std::stop_token st) const override {
while (!st.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
}
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
};

84
src/port_dish_impl.hpp Normal file
View File

@ -0,0 +1,84 @@
#pragma once
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <chrono>
#include <list>
#include <optional>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
// Dish transport: connects, joins groups, and dispatches payloads with group name.
template <typename Port, typename Callback> class PortImpl<port_types_e::DISH, Port, Callback> : public PortImplBase<Port, Callback> {
public:
using base_t = PortImplBase<Port, Callback>;
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, const std::list<std::string> &groups, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)), mc_groups_(groups) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::dish);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.connect(ep);
}
// Join each group.
for (const auto &group : mc_groups_) {
this->m_sock__.join(group.c_str());
}
// Start async listener loop.
listen__(this->stop_source().get_token());
}
// Send to socket depending on implementation
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on DISH pattern socket"); };
private:
const std::list<std::string> mc_groups_;
void listen__(std::stop_token st) const override {
this->m_listener_thread__ = std::async(
std::launch::async,
[this](std::stop_token st) {
try {
zmq::poller_t poller;
poller.add(this->m_sock__, zmq::event_flags::pollin);
while (!st.stop_requested()) {
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
for (int32_t i = 0; i < num_events; ++i) {
zmq::message_t msg;
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
std::string group = std::string(msg.group());
typename base_t::port_payload_s payload;
msgpack::sbuffer buf;
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
// Dispatch each decoded item.
for (const auto &data : batch) {
this->mc_cbk__(data, typehash, group);
}
return std::optional(res);
});
}
}
} catch (const zmq::error_t &) {
if (!st.stop_requested()) {
throw;
}
} catch (...) {
if (!st.stop_requested()) {
throw;
}
}
},
st);
}
};

View File

@ -1,10 +1,119 @@
#pragma once
#include "port.hpp"
#include <map>
#include <string>
#include <tuple>
#include <utility>
// Factory for building a Port from adapters and extra args.
template <typename... Adapters, typename... Args>
auto makePort(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Adapters>...> &&adapters,
std::tuple<Args...> &&args = {}) {
return std::make_unique<Port<std::tuple<Adapters...>, std::tuple<Args...>>>(pt, name, endpoint, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(adapters),
auto makePort(enum port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx,
std::tuple<std::unique_ptr<Adapters>...> &&adapters, std::tuple<Args...> &&args = {}) {
return std::make_unique<Port<std::tuple<Adapters...>, std::tuple<Args...>>>(pt, name, endpoints, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(adapters),
std::forward<std::tuple<Args...>>(args));
}
class PortBuilderNamed;
class PortBuilderEndpoints;
class PortBuilderContext;
class PortBuilderWithContext;
template <typename...> class PortBuilderAdapters;
template <typename...> class PortBuilderArgs;
class PortBuilderNamed {
public:
explicit PortBuilderNamed(port_types_e pt) : m_pt_(pt) {}
PortBuilderEndpoints withName(const std::string &name);
private:
port_types_e m_pt_;
};
// Fluent builder to assemble a Port from type/name/endpoints/context/adapters/args.
class PortBuilder {
public:
inline PortBuilderNamed withType(port_types_e pt) { return PortBuilderNamed(pt); }
};
class PortBuilderWithContext {
public:
PortBuilderWithContext(port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx)
: m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx) {}
template <typename... Adapters> inline PortBuilderAdapters<std::tuple<Adapters...>> withAdapters(std::tuple<std::unique_ptr<Adapters>...> &&adapters) {
return PortBuilderAdapters<std::tuple<Adapters...>>(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(adapters));
}
private:
port_types_e m_pt_;
std::string m_name_;
std::map<std::string, std::string> m_endpoints_;
zmq::context_t *m_ctx_;
};
class PortBuilderContext {
public:
PortBuilderContext(port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints) : m_pt_(pt), m_name_(name), m_endpoints_(endpoints) {}
PortBuilderWithContext withContext(zmq::context_t &zmq_ctx);
private:
port_types_e m_pt_;
std::string m_name_;
std::map<std::string, std::string> m_endpoints_;
};
class PortBuilderEndpoints {
public:
PortBuilderEndpoints(port_types_e pt, const std::string &name) : m_pt_(pt), m_name_(name) {}
PortBuilderContext withEndpoints(const std::map<std::string, std::string> &endpoints);
private:
port_types_e m_pt_;
std::string m_name_;
};
template <typename... Adapters> class PortBuilderAdapters<std::tuple<Adapters...>> {
public:
PortBuilderAdapters(port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx,
std::tuple<std::unique_ptr<Adapters>...> &&adapters)
: m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx), m_adapters_(&adapters) {}
template <typename... Args> PortBuilderArgs<std::tuple<Adapters...>, std::tuple<Args...>> withArgs(std::tuple<Args...> &&args) {
return PortBuilderArgs<std::tuple<Adapters...>, std::tuple<Args...>>(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(*m_adapters_),
std::forward<std::tuple<Args...>>(args));
}
auto finalize() { return makePort(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(*m_adapters_)); }
private:
port_types_e m_pt_;
std::string m_name_;
std::map<std::string, std::string> m_endpoints_;
zmq::context_t *m_ctx_;
std::tuple<std::unique_ptr<Adapters>...> *m_adapters_;
};
template <typename... Adapters, typename... Args> class PortBuilderArgs<std::tuple<Adapters...>, std::tuple<Args...>> {
public:
PortBuilderArgs(port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx,
std::tuple<std::unique_ptr<Adapters>...> &&adapters, std::tuple<Args...> &&args)
: m_pt_(pt), m_name_(name), m_endpoints_(endpoints), m_ctx_(&zmq_ctx), m_adapters_(&adapters), m_args_(std::forward<std::tuple<Args...>>(args)) {}
inline auto finalize() {
return makePort(m_pt_, m_name_, m_endpoints_, *m_ctx_, std::forward<std::tuple<std::unique_ptr<Adapters>...>>(*m_adapters_), std::forward<std::tuple<Args...>>(m_args_));
}
private:
port_types_e m_pt_;
std::string m_name_;
std::map<std::string, std::string> m_endpoints_;
zmq::context_t *m_ctx_;
std::tuple<std::unique_ptr<Adapters>...> *m_adapters_;
std::tuple<Args...> m_args_;
};
inline PortBuilderEndpoints PortBuilderNamed::withName(const std::string &name) { return PortBuilderEndpoints(m_pt_, name); }
inline PortBuilderWithContext PortBuilderContext::withContext(zmq::context_t &zmq_ctx) { return PortBuilderWithContext(m_pt_, m_name_, m_endpoints_, zmq_ctx); }
inline PortBuilderContext PortBuilderEndpoints::withEndpoints(const std::map<std::string, std::string> &endpoints) { return PortBuilderContext(m_pt_, m_name_, endpoints); }

View File

@ -4,37 +4,57 @@
#include <future>
#include <msgpack.hpp>
#include <tuple>
#include <stop_token>
#define ZMQ_BUILD_DRAFT_API
#include <zmq.hpp>
template <typename Callback> class PortImplBase {
// Base ZMQ-backed transport for a port; manages socket and listener thread.
template <typename Port, typename Callback> class PortImplBase {
using port_data_type_t_ = std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<Callback>>>;
public:
// Wire payload: type hash + batch of encoded values.
struct port_payload_s {
size_t typehash;
port_data_type_t_ data;
uint64_t typehash;
std::vector<port_data_type_t_> data;
MSGPACK_DEFINE(typehash, data);
};
PortImplBase(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : mc_endpoint__(endpoint), m_ctx__(zmq_ctx), mc_cbk__(callback) {}
PortImplBase(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: mc_endpoints__(endpoints), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {}
virtual ~PortImplBase() = default;
virtual void listen(std::stop_token st) const = 0;
virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0;
virtual void send(const msgpack::sbuffer &data, const std::string &addr = "") const = 0;
void close() {
m_sock__.close();
// Join listener thread before closing the socket to avoid polling on a closed fd.
if (m_listener_thread__.valid()) {
try {
m_listener_thread__.get();
} catch (...) {
// Ignore listener exceptions during shutdown.
}
}
try {
m_sock__.close();
} catch (...) {
// Ignore close errors during shutdown.
}
};
inline auto &stop_source() { return m_ss_; }
protected:
static constexpr auto sc_recv_timeout_ms__ = 1'000u;
mutable std::future<void> m_listener_thread__;
mutable zmq::socket_t m_sock__;
zmq::context_t &m_ctx__;
const std::string mc_endpoint__;
const std::map<std::string, std::string> mc_endpoints__;
const Callback mc_cbk__;
const Port *mc_port__;
virtual void listen__(std::stop_token st) const = 0;
private:
mutable std::stop_source m_ss_;
};

View File

@ -2,20 +2,148 @@
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <chrono>
#include <optional>
#include <vector>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::PAIR, Callback> : public PortImplBase<Callback> {
// Pair transport (client): connects and listens for incoming payloads.
template <typename Port, typename Callback> class PortImpl<port_types_e::PAIR_CLIENT, Port, Callback> : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
void listen(std::stop_token st) const override {
while (!st.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
using base_t = PortImplBase<Port, Callback>;
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.connect(ep);
}
listen__(this->stop_source().get_token());
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
try {
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
} catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
}
};
private:
void listen__(std::stop_token st) const override {
this->m_listener_thread__ = std::async(
std::launch::async,
[this](std::stop_token st) {
try {
zmq::poller_t poller;
poller.add(this->m_sock__, zmq::event_flags::pollin);
while (!st.stop_requested()) {
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
for (int32_t i = 0; i < num_events; ++i) {
zmq::message_t msg;
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
typename base_t::port_payload_s payload;
msgpack::sbuffer buf;
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
for (const auto &data : batch) {
this->mc_cbk__(data, typehash);
}
return std::optional(res);
});
}
}
} catch (const zmq::error_t &) {
if (!st.stop_requested()) {
throw;
}
} catch (...) {
if (!st.stop_requested()) {
throw;
}
}
},
st);
}
};
// Pair transport (server): binds and listens for incoming payloads.
template <typename Port, typename Callback> class PortImpl<port_types_e::PAIR_SERVER, Port, Callback> : public PortImplBase<Port, Callback> {
public:
using base_t = PortImplBase<Port, Callback>;
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.bind(ep);
}
listen__(this->stop_source().get_token());
}
// Send to socket depending on implementation
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
try {
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
} catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
}
};
private:
void listen__(std::stop_token st) const override {
this->m_listener_thread__ = std::async(
std::launch::async,
[this](std::stop_token st) {
try {
zmq::poller_t poller;
poller.add(this->m_sock__, zmq::event_flags::pollin);
while (!st.stop_requested()) {
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
for (int32_t i = 0; i < num_events; ++i) {
zmq::message_t msg;
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
typename base_t::port_payload_s payload;
msgpack::sbuffer buf;
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
for (const auto &data : batch) {
this->mc_cbk__(data, typehash);
}
return std::optional(res);
});
}
}
} catch (const zmq::error_t &) {
if (!st.stop_requested()) {
throw;
}
} catch (...) {
if (!st.stop_requested()) {
throw;
}
}
},
st);
}
};

View File

@ -7,24 +7,30 @@
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::PUB, Callback> final : public PortImplBase<Callback> {
// Publisher transport: binds endpoints and sends topic + payload frames.
template <typename Port, typename Callback> class PortImpl<port_types_e::PUB, Port, Callback> final : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub);
this->m_sock__.bind(this->mc_endpoint__);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.bind(ep);
}
}
~PortImpl() override {}
void listen(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); }
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
try {
this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore);
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait);
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
} catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
}
};
private:
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); }
};

View File

@ -2,20 +2,73 @@
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <chrono>
#include <optional>
#include <zmq.hpp>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::PULL, Callback> : public PortImplBase<Callback> {
// Pull transport: connects and listens for incoming payloads.
template <typename Port, typename Callback> class PortImpl<port_types_e::PULL, Port, Callback> : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
void listen(std::stop_token st) const override {
while (!st.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
using base_t = PortImplBase<Port, Callback>;
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pull);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.connect(ep);
}
listen__(this->stop_source().get_token());
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on PULL pattern socket"); };
private:
void listen__(std::stop_token st) const override {
this->m_listener_thread__ = std::async(
std::launch::async,
[this](std::stop_token st) {
try {
zmq::poller_t poller;
poller.add(this->m_sock__, zmq::event_flags::pollin);
while (!st.stop_requested()) {
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
for (int32_t i = 0; i < num_events; ++i) {
zmq::message_t msg;
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
typename base_t::port_payload_s payload;
msgpack::sbuffer buf;
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
for (const auto &data : batch) {
this->mc_cbk__(data, typehash);
}
return std::optional(res);
});
}
}
} catch (const zmq::error_t &) {
if (!st.stop_requested()) {
throw;
}
} catch (...) {
if (!st.stop_requested()) {
throw;
}
}
},
st);
}
};

View File

@ -7,15 +7,27 @@
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::PUSH, Callback> : public PortImplBase<Callback> {
// Push transport: binds endpoints and sends fire-and-forget payloads.
template <typename Port, typename Callback> class PortImpl<port_types_e::PUSH, Port, Callback> : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
void listen(std::stop_token st) const override {
while (!st.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::push);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.bind(ep);
}
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
try {
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait);
} catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
}
};
private:
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUSH pattern socket"); }
};

39
src/port_radio_impl.hpp Normal file
View File

@ -0,0 +1,39 @@
#pragma once
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <string>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
// Radio transport: binds endpoints and sends group-tagged payloads.
template <typename Port, typename Callback> class PortImpl<port_types_e::RADIO, Port, Callback> final : public PortImplBase<Port, Callback> {
public:
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::radio);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.bind(ep);
}
}
// Send to socket depending on implementation
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
try {
zmq::message_t msg(data.data(), data.size());
if (!addr.empty()) {
msg.set_group(addr.c_str());
}
this->m_sock__.send(msg, zmq::send_flags::none);
} catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
}
};
private:
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on RADIO pattern socket"); }
};

View File

@ -1,22 +1,112 @@
#pragma once
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <boost/callable_traits/return_type.hpp>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <tuple>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::REP, Callback> : public PortImplBase<Callback> {
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include "tuple.hpp"
#include "utils.hpp"
// Reply transport: listens for requests and sends reply payloads.
template <typename Port, typename Callback> class PortImpl<port_types_e::REP, Port, Callback> : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
void listen(std::stop_token st) const override {
while (!st.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
using base_t = PortImplBase<Port, Callback>;
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::rep);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.bind(ep);
}
this->m_sock__.set(zmq::sockopt::sndtimeo, static_cast<int32_t>(base_t::sc_recv_timeout_ms__));
// Start async listener loop.
listen__(this->stop_source().get_token());
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
};
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on REPLY pattern socket"); };
private:
void listen__(std::stop_token st) const override {
this->m_listener_thread__ = std::async(
std::launch::async,
[this](std::stop_token st) {
try {
zmq::poller_t poller;
poller.add(this->m_sock__, zmq::event_flags::pollin);
while (!st.stop_requested()) {
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
uint64_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
for (int32_t i = 0; i < num_events; ++i) {
zmq::message_t msg;
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
typename base_t::port_payload_s payload;
msgpack::sbuffer buf;
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
// Callback should return reply data for REP sockets.
if constexpr (!std::is_void_v<boost::callable_traits::return_type_t<std::remove_cvref_t<decltype(this->mc_cbk__)>>>) {
if (batch.size()) {
auto reply_data = this->mc_cbk__(batch.front(), typehash);
typename base_t::port_payload_s reply_payload = {
.typehash = type_hash<std::remove_cvref_t<typename decltype(reply_data)::value_type>>(),
};
for (const auto &d : reply_data) {
using adapter_in_type_t = std::remove_cvref_t<decltype(d)>;
uint64_t typehash = type_hash<adapter_in_type_t>();
// Find matching encoder by type to build reply payload.
tp::for_each(this->mc_port__->adapters(), [&](const auto &e) {
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
if constexpr (std::is_same_v<std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<decltype(adapter->encoder())>>>,
adapter_in_type_t>) {
if (adapter_typehash == typehash) {
reply_payload.data.push_back(adapter->encoder()(d));
}
}
});
}
msgpack::sbuffer buf;
msgpack::pack(buf, reply_payload);
// Send reply for this item.
this->m_sock__.send(zmq::message_t(buf.data(), buf.size()), zmq::send_flags::none);
}
} else {
throw std::runtime_error("Callback of REPLY pattern socket should return non-void value");
}
return std::optional(res);
});
}
}
} catch (const zmq::error_t &) {
if (!st.stop_requested()) {
throw;
}
} catch (...) {
if (!st.stop_requested()) {
throw;
}
}
},
st);
}
};

View File

@ -2,21 +2,54 @@
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <cstdint>
#include <optional>
#include <stdexcept>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::REQ, Callback> : public PortImplBase<Callback> {
// Request transport: sends a payload and waits for a reply.
template <typename Port, typename Callback> class PortImpl<port_types_e::REQ, Port, Callback> : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
void listen(std::stop_token st) const override {
while (!st.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
}
using base_t = PortImplBase<Port, Callback>;
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::req);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.connect(ep);
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
// Avoid blocking forever if no REP is available.
this->m_sock__.set(zmq::sockopt::rcvtimeo, static_cast<int32_t>(base_t::sc_recv_timeout_ms__));
}
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
try {
zmq::message_t reply;
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
this->m_sock__.recv(reply, zmq::recv_flags::none).and_then([&](const auto &res) {
typename base_t::port_payload_s payload;
msgpack::sbuffer buf;
buf.write(reinterpret_cast<const char *>(reply.data()), reply.size());
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
for (const auto &data : batch) {
this->mc_cbk__(data, typehash);
}
return std::optional(res);
});
} catch (const zmq::error_t &err) {
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
throw;
}
};
private:
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on REQUEST pattern socket"); }
};

View File

@ -2,21 +2,26 @@
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <chrono>
#include <thread>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::ROUTER, Callback> : public PortImplBase<Callback> {
// Router transport placeholder (no send/recv logic yet).
template <typename Port, typename Callback> class PortImpl<port_types_e::ROUTER, Port, Callback> : public PortImplBase<Port, Callback> {
public:
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
void listen(std::stop_token st) const override {
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {}
// Send to socket depending on implementation
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {};
private:
void listen__(std::stop_token st) const override {
while (!st.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
}
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
};

View File

@ -2,54 +2,93 @@
#include "port_impl_base.hpp"
#include "port_types.hpp"
#include <chrono>
#include <optional>
#include <ranges>
#include <stop_token>
#include <sys/socket.h>
#include <utility>
#include <zmq.hpp>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
template <typename Callback> class PortImpl<port_types_e::SUB, Callback> : public PortImplBase<Callback> {
// Subscriber transport: connects, subscribes to topics, and dispatches payloads.
template <typename Port, typename Callback> class PortImpl<port_types_e::SUB, Port, Callback> : public PortImplBase<Port, Callback> {
public:
using base_t = PortImplBase<Callback>;
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list<std::string> &topics, Callback &&callback)
: PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)), mc_topics_(topics) {
using base_t = PortImplBase<Port, Callback>;
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, const std::list<std::string> &topics, Callback &&callback)
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)), mc_topics_(topics) {
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::sub);
this->m_sock__.connect(this->mc_endpoint__);
for (const auto &[_, ep] : this->mc_endpoints__) {
this->m_sock__.connect(ep);
}
// Subscribe to each topic prefix.
for (const auto &topic : mc_topics_) {
this->m_sock__.set(zmq::sockopt::subscribe, topic);
}
// Start async listener loop.
listen__(this->stop_source().get_token());
}
void listen(std::stop_token st) const override {
// Send to socket depending on implementation
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); };
private:
const std::list<std::string> mc_topics_;
void listen__(std::stop_token st) const override {
this->m_listener_thread__ = std::async(
std::launch::async,
[this](std::stop_token st) {
try {
zmq::poller_t poller;
poller.add(this->m_sock__, zmq::event_flags::pollin);
while (!st.stop_requested()) {
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
for (int32_t i = 0; i < num_events; ++i) {
zmq::message_t msg;
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
fmt::print("Received envelope: {}\r\n", std::string(static_cast<const char *>(msg.data()), msg.size()));
// First frame is the topic envelope.
std::string envelope = std::string(static_cast<const char *>(msg.data()), msg.size());
this->m_sock__.recv(msg).and_then([&](const auto &res) {
typename base_t::port_payload_s payload;
msgpack::sbuffer buf;
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
const auto [typehash, data] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
// Dispatch each decoded item.
for (const auto &data : batch) {
this->mc_cbk__(data, typehash, envelope);
}
this->mc_cbk__(data, typehash);
return std::optional(res);
});
return std::optional(res);
});
}
}
} catch (const zmq::error_t &) {
if (!st.stop_requested()) {
throw;
}
} catch (...) {
if (!st.stop_requested()) {
throw;
}
}
},
st);
}
// Send to socket depending on implementation
void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); };
private:
const std::list<std::string> mc_topics_;
};

View File

@ -2,17 +2,39 @@
#include <cstdint>
// Supported ZMQ socket patterns for ports.
enum class port_types_e : uint32_t {
UNKNOWN = 0,
PUB,
SUB,
REQ,
REP,
ROUTER,
DEALER,
PUSH,
PULL,
PAIR,
PAIR_CLIENT,
PAIR_SERVER,
RADIO,
DISH,
ROUTER,
DEALER,
};
template <enum port_types_e, typename...> class PortImpl;
// Endpoint tag base type.
struct endpoints_base_s {};
template <auto> struct endpoints_s;
template<> struct endpoints_s<port_types_e::PUB> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::SUB> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::REQ> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::REP> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::PUSH> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::PULL> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::PAIR_CLIENT> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::PAIR_SERVER> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::RADIO> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::DISH> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::ROUTER> : endpoints_base_s {};
template<> struct endpoints_s<port_types_e::DEALER> : endpoints_base_s {};

View File

@ -10,4 +10,5 @@
#include "port_router_impl.hpp"
#include "port_dealer_impl.hpp"
#include "port_pair_impl.hpp"
#include "port_radio_impl.hpp"
#include "port_dish_impl.hpp"

View File

@ -1,116 +1,602 @@
#include <boost/signals2/signal_type.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <chrono>
#include <cstdint>
#include <list>
#include <string>
#include <thread>
#include <tuple>
#include <zmq.hpp>
#include <unordered_map>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <fmt/ranges.h>
#include "adapter_factory.hpp"
#include "codecs.hpp"
#include "module_factory.hpp"
#include "port_factory.hpp"
void entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<std::vector<uint8_t>> *> &ports) {
const auto &subscriber = *ports.at("subscriber");
const auto &publisher = *ports.at("publisher");
using env_data_type_t = std::vector<uint8_t>;
auto &a = subscriber.callback<int32_t>("int-string-int");
auto &b = subscriber.callback<std::string>("string-string-string");
void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port
a.connect([](const int32_t &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name<decltype(i)>()); });
b.connect([](const std::string &i) -> void { fmt::print("Echo: {}, typename: {}\r\n", i, type_name<decltype(i)>()); });
// Fetch typed callbacks by adapter name and signature.
auto &int_cbk = subscriber.callback<void(const int32_t &, const std::string &)>("int-vector<uint8_t>-int");
auto &string_cbk = subscriber.callback<void(const std::string &, const std::string &)>("string-vector<uint8_t>-string");
auto &double_cbk = subscriber.callback<void(const double &, const std::string &)>("double-vector<uint8_t>-double");
subscriber.listen();
// Connect callbacks
int_cbk.connect([](const int32_t &i, const std::string &addr) {
fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>(), addr);
});
string_cbk.connect([](const std::string &s, const std::string &addr) {
fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>(), addr);
});
double_cbk.connect([](const double &d, const std::string &addr) {
fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>(), addr);
});
}
// Publisher module entrypoint
void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &publisher = *ports.at("publisher_port"); // Get publisher port
// Publish data
publisher["topic0"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic1"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic2"] << 1 << 2 << double{3.f} << std::string("test");
publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test");
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000u));
subscriber.stop();
// Publisher module entrypoint
void pusher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &publisher = *ports.at("push_port"); // Get publisher port
publisher << 1 << 2 << double{3.f} << std::string("test");
}
void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &puller = *ports.at("pull_port"); // Get subscriber port
// Fetch typed callbacks by adapter name and signature.
auto &int_cbk = puller.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
auto &string_cbk = puller.callback<void(const std::string &)>("string-vector<uint8_t>-string");
auto &double_cbk = puller.callback<void(const double &)>("double-vector<uint8_t>-double");
// Connect callbacks
int_cbk.connect([](const int32_t &i) { fmt::print("PULL socket: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>()); });
string_cbk.connect([](const std::string &s) { fmt::print("PULL socket: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>()); });
double_cbk.connect([](const double &d) { fmt::print("PULL socket: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>()); });
}
void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &req = *ports.at("req_port"); // Get publisher port
// static_assert(std::is_same_v<typename std::remove_cvref_t<decltype(req)>, void>, "");
// Fetch typed callbacks by adapter name and signature.
auto &int_cbk = req.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
auto &string_cbk = req.callback<void(const std::string &)>("string-vector<uint8_t>-string");
auto &double_cbk = req.callback<void(const double &)>("double-vector<uint8_t>-double");
// Connect callbacks
int_cbk.connect([](const int32_t &i) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>()); });
string_cbk.connect([](const std::string &s) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>()); });
double_cbk.connect([](const double &d) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>()); });
req << 1 << 2 << double{3.f} << std::string("test");
}
void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &rep = *ports.at("rep_port"); // Get subscriber port
// Fetch typed callbacks by adapter name and signature.
auto &int_cbk = rep.callback<std::string(const int32_t &)>("int-vector<uint8_t>-int");
auto &string_cbk = rep.callback<std::string(const std::string &)>("string-vector<uint8_t>-string");
auto &double_cbk = rep.callback<std::string(const double &)>("double-vector<uint8_t>-double");
// Connect callbacks
int_cbk.connect([](const int32_t &i) -> std::string {
fmt::print("REPLY socket: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>());
// Handle data ...
return fmt::format("'Handle request: {} of type: {}'", i, type_name<std::remove_cvref_t<decltype(i)>>());
});
string_cbk.connect([](const std::string &s) -> std::string {
fmt::print("REPLY socket: got data: {} of {} type (FIRST callback)\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>());
// Handle data ...
return fmt::format("'Handle request: {} of type: {}' (FIRST callback)", s, type_name<std::remove_cvref_t<decltype(s)>>());
});
string_cbk.connect([](const std::string &s) -> std::string {
fmt::print("REPLY socket: got data: {} of {} type (SECOND callback)\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>());
// Handle data ...
return fmt::format("'Handle request: {} of type: {}' (SECOND callback)", s, type_name<std::remove_cvref_t<decltype(s)>>());
});
double_cbk.connect([](const double &d) -> std::string {
fmt::print("REPLY socket: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>());
// Handle data ...
return fmt::format("'Handle request: {} of type: {}'", d, type_name<std::remove_cvref_t<decltype(d)>>());
});
}
void pair_server_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &pair = *ports.at("pair_server_port");
auto &int_cbk = pair.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
auto &string_cbk = pair.callback<void(const std::string &)>("string-vector<uint8_t>-string");
auto &double_cbk = pair.callback<void(const double &)>("double-vector<uint8_t>-double");
int_cbk.connect([](const int32_t &i) { fmt::print("PAIR server: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>()); });
string_cbk.connect([](const std::string &s) { fmt::print("PAIR server: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>()); });
double_cbk.connect([](const double &d) { fmt::print("PAIR server: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>()); });
std::this_thread::sleep_for(std::chrono::milliseconds(50));
pair << 4 << 5 << double{6.f} << std::string("server->client");
}
void pair_client_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &pair = *ports.at("pair_client_port");
auto &int_cbk = pair.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
auto &string_cbk = pair.callback<void(const std::string &)>("string-vector<uint8_t>-string");
auto &double_cbk = pair.callback<void(const double &)>("double-vector<uint8_t>-double");
int_cbk.connect([](const int32_t &i) { fmt::print("PAIR client: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>()); });
string_cbk.connect([](const std::string &s) { fmt::print("PAIR client: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>()); });
double_cbk.connect([](const double &d) { fmt::print("PAIR client: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>()); });
pair << 1 << 2 << double{3.f} << std::string("test");
}
void dish_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &dish = *ports.at("dish_port");
auto &int_cbk = dish.callback<void(const int32_t &, const std::string &)>("int-vector<uint8_t>-int");
auto &string_cbk = dish.callback<void(const std::string &, const std::string &)>("string-vector<uint8_t>-string");
auto &double_cbk = dish.callback<void(const double &, const std::string &)>("double-vector<uint8_t>-double");
int_cbk.connect([](const int32_t &i, const std::string &group) {
fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>(), group);
});
string_cbk.connect([](const std::string &s, const std::string &group) {
fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>(), group);
});
double_cbk.connect([](const double &d, const std::string &group) {
fmt::print("DISH socket: got data: {} of {} type from group: {}\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>(), group);
});
}
void radio_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
// Resolve port by name.
const auto &radio = *ports.at("radio_port");
radio["grp0"] << 1 << 2 << double{3.f};
radio["grp1"] << std::string("test");
}
int main(int argc, char *argv[], char *envp[]) {
using enum port_types_e;
zmq::context_t zmq_ctx;
codecs_s<env_data_type_t> codecs;
// Shared ZMQ context for in-process transports.
zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads
auto mod = makeModule(argc, argv, envp, "test_mod", zmq_ctx,
std::tuple{
makePort(PUB, "publisher", "inproc://publisher_port", zmq_ctx,
std::tuple{
makeAdapter("int-string-int", std::pair{
[](const int32_t &i) -> std::vector<uint8_t> {
auto str = std::to_string(i + 5);
return {str.begin(), str.end()};
},
// Make module that contains only 1 port working on PUBLISHER pattern
auto publisher_module = ModuleBuilder()
.withName("publisher_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(PUB)
.withName("publisher_port")
.withEndpoints({
{"test", "inproc://PUB-SUB"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
[](const std::vector<uint8_t> &s) -> int32_t { return 5; },
}),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
makeAdapter("string-string-string", std::pair{
[](const std::string &i) -> std::vector<uint8_t> {
auto str = i + "_test";
return {str.begin(), str.end()};
},
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
[](const std::vector<uint8_t> &i) -> std::string { return "works!"; },
}),
// Make module that contains only 1 port working on SUBSCRIBER pattern
auto subscriber_module = ModuleBuilder()
.withName("subscriber_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(SUB)
.withName("subscriber_port")
.withEndpoints({
{"test", "inproc://PUB-SUB"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &, const std::string &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
makeAdapter("double-string-double", std::pair{
[](const double &i) -> std::vector<uint8_t> {
auto str = std::to_string(i / 2.f);
return {str.begin(), str.end()};
},
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &, const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
[](const std::vector<uint8_t> &s) -> double { return .1f; },
}),
}),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &, const std::string &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.withArgs(std::tuple{
std::list<std::string>{"topic0", "topic1", "topic2", "topic3"},
})
.finalize(),
})
.finalize(argc, argv, envp);
makePort(SUB, "subscriber", "inproc://publisher_port", zmq_ctx,
std::tuple{
makeAdapter("int-string-int", std::pair{
[](const int32_t &i) -> std::vector<uint8_t> {
auto str = std::to_string(i + 5);
return {str.begin(), str.end()};
},
auto pusher_module = ModuleBuilder()
.withName("pusher_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(PUSH)
.withName("push_port")
.withEndpoints({
{"test", "inproc://PUSH-PULL"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
[](const std::vector<uint8_t> &s) -> int32_t { return 5; },
}),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
makeAdapter("string-string-string", std::pair{
[](const std::string &i) -> std::vector<uint8_t> {
auto str = i + "_test";
return {str.begin(), str.end()};
},
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
[](const std::vector<uint8_t> &i) -> std::string { return "works!"; },
}),
auto puller_module = ModuleBuilder()
.withName("puller_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(PULL)
.withName("pull_port")
.withEndpoints({
{"test", "inproc://PUSH-PULL"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
makeAdapter("double-string-double", std::pair{
[](const double &i) -> std::vector<uint8_t> {
auto str = std::to_string(i / 2.f);
return {str.begin(), str.end()};
},
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
[](const std::vector<uint8_t> &s) -> double { return .1f; },
}),
},
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
std::tuple{
std::list<std::string>{
"topic0",
"topic1",
"topic2",
"topic3",
},
}),
});
auto req_module = ModuleBuilder()
.withName("req_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(REQ)
.withName("req_port")
.withEndpoints({
{"test", "inproc://REQ-REP"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
auto rep_module = ModuleBuilder()
.withName("rep_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(REP)
.withName("rep_port")
.withEndpoints({
{"test", "inproc://REQ-REP"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<std::string(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<std::string(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<std::string(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
auto pair_server_module = ModuleBuilder()
.withName("pair_server_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(PAIR_SERVER)
.withName("pair_server_port")
.withEndpoints({
{"test", "inproc://PAIR"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
auto pair_client_module = ModuleBuilder()
.withName("pair_client_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(PAIR_CLIENT)
.withName("pair_client_port")
.withEndpoints({
{"test", "inproc://PAIR"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
auto dish_module = ModuleBuilder()
.withName("dish_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(DISH)
.withName("dish_port")
.withEndpoints({
{"test", "inproc://RADIO-DISH"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &, const std::string &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &, const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &, const std::string &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.withArgs(std::tuple{
std::list<std::string>{"grp0", "grp1"},
})
.finalize(),
})
.finalize(argc, argv, envp);
auto radio_module = ModuleBuilder()
.withName("radio_module")
.withContext(zmq_ctx)
.withPorts(std::tuple{
PortBuilder()
.withType(RADIO)
.withName("radio_port")
.withEndpoints({
{"test", "inproc://RADIO-DISH"},
})
.withContext(zmq_ctx)
.withAdapters(std::tuple{
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_int)
.decodeDataBy(&codecs.decoders.to_int)
.withCallbackSignature<void(const int32_t &)>()
.withName("int-vector<uint8_t>-int")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_string)
.decodeDataBy(&codecs.decoders.to_string)
.withCallbackSignature<void(const std::string &)>()
.withName("string-vector<uint8_t>-string")
.finalize(),
AdapterBuilder()
.encodeDataBy(&codecs.encoders.from_double)
.decodeDataBy(&codecs.decoders.to_double)
.withCallbackSignature<void(const double &)>()
.withName("double-vector<uint8_t>-double")
.finalize(),
})
.finalize(),
})
.finalize(argc, argv, envp);
fmt::print("\r\nPUB-SUB test:\r\n");
subscriber_module->run(subscriber_entry); // Subscribe and get data
publisher_module->run(publisher_entry); // Publish data
std::this_thread::sleep_for(std::chrono::milliseconds(500));
fmt::print("\r\nPUSH-PULL test:\r\n");
pusher_module->run(pusher_entry);
puller_module->run(puller_entry);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
fmt::print("\r\nREQ-REP test:\r\n");
rep_module->run(rep_entry);
req_module->run(req_entry);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
fmt::print("\r\nPAIR test:\r\n");
pair_server_module->run(pair_server_entry);
pair_client_module->run(pair_client_entry);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
fmt::print("\r\nRADIO-DISH test:\r\n");
dish_module->run(dish_entry);
radio_module->run(radio_entry);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
fmt::print("DONE!\r\n");
mod->run(entry);
return 0;
}

127
src/tuple.hpp Normal file
View File

@ -0,0 +1,127 @@
#pragma once
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <functional>
#include <optional>
#include <stdexcept>
#include <tuple>
#include <utility>
// Tuple utilities for compile-time iteration and transformations.
namespace tp {
namespace detail {
template <typename Func, typename... Args> constexpr Func for_each_arg(Func f, Args &&...args) {
(f(std::forward<Args>(args)), ...);
return f;
}
template <typename Tuple, typename Func, std::size_t... I> constexpr Func for_each_impl(Tuple &&t, Func &&f, std::index_sequence<I...> is) {
(std::forward<Func>(f)(std::get<I>(std::forward<Tuple>(t))), ...);
return f;
}
template <typename... Ts, typename Function, size_t... Is> auto transform_impl(std::tuple<Ts...> const &inputs, Function function, std::index_sequence<Is...> is) {
return std::tuple<std::invoke_result_t<Function(Ts)>...>{function(std::get<Is>(inputs))...};
}
template <typename... T, std::size_t... i> auto subtuple(const std::tuple<T...> &t, std::index_sequence<i...>) { return std::make_tuple(std::get<i>(t)...); }
// ZIP utilities
template <std::size_t I, typename... Tuples> using zip_tuple_at_index_t = std::tuple<std::tuple_element_t<I, std::decay_t<Tuples>>...>;
template <std::size_t I, typename... Tuples> zip_tuple_at_index_t<I, Tuples...> zip_tuple_at_index(Tuples &&...tuples) { return {std::get<I>(std::forward<Tuples>(tuples))...}; }
template <typename... Tuples, std::size_t... I> std::tuple<zip_tuple_at_index_t<I, Tuples...>...> tuple_zip_impl(Tuples &&...tuples, std::index_sequence<I...>) {
return {zip_tuple_at_index<I>(std::forward<Tuples>(tuples)...)...};
}
}; // namespace detail
template <class Tuple, class F> constexpr decltype(auto) for_each(Tuple &&tuple, F &&f) {
return []<std::size_t... I>(Tuple &&tuple, F &&f, std::index_sequence<I...>) {
(f(std::get<I>(tuple)), ...);
return f;
}(std::forward<Tuple>(tuple), std::forward<F>(f), std::make_index_sequence<std::tuple_size<std::remove_reference_t<Tuple>>::value>{});
}
template <typename... Ts, typename Function> auto transform(std::tuple<Ts...> const &inputs, Function function) {
return detail::transform_impl(inputs, function, std::make_index_sequence<sizeof...(Ts)>{});
}
template <typename Tuple, typename Predicate> constexpr size_t find_if(Tuple &&tuple, Predicate pred) {
size_t index = std::tuple_size<std::remove_reference_t<Tuple>>::value;
size_t currentIndex = 0;
bool found = false;
for_each(tuple, [&](auto &&value) {
if (!found && pred(value)) {
index = currentIndex;
found = true;
}
++currentIndex;
});
return index;
}
template <typename Tuple, typename Action> void perform(Tuple &&tuple, size_t index, Action action) {
size_t currentIndex = 0;
for_each(tuple, [&action, index, &currentIndex](auto &&value) {
if (currentIndex == index) {
action(std::forward<decltype(value)>(value));
}
++currentIndex;
});
}
template <typename Tuple, typename Predicate> bool all_of(Tuple &&tuple, Predicate pred) {
return find_if(tuple, std::not_fn(pred)) == std::tuple_size<std::decay_t<Tuple>>::value;
}
template <typename Tuple, typename Predicate> bool none_of(Tuple &&tuple, Predicate pred) { return find_if(tuple, pred) == std::tuple_size<std::decay_t<Tuple>>::value; }
template <typename Tuple, typename Predicate> bool any_of(Tuple &&tuple, Predicate pred) { return !none_of(tuple, pred); }
template <typename Tuple, typename Function> Tuple &operator|(Tuple &&tuple, Function func) {
for_each(tuple, func);
return tuple;
}
template <int trim, typename... T> auto subtuple(const std::tuple<T...> &t) { return detail::subtuple(t, std::make_index_sequence<sizeof...(T) - trim>()); }
template <size_t starting, size_t elems, class Tuple, class Seq = decltype(std::make_index_sequence<elems>())> struct sub_range;
template <size_t starting, size_t elems, class... Args, size_t... indx> struct sub_range<starting, elems, std::tuple<Args...>, std::index_sequence<indx...>> {
static_assert(elems <= sizeof...(Args) - starting, "sub range is out of bounds!");
using tuple = std::tuple<std::tuple_element_t<indx + starting, std::tuple<Args...>>...>;
};
template <typename Tuple, std::size_t... Ints> auto select_tuple(Tuple &&tuple, std::index_sequence<Ints...>) {
return std::tuple<std::tuple_element_t<Ints, Tuple>...>(std::get<Ints>(std::forward<Tuple>(tuple))...);
}
template <class T, class Tuple> struct tuple_index;
template <class T, class... Types> struct tuple_index<T, std::tuple<T, Types...>> {
static const std::size_t value = 0;
};
template <class T, class U, class... Types> struct tuple_index<T, std::tuple<U, Types...>> {
static const std::size_t value = 1 + tuple_index<T, std::tuple<Types...>>::value;
};
// ZIP
template <typename Head, typename... Tail>
requires((std::tuple_size_v<std::decay_t<Tail>> == std::tuple_size_v<std::decay_t<Head>>) && ...)
auto tuple_zip(Head &&head, Tail &&...tail) {
return detail::tuple_zip_impl<Head, Tail...>(std::forward<Head>(head), std::forward<Tail>(tail)..., std::make_index_sequence<std::tuple_size_v<std::decay_t<Head>>>());
}
// TAIL
template <typename T> struct tuple_tail;
template <typename Head, typename... Tail> struct tuple_tail<std::tuple<Head, Tail...>> {
using type = std::tuple<Tail...>;
}; //
}; // namespace tp

45
src/utils.hpp Normal file
View File

@ -0,0 +1,45 @@
#pragma once
#include <cstdint>
#include <string_view>
// Trait to detect template specializations.
template <class T, template <class...> class Template> struct is_specialization : std::false_type {};
template <template <class...> class Template, class... Args> struct is_specialization<Template<Args...>, Template> : std::true_type {};
// Best-effort type name string for debugging.
template <class T> constexpr std::string_view type_name() {
using namespace std;
#ifdef __clang__
string_view p = __PRETTY_FUNCTION__;
return string_view(p.data() + 34, p.size() - 34 - 1);
#elif defined(__GNUC__)
string_view p = __PRETTY_FUNCTION__;
# if __cplusplus < 201402
return string_view(p.data() + 36, p.size() - 36 - 1);
# else
return string_view(p.data() + 49, p.find(';', 49) - 49);
# endif
#elif defined(_MSC_VER)
string_view p = __FUNCSIG__;
return string_view(p.data() + 84, p.size() - 84 - 7);
#endif
}
// Compile-time FNV-1a hash for stable type IDs across builds.
constexpr uint64_t fnv1a_64(std::string_view sv) {
uint64_t hash = 14695981039346656037ull;
for (const char &c : sv) {
hash ^= static_cast<uint8_t>(c);
hash *= 1099511628211ull;
}
return hash;
}
template <typename T> constexpr uint64_t type_hash() { return fnv1a_64(type_name<T>()); }
// Type tag wrapper used to pass types through templates.
template <typename T> struct tag_s {
using type = T;
};