fixes
This commit is contained in:
parent
2af4150ce2
commit
3dd9584b2a
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
using namespace boost::callable_traits;
|
using namespace boost::callable_traits;
|
||||||
|
|
||||||
|
// Adapter that pairs an encoder/decoder with a callback signal.
|
||||||
template<typename ...> class Adapter;
|
template<typename ...> class Adapter;
|
||||||
template <typename Encoder, typename Decoder, typename CallbackRetTypeTag, typename... CbkAargs>
|
template <typename Encoder, typename Decoder, typename CallbackRetTypeTag, typename... CbkAargs>
|
||||||
requires(std::tuple_size_v<args_t<Decoder>> == 1 && // Decoder has only 1 argument
|
requires(std::tuple_size_v<args_t<Decoder>> == 1 && // Decoder has only 1 argument
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,16 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "utils.hpp"
|
|
||||||
#include <boost/signals2.hpp>
|
#include <boost/signals2.hpp>
|
||||||
#include <boost/signals2/variadic_signal.hpp>
|
#include <boost/signals2/variadic_signal.hpp>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
|
|
||||||
using namespace boost::signals2;
|
using namespace boost::signals2;
|
||||||
|
|
||||||
|
// Base adapter interface exposing a typed callback signal and name.
|
||||||
template <typename InType, typename CallbackRetTypeTag, typename... CbkAargs> class AdapterBase {
|
template <typename InType, typename CallbackRetTypeTag, typename... CbkAargs> class AdapterBase {
|
||||||
using cbk_ret_type_t_ = typename CallbackRetTypeTag::type;
|
using cbk_ret_type_t_ = typename CallbackRetTypeTag::type;
|
||||||
|
|
||||||
// If return type is not void we combine all callback ivocation results to std::vector
|
// If return type is not void we combine all callback invocation results.
|
||||||
class CollectAllCombiner_ {
|
class CollectAllCombiner_ {
|
||||||
public:
|
public:
|
||||||
using result_type = std::conditional_t<!std::is_void_v<cbk_ret_type_t_>, std::vector<cbk_ret_type_t_>, std::false_type>;
|
using result_type = std::conditional_t<!std::is_void_v<cbk_ret_type_t_>, std::vector<cbk_ret_type_t_>, std::false_type>;
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "utils.hpp"
|
||||||
#include <boost/callable_traits/args.hpp>
|
#include <boost/callable_traits/args.hpp>
|
||||||
#include <boost/callable_traits/return_type.hpp>
|
#include <boost/callable_traits/return_type.hpp>
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
|
|
@ -11,6 +12,7 @@ using namespace boost::callable_traits;
|
||||||
template <typename...> class AdapterBuilder;
|
template <typename...> class AdapterBuilder;
|
||||||
template <typename...> class AdapterBuilderNamed;
|
template <typename...> class AdapterBuilderNamed;
|
||||||
|
|
||||||
|
// Fluent builder to assemble an Adapter from encoder/decoder/signature/name.
|
||||||
template <> class AdapterBuilder<> {
|
template <> class AdapterBuilder<> {
|
||||||
public:
|
public:
|
||||||
template <typename Encoder> AdapterBuilder<Encoder> encodeDataBy(Encoder &&encoder) { return AdapterBuilder<Encoder>(std::forward<Encoder>(encoder)); }
|
template <typename Encoder> AdapterBuilder<Encoder> encodeDataBy(Encoder &&encoder) { return AdapterBuilder<Encoder>(std::forward<Encoder>(encoder)); }
|
||||||
|
|
@ -18,51 +20,55 @@ public:
|
||||||
|
|
||||||
template <typename Encoder> class AdapterBuilder<Encoder> {
|
template <typename Encoder> class AdapterBuilder<Encoder> {
|
||||||
public:
|
public:
|
||||||
AdapterBuilder(Encoder &&encoder) : m_encoder_ref_(std::forward<Encoder>(encoder)) {}
|
AdapterBuilder(Encoder &&encoder) : m_encoder_(std::forward<Encoder>(encoder)) {}
|
||||||
template <typename Decoder> AdapterBuilder<Encoder, Decoder> decodeDataBy(Decoder &&decoder) {
|
template <typename Decoder> AdapterBuilder<Encoder, Decoder> decodeDataBy(Decoder &&decoder) {
|
||||||
return AdapterBuilder<Encoder, Decoder>(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(decoder));
|
// Advance builder with decoder selected.
|
||||||
|
return AdapterBuilder<Encoder, Decoder>(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(decoder));
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Encoder &&m_encoder_ref_;
|
std::decay_t<Encoder> m_encoder_;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename Encoder, typename Decoder> class AdapterBuilder<Encoder, Decoder> {
|
template <typename Encoder, typename Decoder> class AdapterBuilder<Encoder, Decoder> {
|
||||||
public:
|
public:
|
||||||
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_ref_(std::forward<Encoder>(encoder)), m_decoder_ref_(std::forward<Decoder>(decoder)) {}
|
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward<Encoder>(encoder)), m_decoder_(std::forward<Decoder>(decoder)) {}
|
||||||
template <typename CallbackSignature> AdapterBuilder<Encoder, Decoder, CallbackSignature> withCallbackSignature() {
|
template <typename CallbackSignature> AdapterBuilder<Encoder, Decoder, CallbackSignature> withCallbackSignature() {
|
||||||
return AdapterBuilder<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(m_decoder_ref_));
|
// Advance builder with callback signature selected.
|
||||||
|
return AdapterBuilder<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(m_decoder_));
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Encoder &&m_encoder_ref_;
|
std::decay_t<Encoder> m_encoder_;
|
||||||
Decoder &&m_decoder_ref_;
|
std::decay_t<Decoder> m_decoder_;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilder<Encoder, Decoder, CallbackSignature> {
|
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilder<Encoder, Decoder, CallbackSignature> {
|
||||||
public:
|
public:
|
||||||
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_ref_(std::forward<Encoder>(encoder)), m_decoder_ref_(std::forward<Decoder>(decoder)) {}
|
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_(std::forward<Encoder>(encoder)), m_decoder_(std::forward<Decoder>(decoder)) {}
|
||||||
AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> withName(const std::string &name) {
|
AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> withName(const std::string &name) {
|
||||||
return AdapterBuilderNamed<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(m_decoder_ref_), name);
|
// Advance builder with adapter name selected.
|
||||||
|
return AdapterBuilderNamed<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(m_decoder_), name);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Encoder &&m_encoder_ref_;
|
std::decay_t<Encoder> m_encoder_;
|
||||||
Decoder &&m_decoder_ref_;
|
std::decay_t<Decoder> m_decoder_;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> {
|
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> {
|
||||||
public:
|
public:
|
||||||
AdapterBuilderNamed(Encoder &&encoder, Decoder &&decoder, const std::string &name)
|
AdapterBuilderNamed(Encoder &&encoder, Decoder &&decoder, const std::string &name)
|
||||||
: m_encoder_ref_(std::forward<Encoder>(encoder)), m_decoder_ref_(std::forward<Decoder>(decoder)), mc_name_(name) {}
|
: m_encoder_(std::forward<Encoder>(encoder)), m_decoder_(std::forward<Decoder>(decoder)), mc_name_(name) {}
|
||||||
|
|
||||||
auto finalize() {
|
auto finalize() {
|
||||||
|
// Produce a concrete Adapter instance.
|
||||||
return std::make_unique<Adapter<Encoder, Decoder, tag_s<return_type_t<CallbackSignature>>, typename tp::tuple_tail<args_t<CallbackSignature>>::type>>(
|
return std::make_unique<Adapter<Encoder, Decoder, tag_s<return_type_t<CallbackSignature>>, typename tp::tuple_tail<args_t<CallbackSignature>>::type>>(
|
||||||
mc_name_, std::make_tuple(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(m_decoder_ref_), tag_s<return_type_t<CallbackSignature>>{}));
|
mc_name_, std::make_tuple(std::forward<Encoder>(m_encoder_), std::forward<Decoder>(m_decoder_), tag_s<return_type_t<CallbackSignature>>{}));
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const std::string &mc_name_;
|
const std::string mc_name_;
|
||||||
Encoder &&m_encoder_ref_;
|
std::decay_t<Encoder> m_encoder_;
|
||||||
Decoder &&m_decoder_ref_;
|
std::decay_t<Decoder> m_decoder_;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,13 @@
|
||||||
#include "port_base.hpp"
|
#include "port_base.hpp"
|
||||||
#include "tuple.hpp"
|
#include "tuple.hpp"
|
||||||
|
|
||||||
|
// Module type hint (currently unused).
|
||||||
enum class module_type_e : uint32_t {
|
enum class module_type_e : uint32_t {
|
||||||
STANDALONE,
|
STANDALONE,
|
||||||
INCOMPOSITION,
|
INCOMPOSITION,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Base interface for a named module; exposes ports and run entrypoint.
|
||||||
template <typename EnvDataType> class ModuleBase {
|
template <typename EnvDataType> class ModuleBase {
|
||||||
public:
|
public:
|
||||||
ModuleBase(int32_t argc, char **argv, char **envp, const std::string &name)
|
ModuleBase(int32_t argc, char **argv, char **envp, const std::string &name)
|
||||||
|
|
@ -39,6 +41,7 @@ private:
|
||||||
} mc_cli_args_;
|
} mc_cli_args_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Concrete module that owns typed ports and dispatches entry with a port map.
|
||||||
template <typename... Ports> class Module : public ModuleBase<std::tuple_element_t<0, std::tuple<typename Ports::port_data_type_t...>>> {
|
template <typename... Ports> class Module : public ModuleBase<std::tuple_element_t<0, std::tuple<typename Ports::port_data_type_t...>>> {
|
||||||
public:
|
public:
|
||||||
using port_data_type_t = std::tuple_element_t<0, std::tuple<typename Ports::port_data_type_t...>>;
|
using port_data_type_t = std::tuple_element_t<0, std::tuple<typename Ports::port_data_type_t...>>;
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
#include "module.hpp"
|
#include "module.hpp"
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
// Factory for building a Module from owned ports.
|
||||||
template <typename... Ports>
|
template <typename... Ports>
|
||||||
auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Ports>...> &&ports) {
|
auto makeModule(int32_t argc, char **argv, char **envp, const std::string &name, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Ports>...> &&ports) {
|
||||||
return std::make_unique<Module<Ports...>>(argc, argv, envp, name, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Ports>...>>(ports));
|
return std::make_unique<Module<Ports...>>(argc, argv, envp, name, zmq_ctx, std::forward<std::tuple<std::unique_ptr<Ports>...>>(ports));
|
||||||
|
|
|
||||||
37
src/port.hpp
37
src/port.hpp
|
|
@ -16,14 +16,26 @@
|
||||||
|
|
||||||
using namespace boost::callable_traits;
|
using namespace boost::callable_traits;
|
||||||
|
|
||||||
|
// Typed port implementation owning adapters and a ZMQ transport.
|
||||||
template <typename...> class Port;
|
template <typename...> class Port;
|
||||||
template <typename... Adapters, typename... Args>
|
template <typename... Adapters, typename... Args>
|
||||||
requires(
|
requires(
|
||||||
|
// Check return types
|
||||||
(std::is_same_v<typename Adapters::base_t::callback_type_t::result_type, typename std::tuple_element_t<0, std::tuple<Adapters...>>::base_t::callback_type_t::result_type> &&
|
(std::is_same_v<typename Adapters::base_t::callback_type_t::result_type, typename std::tuple_element_t<0, std::tuple<Adapters...>>::base_t::callback_type_t::result_type> &&
|
||||||
...) &&
|
...) &&
|
||||||
|
|
||||||
|
// Check number of args
|
||||||
((std::tuple_size_v<args_t<typename Adapters::base_t::callback_type_t::signature_type>> ==
|
((std::tuple_size_v<args_t<typename Adapters::base_t::callback_type_t::signature_type>> ==
|
||||||
std::tuple_size_v<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>) &&
|
std::tuple_size_v<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>) &&
|
||||||
...))
|
...) &&
|
||||||
|
|
||||||
|
// Unique adapters check
|
||||||
|
([]<typename... Ts>() consteval {
|
||||||
|
return []<std::size_t... Is>(std::index_sequence<Is...>) consteval {
|
||||||
|
using tuple_t = std::tuple<Ts...>;
|
||||||
|
return ((tp::tuple_index<std::tuple_element_t<Is, tuple_t>, tuple_t>::value == Is) && ...);
|
||||||
|
}(std::index_sequence_for<Ts...>{});
|
||||||
|
}.template operator()<std::remove_cvref_t<return_type_t<typename Adapters::decoder_type_t>>...>()))
|
||||||
|
|
||||||
class Port<std::tuple<Adapters...>, std::tuple<Args...>> : public PortBase<std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>> {
|
class Port<std::tuple<Adapters...>, std::tuple<Args...>> : public PortBase<std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>> {
|
||||||
public:
|
public:
|
||||||
|
|
@ -46,10 +58,12 @@ public:
|
||||||
// fmt::print("Adding callback: name: {}, namehash: {}, typehash: {}, cbk_typehash: {}, cbk_type: {}\r\n", std::get<Idx>(adapters)->name(),
|
// fmt::print("Adding callback: name: {}, namehash: {}, typehash: {}, cbk_typehash: {}, cbk_type: {}\r\n", std::get<Idx>(adapters)->name(),
|
||||||
// std::hash<std::string>()(std::get<Idx>(adapters)->name()), typeid(adapter_input_type_t).hash_code(), typeid(adapter_callback_type_t).hash_code(),
|
// std::hash<std::string>()(std::get<Idx>(adapters)->name()), typeid(adapter_input_type_t).hash_code(), typeid(adapter_callback_type_t).hash_code(),
|
||||||
// type_name<adapter_callback_type_t>());
|
// type_name<adapter_callback_type_t>());
|
||||||
|
// Cache name, name hash, input type hash, callback type hash, and adapter pointer.
|
||||||
return std::make_tuple(std::get<Idx>(adapters)->name(), std::hash<std::string>()(std::get<Idx>(adapters)->name()), typeid(adapter_input_type_t).hash_code(),
|
return std::make_tuple(std::get<Idx>(adapters)->name(), std::hash<std::string>()(std::get<Idx>(adapters)->name()), typeid(adapter_input_type_t).hash_code(),
|
||||||
typeid(adapter_callback_type_t).hash_code(), std::forward<std::unique_ptr<adapter_type_t>>(std::get<Idx>(adapters)));
|
typeid(adapter_callback_type_t).hash_code(), std::forward<std::unique_ptr<adapter_type_t>>(std::get<Idx>(adapters)));
|
||||||
}.template operator()<Ids>()...);
|
}.template operator()<Ids>()...);
|
||||||
}(std::make_index_sequence<sizeof...(Adapters)>{})) {
|
}(std::make_index_sequence<sizeof...(Adapters)>{})) {
|
||||||
|
// Instantiate the port implementation with any extra args (e.g., SUB topics).
|
||||||
std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoints), std::forward<Args>(args)...); }, std::forward<decltype(args)>(args));
|
std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoints), std::forward<Args>(args)...); }, std::forward<decltype(args)>(args));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -64,7 +78,11 @@ public:
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void stop__() const override { m_impl__->stop_source().request_stop(); }
|
void stop__() const override { m_impl__->stop_source().request_stop(); }
|
||||||
void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override {
|
void send__(const void *data, size_t size, size_t hash, const std::string &addr = "") const override {
|
||||||
|
if (!addr.empty() && this->type() != port_types_e::PUB) {
|
||||||
|
throw std::runtime_error("Addressed send is only supported for PUB ports");
|
||||||
|
}
|
||||||
|
|
||||||
const void *adapter_ptr = nullptr;
|
const void *adapter_ptr = nullptr;
|
||||||
|
|
||||||
tp::for_each(mc_adapters_, [&](const auto &e) {
|
tp::for_each(mc_adapters_, [&](const auto &e) {
|
||||||
|
|
@ -74,6 +92,7 @@ protected:
|
||||||
using adapter_in_type_t = std::remove_cvref_t<return_type_t<decltype(adapter->decoder())>>;
|
using adapter_in_type_t = std::remove_cvref_t<return_type_t<decltype(adapter->decoder())>>;
|
||||||
adapter_ptr = &adapter;
|
adapter_ptr = &adapter;
|
||||||
|
|
||||||
|
// Encode to the environment type and pack payload.
|
||||||
typename PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>::port_payload_s payload = {
|
typename PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>::port_payload_s payload = {
|
||||||
.typehash = hash,
|
.typehash = hash,
|
||||||
.data = {adapter->encoder()(*reinterpret_cast<const adapter_in_type_t *>(data))},
|
.data = {adapter->encoder()(*reinterpret_cast<const adapter_in_type_t *>(data))},
|
||||||
|
|
@ -81,7 +100,8 @@ protected:
|
||||||
|
|
||||||
msgpack::sbuffer buf;
|
msgpack::sbuffer buf;
|
||||||
msgpack::pack(buf, payload);
|
msgpack::pack(buf, payload);
|
||||||
m_impl__->send(addr, buf);
|
// Send encoded payload via the transport.
|
||||||
|
m_impl__->send(buf, addr);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -96,6 +116,7 @@ protected:
|
||||||
tp::for_each(mc_adapters_, [&](auto &a) {
|
tp::for_each(mc_adapters_, [&](auto &a) {
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = a;
|
auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = a;
|
||||||
|
// Match by adapter name/type and callback signature.
|
||||||
if (adapter_typehash == typehash && adapter_namehash == namehash && adapter_cbk_typehash == cbk_typehash) {
|
if (adapter_typehash == typehash && adapter_namehash == namehash && adapter_cbk_typehash == cbk_typehash) {
|
||||||
ret = reinterpret_cast<void *>(adapter.get());
|
ret = reinterpret_cast<void *>(adapter.get());
|
||||||
}
|
}
|
||||||
|
|
@ -121,11 +142,12 @@ private:
|
||||||
using type_t = std::function<cbk_return_type_t_(const port_data_type_t &, size_t, Aargs &&...)>;
|
using type_t = std::function<cbk_return_type_t_(const port_data_type_t &, size_t, Aargs &&...)>;
|
||||||
|
|
||||||
cbk_return_type_t_ operator()(const port_data_type_t &data, size_t hash, Aargs &&...callback_args) const {
|
cbk_return_type_t_ operator()(const port_data_type_t &data, size_t hash, Aargs &&...callback_args) const {
|
||||||
std::conditional_t<!std::is_void_v<cbk_return_type_t_>, cbk_return_type_t_, std::false_type> ret;
|
std::conditional_t<!std::is_void_v<cbk_return_type_t_>, cbk_return_type_t_, std::false_type> ret{};
|
||||||
|
|
||||||
tp::for_each(mc_port_->mc_adapters_, [&](const auto &e) {
|
tp::for_each(mc_port_->mc_adapters_, [&](const auto &e) {
|
||||||
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
|
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
|
||||||
if (adapter_typehash == hash) {
|
if (adapter_typehash == hash) {
|
||||||
|
// Decode payload and dispatch to the adapter's signal.
|
||||||
if constexpr (std::is_void_v<std::invoke_result_t<std::remove_cvref_t<decltype(adapter->callback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) {
|
if constexpr (std::is_void_v<std::invoke_result_t<std::remove_cvref_t<decltype(adapter->callback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) {
|
||||||
adapter->callback()(adapter->decoder()(data), std::forward<Aargs>(callback_args)...);
|
adapter->callback()(adapter->decoder()(data), std::forward<Aargs>(callback_args)...);
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -143,7 +165,7 @@ private:
|
||||||
const Port *mc_port_;
|
const Port *mc_port_;
|
||||||
};
|
};
|
||||||
|
|
||||||
mutable std::unique_ptr<PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>> m_impl__;
|
mutable std::unique_ptr<PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>> m_impl__{nullptr};
|
||||||
mutable std::tuple<std::tuple<std::string, size_t, size_t, size_t, std::unique_ptr<Adapters>>...> mc_adapters_;
|
mutable std::tuple<std::tuple<std::string, size_t, size_t, size_t, std::unique_ptr<Adapters>>...> mc_adapters_;
|
||||||
|
|
||||||
template <typename... ImplArgs> void init_impl_(enum port_types_e pt, ImplArgs &&...args) const {
|
template <typename... ImplArgs> void init_impl_(enum port_types_e pt, ImplArgs &&...args) const {
|
||||||
|
|
@ -167,11 +189,16 @@ private:
|
||||||
if (type == pt) {
|
if (type == pt) {
|
||||||
using impl_type_t = std::remove_pointer_t<decltype(null_pimpl)>;
|
using impl_type_t = std::remove_pointer_t<decltype(null_pimpl)>;
|
||||||
|
|
||||||
|
// Construct the selected PortImpl if its constructor matches the args.
|
||||||
if constexpr (std::is_constructible_v<impl_type_t, const this_t *, const ImplArgs &..., PortImplCallback_<callback_aargs_t> &&>) {
|
if constexpr (std::is_constructible_v<impl_type_t, const this_t *, const ImplArgs &..., PortImplCallback_<callback_aargs_t> &&>) {
|
||||||
m_impl__ = std::make_unique<impl_type_t>(this, std::forward<ImplArgs>(args)..., PortImplCallback_<callback_aargs_t>(this));
|
m_impl__ = std::make_unique<impl_type_t>(this, std::forward<ImplArgs>(args)..., PortImplCallback_<callback_aargs_t>(this));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!m_impl__) {
|
||||||
|
throw std::runtime_error("No PortImpl for port type ...");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,12 +19,14 @@
|
||||||
|
|
||||||
using namespace boost::callable_traits;
|
using namespace boost::callable_traits;
|
||||||
|
|
||||||
|
// Abstract port API for sending encoded messages and accessing callbacks.
|
||||||
template <typename EncodedType> class PortBase {
|
template <typename EncodedType> class PortBase {
|
||||||
|
// Helper binding an address/topic to a port for chained sends.
|
||||||
class AddressedPort_ {
|
class AddressedPort_ {
|
||||||
public:
|
public:
|
||||||
AddressedPort_(const PortBase<EncodedType> *port, const std::string &address) : mc_addr_(address), mc_port_(port) {}
|
AddressedPort_(const PortBase<EncodedType> *port, const std::string &address) : mc_addr_(address), mc_port_(port) {}
|
||||||
template <typename InType> const auto &operator<<(const InType &in) const {
|
template <typename InType> const auto &operator<<(const InType &in) const {
|
||||||
mc_port_->send__(mc_addr_, &in, sizeof(InType), typeid(InType).hash_code());
|
mc_port_->send__(&in, sizeof(InType), typeid(InType).hash_code(), mc_addr_);
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -47,18 +49,20 @@ public:
|
||||||
const auto operator[](const std::string &address) const { return AddressedPort_(this, address); }
|
const auto operator[](const std::string &address) const { return AddressedPort_(this, address); }
|
||||||
|
|
||||||
template <typename InType> const PortBase<EncodedType> &operator<<(const InType &in) const {
|
template <typename InType> const PortBase<EncodedType> &operator<<(const InType &in) const {
|
||||||
send__("", &in, sizeof(InType), typeid(InType).hash_code());
|
// Use empty address for non-addressed sends.
|
||||||
|
send__(&in, sizeof(InType), typeid(InType).hash_code());
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void stop() const = 0;
|
virtual void stop() const = 0;
|
||||||
template <typename Signature> auto &callback(const std::string &name) const {
|
template <typename Signature> auto &callback(const std::string &name) const {
|
||||||
|
// Lookup adapter by callback signature and name.
|
||||||
return GetCallbackHelper_<typename tp::tuple_tail<args_t<Signature>>::type>(this).template operator()<Signature>(name);
|
return GetCallbackHelper_<typename tp::tuple_tail<args_t<Signature>>::type>(this).template operator()<Signature>(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void stop__() const = 0;
|
virtual void stop__() const = 0;
|
||||||
virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0;
|
virtual void send__(const void *data, size_t size, size_t type_hash, const std::string &addr = "") const = 0;
|
||||||
virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash, size_t cbk_typehash) const = 0;
|
virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash, size_t cbk_typehash) const = 0;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
@ -67,6 +71,7 @@ private:
|
||||||
const std::map<std::string, std::string> mc_endpoints_;
|
const std::map<std::string, std::string> mc_endpoints_;
|
||||||
const size_t mc_name_hash_;
|
const size_t mc_name_hash_;
|
||||||
|
|
||||||
|
// Type-safe callback lookup helper.
|
||||||
template <typename...> class GetCallbackHelper_;
|
template <typename...> class GetCallbackHelper_;
|
||||||
template <typename... Aargs> class GetCallbackHelper_<std::tuple<Aargs...>> {
|
template <typename... Aargs> class GetCallbackHelper_<std::tuple<Aargs...>> {
|
||||||
public:
|
public:
|
||||||
|
|
|
||||||
|
|
@ -7,13 +7,14 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Dealer transport placeholder (no send/recv logic yet).
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::DEALER, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::DEALER, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
||||||
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {}
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void listen__(std::stop_token st) const override {
|
void listen__(std::stop_token st) const override {
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
#include "port.hpp"
|
#include "port.hpp"
|
||||||
|
|
||||||
|
// Factory for building a Port from adapters and extra args.
|
||||||
template <typename... Adapters, typename... Args>
|
template <typename... Adapters, typename... Args>
|
||||||
auto makePort(enum port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx,
|
auto makePort(enum port_types_e pt, const std::string &name, const std::map<std::string, std::string> &endpoints, zmq::context_t &zmq_ctx,
|
||||||
std::tuple<std::unique_ptr<Adapters>...> &&adapters, std::tuple<Args...> &&args = {}) {
|
std::tuple<std::unique_ptr<Adapters>...> &&adapters, std::tuple<Args...> &&args = {}) {
|
||||||
|
|
|
||||||
|
|
@ -4,14 +4,17 @@
|
||||||
#include <future>
|
#include <future>
|
||||||
#include <msgpack.hpp>
|
#include <msgpack.hpp>
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
|
#include <stop_token>
|
||||||
|
|
||||||
#define ZMQ_BUILD_DRAFT_API
|
#define ZMQ_BUILD_DRAFT_API
|
||||||
#include <zmq.hpp>
|
#include <zmq.hpp>
|
||||||
|
|
||||||
|
// Base ZMQ-backed transport for a port; manages socket and listener thread.
|
||||||
template <typename Port, typename Callback> class PortImplBase {
|
template <typename Port, typename Callback> class PortImplBase {
|
||||||
using port_data_type_t_ = std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<Callback>>>;
|
using port_data_type_t_ = std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<Callback>>>;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
// Wire payload: type hash + batch of encoded values.
|
||||||
struct port_payload_s {
|
struct port_payload_s {
|
||||||
size_t typehash;
|
size_t typehash;
|
||||||
std::vector<port_data_type_t_> data;
|
std::vector<port_data_type_t_> data;
|
||||||
|
|
@ -22,12 +25,22 @@ public:
|
||||||
: mc_endpoints__(endpoints), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {}
|
: mc_endpoints__(endpoints), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {}
|
||||||
virtual ~PortImplBase() = default;
|
virtual ~PortImplBase() = default;
|
||||||
|
|
||||||
virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0;
|
virtual void send(const msgpack::sbuffer &data, const std::string &addr = "") const = 0;
|
||||||
void close() {
|
void close() {
|
||||||
|
// Close socket first to break any pending waits.
|
||||||
|
try {
|
||||||
m_sock__.close();
|
m_sock__.close();
|
||||||
|
} catch (...) {
|
||||||
|
// Ignore close errors during shutdown.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Join listener thread if one was started.
|
||||||
if (m_listener_thread__.valid()) {
|
if (m_listener_thread__.valid()) {
|
||||||
|
try {
|
||||||
m_listener_thread__.get();
|
m_listener_thread__.get();
|
||||||
|
} catch (...) {
|
||||||
|
// Ignore listener exceptions during shutdown.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,11 +7,12 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Pair transport placeholder; currently no full send/recv support.
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::PAIR, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PAIR, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
||||||
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {
|
||||||
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::push);
|
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pair);
|
||||||
|
|
||||||
for (const auto &[_, ep] : this->mc_endpoints__) {
|
for (const auto &[_, ep] : this->mc_endpoints__) {
|
||||||
this->m_sock__.bind(ep);
|
this->m_sock__.bind(ep);
|
||||||
|
|
@ -19,7 +20,7 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void listen__(std::stop_token st) const override {
|
void listen__(std::stop_token st) const override {
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Publisher transport: binds endpoints and sends topic + payload frames.
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::PUB, Port, Callback> final : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PUB, Port, Callback> final : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
||||||
|
|
@ -21,10 +22,10 @@ public:
|
||||||
~PortImpl() override {}
|
~PortImpl() override {}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
|
||||||
try {
|
try {
|
||||||
this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore);
|
this->m_sock__.send(zmq::message_t(addr), zmq::send_flags::sndmore);
|
||||||
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait);
|
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
|
||||||
} catch (const zmq::error_t &err) {
|
} catch (const zmq::error_t &err) {
|
||||||
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Pull transport: connects and listens for incoming payloads.
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::PULL, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PULL, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
using base_t = PortImplBase<Port, Callback>;
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
|
|
@ -24,13 +25,14 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on PULL pattern socket"); };
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on PULL pattern socket"); };
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void listen__(std::stop_token st) const override {
|
void listen__(std::stop_token st) const override {
|
||||||
this->m_listener_thread__ = std::async(
|
this->m_listener_thread__ = std::async(
|
||||||
std::launch::async,
|
std::launch::async,
|
||||||
[this](std::stop_token st) {
|
[this](std::stop_token st) {
|
||||||
|
try {
|
||||||
zmq::poller_t poller;
|
zmq::poller_t poller;
|
||||||
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
||||||
|
|
||||||
|
|
@ -56,6 +58,15 @@ private:
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (const zmq::error_t &) {
|
||||||
|
if (!st.stop_requested()) {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
if (!st.stop_requested()) {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
st);
|
st);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Push transport: binds endpoints and sends fire-and-forget payloads.
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::PUSH, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PUSH, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
||||||
|
|
@ -19,9 +20,9 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
|
||||||
try {
|
try {
|
||||||
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
|
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::dontwait);
|
||||||
} catch (const zmq::error_t &err) {
|
} catch (const zmq::error_t &err) {
|
||||||
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@
|
||||||
#include "port_types.hpp"
|
#include "port_types.hpp"
|
||||||
#include "tuple.hpp"
|
#include "tuple.hpp"
|
||||||
|
|
||||||
|
// Reply transport: listens for requests and sends reply payloads.
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::REP, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::REP, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
using base_t = PortImplBase<Port, Callback>;
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
|
|
@ -23,17 +24,19 @@ public:
|
||||||
this->m_sock__.bind(ep);
|
this->m_sock__.bind(ep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start async listener loop.
|
||||||
listen__(this->stop_source().get_token());
|
listen__(this->stop_source().get_token());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on REPLY pattern socket"); };
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on REPLY pattern socket"); };
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void listen__(std::stop_token st) const override {
|
void listen__(std::stop_token st) const override {
|
||||||
this->m_listener_thread__ = std::async(
|
this->m_listener_thread__ = std::async(
|
||||||
std::launch::async,
|
std::launch::async,
|
||||||
[this](std::stop_token st) {
|
[this](std::stop_token st) {
|
||||||
|
try {
|
||||||
zmq::poller_t poller;
|
zmq::poller_t poller;
|
||||||
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
||||||
|
|
||||||
|
|
@ -51,41 +54,54 @@ private:
|
||||||
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
||||||
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
||||||
|
|
||||||
for (const auto &data : batch) {
|
// Callback should return reply data for REP sockets.
|
||||||
if constexpr (!std::is_void_v<boost::callable_traits::return_type_t<std::remove_cvref_t<decltype(this->mc_cbk__)>>>) {
|
if constexpr (!std::is_void_v<boost::callable_traits::return_type_t<std::remove_cvref_t<decltype(this->mc_cbk__)>>>) {
|
||||||
auto reply_data = this->mc_cbk__(data, typehash);
|
if (batch.size()) {
|
||||||
|
auto reply_data = this->mc_cbk__(batch.front(), typehash);
|
||||||
|
typename base_t::port_payload_s reply_payload = {
|
||||||
|
.typehash = typeid(typename decltype(reply_data)::value_type).hash_code(),
|
||||||
|
};
|
||||||
|
|
||||||
for (const auto &d : reply_data) {
|
for (const auto &d : reply_data) {
|
||||||
using adapter_in_type_t = std::remove_cvref_t<decltype(d)>;
|
using adapter_in_type_t = std::remove_cvref_t<decltype(d)>;
|
||||||
|
|
||||||
size_t typehash = typeid(adapter_in_type_t).hash_code();
|
size_t typehash = typeid(adapter_in_type_t).hash_code();
|
||||||
|
|
||||||
|
// Find matching encoder by type to build reply payload.
|
||||||
tp::for_each(this->mc_port__->adapters(), [&](const auto &e) {
|
tp::for_each(this->mc_port__->adapters(), [&](const auto &e) {
|
||||||
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
|
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
|
||||||
if constexpr (std::is_same_v<std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<decltype(adapter->encoder())>>>,
|
if constexpr (std::is_same_v<std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<decltype(adapter->encoder())>>>,
|
||||||
adapter_in_type_t>) {
|
adapter_in_type_t>) {
|
||||||
if (adapter_typehash == typehash) {
|
if (adapter_typehash == typehash) {
|
||||||
|
reply_payload.data.push_back(adapter->encoder()(d));
|
||||||
typename base_t::port_payload_s payload = {
|
|
||||||
.typehash = typeid(d).hash_code(),
|
|
||||||
.data = {adapter->encoder()(d)},
|
|
||||||
};
|
|
||||||
|
|
||||||
msgpack::sbuffer buf;
|
|
||||||
msgpack::pack(buf, payload);
|
|
||||||
this->m_sock__.send(zmq::message_t(buf.data(), buf.size()), zmq::send_flags::none);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
std::runtime_error("Callback of REPLY pattern socket should return non-void value");
|
msgpack::sbuffer buf;
|
||||||
|
msgpack::pack(buf, reply_payload);
|
||||||
|
|
||||||
|
// Send reply for this item.
|
||||||
|
this->m_sock__.send(zmq::message_t(buf.data(), buf.size()), zmq::send_flags::none);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
throw std::runtime_error("Callback of REPLY pattern socket should return non-void value");
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::optional(res);
|
return std::optional(res);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (const zmq::error_t &) {
|
||||||
|
if (!st.stop_requested()) {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
if (!st.stop_requested()) {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
st);
|
st);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Request transport: sends a payload and waits for a reply.
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::REQ, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::REQ, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
using base_t = PortImplBase<Port, Callback>;
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
|
|
@ -18,9 +19,12 @@ public:
|
||||||
for (const auto &[_, ep] : this->mc_endpoints__) {
|
for (const auto &[_, ep] : this->mc_endpoints__) {
|
||||||
this->m_sock__.connect(ep);
|
this->m_sock__.connect(ep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Avoid blocking forever if no REP is available.
|
||||||
|
this->m_sock__.set(zmq::sockopt::rcvtimeo, static_cast<int>(base_t::sc_recv_timeout_ms__));
|
||||||
}
|
}
|
||||||
|
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {
|
||||||
try {
|
try {
|
||||||
zmq::message_t reply;
|
zmq::message_t reply;
|
||||||
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
|
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
|
||||||
|
|
|
||||||
|
|
@ -7,13 +7,14 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Router transport placeholder (no send/recv logic yet).
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::ROUTER, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::ROUTER, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::map<std::string, std::string> &endpoints, Callback &&callback)
|
||||||
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {}
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoints, std::forward<Callback>(callback)) {}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override {};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void listen__(std::stop_token st) const override {
|
void listen__(std::stop_token st) const override {
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
|
// Subscriber transport: connects, subscribes to topics, and dispatches payloads.
|
||||||
template <typename Port, typename Callback> class PortImpl<port_types_e::SUB, Port, Callback> : public PortImplBase<Port, Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::SUB, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
using base_t = PortImplBase<Port, Callback>;
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
|
|
@ -24,15 +25,17 @@ public:
|
||||||
this->m_sock__.connect(ep);
|
this->m_sock__.connect(ep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe to each topic prefix.
|
||||||
for (const auto &topic : mc_topics_) {
|
for (const auto &topic : mc_topics_) {
|
||||||
this->m_sock__.set(zmq::sockopt::subscribe, topic);
|
this->m_sock__.set(zmq::sockopt::subscribe, topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start async listener loop.
|
||||||
listen__(this->stop_source().get_token());
|
listen__(this->stop_source().get_token());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); };
|
void send(const msgpack::sbuffer &data, const std::string &addr = "") const override { throw std::runtime_error("Can't send anything on SUBSCRIBER pattern socket"); };
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const std::list<std::string> mc_topics_;
|
const std::list<std::string> mc_topics_;
|
||||||
|
|
@ -41,6 +44,7 @@ private:
|
||||||
this->m_listener_thread__ = std::async(
|
this->m_listener_thread__ = std::async(
|
||||||
std::launch::async,
|
std::launch::async,
|
||||||
[this](std::stop_token st) {
|
[this](std::stop_token st) {
|
||||||
|
try {
|
||||||
zmq::poller_t poller;
|
zmq::poller_t poller;
|
||||||
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
||||||
|
|
||||||
|
|
@ -52,6 +56,7 @@ private:
|
||||||
zmq::message_t msg;
|
zmq::message_t msg;
|
||||||
|
|
||||||
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
|
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
|
||||||
|
// First frame is the topic envelope.
|
||||||
std::string envelope = std::string(static_cast<const char *>(msg.data()), msg.size());
|
std::string envelope = std::string(static_cast<const char *>(msg.data()), msg.size());
|
||||||
|
|
||||||
this->m_sock__.recv(msg).and_then([&](const auto &res) {
|
this->m_sock__.recv(msg).and_then([&](const auto &res) {
|
||||||
|
|
@ -61,6 +66,7 @@ private:
|
||||||
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
||||||
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
||||||
|
|
||||||
|
// Dispatch each decoded item.
|
||||||
for (const auto &data : batch) {
|
for (const auto &data : batch) {
|
||||||
this->mc_cbk__(data, typehash, envelope);
|
this->mc_cbk__(data, typehash, envelope);
|
||||||
}
|
}
|
||||||
|
|
@ -72,6 +78,15 @@ private:
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (const zmq::error_t &) {
|
||||||
|
if (!st.stop_requested()) {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
if (!st.stop_requested()) {
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
st);
|
st);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
|
|
||||||
|
// Supported ZMQ socket patterns for ports.
|
||||||
enum class port_types_e : uint32_t {
|
enum class port_types_e : uint32_t {
|
||||||
UNKNOWN = 0,
|
UNKNOWN = 0,
|
||||||
PUB,
|
PUB,
|
||||||
|
|
@ -20,6 +21,7 @@ enum class port_types_e : uint32_t {
|
||||||
|
|
||||||
template <enum port_types_e, typename...> class PortImpl;
|
template <enum port_types_e, typename...> class PortImpl;
|
||||||
|
|
||||||
|
// Endpoint tag base type.
|
||||||
struct endpoints_base_s {};
|
struct endpoints_base_s {};
|
||||||
template <auto> struct endpoints_s;
|
template <auto> struct endpoints_s;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,10 @@
|
||||||
using env_data_type_t = std::vector<uint8_t>;
|
using env_data_type_t = std::vector<uint8_t>;
|
||||||
|
|
||||||
void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
|
// Resolve port by name.
|
||||||
const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port
|
const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port
|
||||||
|
|
||||||
|
// Fetch typed callbacks by adapter name and signature.
|
||||||
auto &int_cbk = subscriber.callback<void(const int32_t &, const std::string &)>("int-vector<uint8_t>-int");
|
auto &int_cbk = subscriber.callback<void(const int32_t &, const std::string &)>("int-vector<uint8_t>-int");
|
||||||
auto &string_cbk = subscriber.callback<void(const std::string &, const std::string &)>("string-vector<uint8_t>-string");
|
auto &string_cbk = subscriber.callback<void(const std::string &, const std::string &)>("string-vector<uint8_t>-string");
|
||||||
auto &double_cbk = subscriber.callback<void(const double &, const std::string &)>("double-vector<uint8_t>-double");
|
auto &double_cbk = subscriber.callback<void(const double &, const std::string &)>("double-vector<uint8_t>-double");
|
||||||
|
|
@ -40,6 +42,7 @@ void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unorder
|
||||||
|
|
||||||
// Publisher module entrypoint
|
// Publisher module entrypoint
|
||||||
void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
|
// Resolve port by name.
|
||||||
const auto &publisher = *ports.at("publisher_port"); // Get publisher port
|
const auto &publisher = *ports.at("publisher_port"); // Get publisher port
|
||||||
|
|
||||||
// Publish data
|
// Publish data
|
||||||
|
|
@ -51,13 +54,16 @@ void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordere
|
||||||
|
|
||||||
// Publisher module entrypoint
|
// Publisher module entrypoint
|
||||||
void pusher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
void pusher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
|
// Resolve port by name.
|
||||||
const auto &publisher = *ports.at("push_port"); // Get publisher port
|
const auto &publisher = *ports.at("push_port"); // Get publisher port
|
||||||
publisher << 1 << 2 << double{3.f} << std::string("test");
|
publisher << 1 << 2 << double{3.f} << std::string("test");
|
||||||
}
|
}
|
||||||
|
|
||||||
void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
|
// Resolve port by name.
|
||||||
const auto &puller = *ports.at("pull_port"); // Get subscriber port
|
const auto &puller = *ports.at("pull_port"); // Get subscriber port
|
||||||
|
|
||||||
|
// Fetch typed callbacks by adapter name and signature.
|
||||||
auto &int_cbk = puller.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
|
auto &int_cbk = puller.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
|
||||||
auto &string_cbk = puller.callback<void(const std::string &)>("string-vector<uint8_t>-string");
|
auto &string_cbk = puller.callback<void(const std::string &)>("string-vector<uint8_t>-string");
|
||||||
auto &double_cbk = puller.callback<void(const double &)>("double-vector<uint8_t>-double");
|
auto &double_cbk = puller.callback<void(const double &)>("double-vector<uint8_t>-double");
|
||||||
|
|
@ -69,8 +75,10 @@ void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_m
|
||||||
}
|
}
|
||||||
|
|
||||||
void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
|
// Resolve port by name.
|
||||||
const auto &req = *ports.at("req_port"); // Get publisher port
|
const auto &req = *ports.at("req_port"); // Get publisher port
|
||||||
// static_assert(std::is_same_v<typename std::remove_cvref_t<decltype(req)>, void>, "");
|
// static_assert(std::is_same_v<typename std::remove_cvref_t<decltype(req)>, void>, "");
|
||||||
|
// Fetch typed callbacks by adapter name and signature.
|
||||||
auto &int_cbk = req.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
|
auto &int_cbk = req.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
|
||||||
auto &string_cbk = req.callback<void(const std::string &)>("string-vector<uint8_t>-string");
|
auto &string_cbk = req.callback<void(const std::string &)>("string-vector<uint8_t>-string");
|
||||||
auto &double_cbk = req.callback<void(const double &)>("double-vector<uint8_t>-double");
|
auto &double_cbk = req.callback<void(const double &)>("double-vector<uint8_t>-double");
|
||||||
|
|
@ -84,8 +92,10 @@ void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<
|
||||||
}
|
}
|
||||||
|
|
||||||
void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
|
// Resolve port by name.
|
||||||
const auto &rep = *ports.at("rep_port"); // Get subscriber port
|
const auto &rep = *ports.at("rep_port"); // Get subscriber port
|
||||||
|
|
||||||
|
// Fetch typed callbacks by adapter name and signature.
|
||||||
auto &int_cbk = rep.callback<std::string(const int32_t &)>("int-vector<uint8_t>-int");
|
auto &int_cbk = rep.callback<std::string(const int32_t &)>("int-vector<uint8_t>-int");
|
||||||
auto &string_cbk = rep.callback<std::string(const std::string &)>("string-vector<uint8_t>-string");
|
auto &string_cbk = rep.callback<std::string(const std::string &)>("string-vector<uint8_t>-string");
|
||||||
auto &double_cbk = rep.callback<std::string(const double &)>("double-vector<uint8_t>-double");
|
auto &double_cbk = rep.callback<std::string(const double &)>("double-vector<uint8_t>-double");
|
||||||
|
|
@ -98,9 +108,15 @@ void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<
|
||||||
});
|
});
|
||||||
|
|
||||||
string_cbk.connect([](const std::string &s) -> std::string {
|
string_cbk.connect([](const std::string &s) -> std::string {
|
||||||
fmt::print("REPLY socket: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
fmt::print("REPLY socket: got data: {} of {} type (FIRST callback)\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
||||||
// Handle data ...
|
// Handle data ...
|
||||||
return fmt::format("'Handle request: {} of type: {}'", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
return fmt::format("'Handle request: {} of type: {}' (FIRST callback)", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
||||||
|
});
|
||||||
|
|
||||||
|
string_cbk.connect([](const std::string &s) -> std::string {
|
||||||
|
fmt::print("REPLY socket: got data: {} of {} type (SECOND callback)\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
||||||
|
// Handle data ...
|
||||||
|
return fmt::format("'Handle request: {} of type: {}' (SECOND callback)", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
||||||
});
|
});
|
||||||
|
|
||||||
double_cbk.connect([](const double &d) -> std::string {
|
double_cbk.connect([](const double &d) -> std::string {
|
||||||
|
|
@ -113,7 +129,14 @@ void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<
|
||||||
int main(int argc, char *argv[], char *envp[]) {
|
int main(int argc, char *argv[], char *envp[]) {
|
||||||
using enum port_types_e;
|
using enum port_types_e;
|
||||||
codecs_s<env_data_type_t> codecs;
|
codecs_s<env_data_type_t> codecs;
|
||||||
|
// Shared ZMQ context for in-process transports.
|
||||||
zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads
|
zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads
|
||||||
|
auto a = AdapterBuilder();
|
||||||
|
auto b = AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_int)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_int)
|
||||||
|
.withCallbackSignature<void(const int32_t &)>()
|
||||||
|
.withName("string-vector<uint8_t>-string");
|
||||||
|
|
||||||
// Make module that contains only 1 port working on PUBLISHER pattern
|
// Make module that contains only 1 port working on PUBLISHER pattern
|
||||||
// TODO: merge all builders to one complex "Module" builder
|
// TODO: merge all builders to one complex "Module" builder
|
||||||
|
|
@ -297,7 +320,7 @@ int main(int argc, char *argv[], char *envp[]) {
|
||||||
AdapterBuilder()
|
AdapterBuilder()
|
||||||
.encodeDataBy(&codecs.encoders.from_string)
|
.encodeDataBy(&codecs.encoders.from_string)
|
||||||
.decodeDataBy(&codecs.decoders.to_string)
|
.decodeDataBy(&codecs.decoders.to_string)
|
||||||
.withCallbackSignature<std::string(const std::string)>()
|
.withCallbackSignature<std::string(const std::string &)>()
|
||||||
.withName("string-vector<uint8_t>-string")
|
.withName("string-vector<uint8_t>-string")
|
||||||
.finalize(),
|
.finalize(),
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
|
// Tuple utilities for compile-time iteration and transformations.
|
||||||
namespace tp {
|
namespace tp {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,11 @@
|
||||||
|
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
|
|
||||||
|
// Trait to detect template specializations.
|
||||||
template <class T, template <class...> class Template> struct is_specialization : std::false_type {};
|
template <class T, template <class...> class Template> struct is_specialization : std::false_type {};
|
||||||
template <template <class...> class Template, class... Args> struct is_specialization<Template<Args...>, Template> : std::true_type {};
|
template <template <class...> class Template, class... Args> struct is_specialization<Template<Args...>, Template> : std::true_type {};
|
||||||
|
|
||||||
|
// Best-effort type name string for debugging.
|
||||||
template <class T> constexpr std::string_view type_name() {
|
template <class T> constexpr std::string_view type_name() {
|
||||||
using namespace std;
|
using namespace std;
|
||||||
#ifdef __clang__
|
#ifdef __clang__
|
||||||
|
|
@ -23,6 +25,7 @@ template <class T> constexpr std::string_view type_name() {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Type tag wrapper used to pass types through templates.
|
||||||
template <typename T> struct tag_s {
|
template <typename T> struct tag_s {
|
||||||
using type = T;
|
using type = T;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue