add push-pull, req-rep, addresses for pub-sub
This commit is contained in:
parent
81d15c8a1e
commit
394ece667d
|
|
@ -1,5 +1,6 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "src/tuple.hpp"
|
||||||
#include <boost/callable_traits.hpp>
|
#include <boost/callable_traits.hpp>
|
||||||
#include <boost/callable_traits/args.hpp>
|
#include <boost/callable_traits/args.hpp>
|
||||||
#include <boost/callable_traits/return_type.hpp>
|
#include <boost/callable_traits/return_type.hpp>
|
||||||
|
|
@ -16,23 +17,25 @@
|
||||||
|
|
||||||
using namespace boost::callable_traits;
|
using namespace boost::callable_traits;
|
||||||
|
|
||||||
template <typename Encoder, typename Decoder>
|
template<typename ...> class Adapter;
|
||||||
requires(
|
template <typename Encoder, typename Decoder, typename CallbackRetTypeTag, typename... CbkAargs>
|
||||||
// Encoder return type is same as Decoder argument type, encoder and decoder have only 1 arg
|
requires(std::tuple_size_v<args_t<Decoder>> == 1 && // Decoder has only 1 argument
|
||||||
std::tuple_size_v<args_t<Decoder>> == 1 && std::is_same_v<return_type_t<Encoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Decoder>>>> &&
|
std::is_same_v<return_type_t<Encoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Decoder>>>> && // Encoder return type is same with decoder argument
|
||||||
std::tuple_size_v<args_t<Encoder>> == 1 && std::is_same_v<return_type_t<Decoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Encoder>>>>)
|
std::tuple_size_v<args_t<Encoder>> == 1 && // Encoder accepts only one argument
|
||||||
|
std::is_same_v<return_type_t<Decoder>, std::remove_cvref_t<std::tuple_element_t<0, args_t<Encoder>>>> // Decoder return type is same with encoder argument
|
||||||
|
)
|
||||||
|
|
||||||
class Adapter : public AdapterBase<std::tuple_element_t<0, boost::callable_traits::args_t<Encoder>>> {
|
class Adapter<Encoder, Decoder, CallbackRetTypeTag, std::tuple<CbkAargs...>> : public AdapterBase<std::tuple_element_t<0, args_t<Encoder>>, CallbackRetTypeTag, CbkAargs...> {
|
||||||
public:
|
public:
|
||||||
using encoder_type_t = Encoder;
|
using encoder_type_t = Encoder;
|
||||||
using decoder_type_t = Decoder;
|
using decoder_type_t = Decoder;
|
||||||
using adapter_type_t = Adapter;
|
|
||||||
using callback_arg_type_t = std::tuple_element_t<0, boost::callable_traits::args_t<Encoder>>;
|
|
||||||
using base_t = AdapterBase<callback_arg_type_t>;
|
|
||||||
using callback_type_t = base_t::callback_type_t;
|
|
||||||
|
|
||||||
Adapter(const std::string &name, std::pair<Encoder, Decoder> &&fns)
|
using callback_arg_type_t = std::tuple_element_t<0, args_t<Encoder>>;
|
||||||
: AdapterBase<callback_arg_type_t>(name), mc_enc_(std::forward<Encoder>(fns.first)), mc_dec_(std::forward<Decoder>(fns.second)) {}
|
using base_t = AdapterBase<callback_arg_type_t, CallbackRetTypeTag, CbkAargs...>;
|
||||||
|
|
||||||
|
Adapter(const std::string &name, std::tuple<Encoder, Decoder, CallbackRetTypeTag> &&fns)
|
||||||
|
: AdapterBase<callback_arg_type_t, CallbackRetTypeTag, CbkAargs...>(name), mc_enc_(std::forward<Encoder>(std::get<0u>(fns))),
|
||||||
|
mc_dec_(std::forward<Decoder>(std::get<1u>(fns))) {}
|
||||||
|
|
||||||
inline const auto &encoder() const { return mc_enc_; }
|
inline const auto &encoder() const { return mc_enc_; }
|
||||||
inline const auto &decoder() const { return mc_dec_; }
|
inline const auto &decoder() const { return mc_dec_; }
|
||||||
|
|
@ -42,5 +45,5 @@ public:
|
||||||
private:
|
private:
|
||||||
const Encoder mc_enc_;
|
const Encoder mc_enc_;
|
||||||
const Decoder mc_dec_;
|
const Decoder mc_dec_;
|
||||||
mutable callback_type_t m_callback_;
|
mutable typename base_t::callback_type_t m_callback_;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,36 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include "utils.hpp"
|
||||||
#include <boost/signals2.hpp>
|
#include <boost/signals2.hpp>
|
||||||
|
#include <boost/signals2/variadic_signal.hpp>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
template <typename InType> class AdapterBase {
|
using namespace boost::signals2;
|
||||||
|
|
||||||
|
template <typename InType, typename CallbackRetTypeTag, typename... CbkAargs> class AdapterBase {
|
||||||
|
using cbk_ret_type_t_ = typename CallbackRetTypeTag::type;
|
||||||
|
|
||||||
|
// If return type is not void we combine all callback ivocation results to std::vector
|
||||||
|
class CollectAllCombiner_ {
|
||||||
public:
|
public:
|
||||||
using callback_type_t = boost::signals2::signal<void(const InType &)>;
|
using result_type = std::conditional_t<!std::is_void_v<cbk_ret_type_t_>, std::vector<cbk_ret_type_t_>, std::false_type>;
|
||||||
|
|
||||||
|
template <typename InputIterator> result_type operator()(InputIterator first, InputIterator last) const {
|
||||||
|
result_type results;
|
||||||
|
if constexpr (!std::is_same_v<result_type, std::false_type>) {
|
||||||
|
for (; first != last; ++first) {
|
||||||
|
results.push_back(*first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
|
// Use combiner if return type is not void
|
||||||
|
using signature_t = std::conditional_t<std::is_void_v<cbk_ret_type_t_>, void(const InType &, CbkAargs &&...), cbk_ret_type_t_(const InType &, CbkAargs &&...)>;
|
||||||
|
using callback_type_t = std::conditional_t<!std::is_void_v<cbk_ret_type_t_>, signal<signature_t, CollectAllCombiner_>, signal<signature_t>>;
|
||||||
|
|
||||||
AdapterBase(const std::string &name) : mc_name_(name) {}
|
AdapterBase(const std::string &name) : mc_name_(name) {}
|
||||||
|
|
||||||
|
|
@ -15,4 +41,3 @@ protected:
|
||||||
private:
|
private:
|
||||||
const std::string mc_name_;
|
const std::string mc_name_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,68 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "adapter.hpp"
|
#include <boost/callable_traits/args.hpp>
|
||||||
|
#include <boost/callable_traits/return_type.hpp>
|
||||||
|
#include <tuple>
|
||||||
|
|
||||||
template <typename Encoder, typename Decoder> auto makeAdapter(const std::string &name, std::pair<Encoder, Decoder> &&fns) {
|
#include "adapter.hpp"
|
||||||
return std::make_unique<Adapter<Encoder, Decoder>>(name, std::forward<std::pair<Encoder, Decoder>>(fns));
|
#include "src/tuple.hpp"
|
||||||
|
using namespace boost::callable_traits;
|
||||||
|
|
||||||
|
template <typename...> class AdapterBuilder;
|
||||||
|
template <typename...> class AdapterBuilderNamed;
|
||||||
|
|
||||||
|
template <> class AdapterBuilder<> {
|
||||||
|
public:
|
||||||
|
template <typename Encoder> AdapterBuilder<Encoder> encodeDataBy(Encoder &&encoder) { return AdapterBuilder<Encoder>(std::forward<Encoder>(encoder)); }
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Encoder> class AdapterBuilder<Encoder> {
|
||||||
|
public:
|
||||||
|
AdapterBuilder(Encoder &&encoder) : m_encoder_ref_(std::forward<Encoder>(encoder)) {}
|
||||||
|
template <typename Decoder> AdapterBuilder<Encoder, Decoder> decodeDataBy(Decoder &&decoder) {
|
||||||
|
return AdapterBuilder<Encoder, Decoder>(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(decoder));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Encoder &&m_encoder_ref_;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Encoder, typename Decoder> class AdapterBuilder<Encoder, Decoder> {
|
||||||
|
public:
|
||||||
|
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_ref_(std::forward<Encoder>(encoder)), m_decoder_ref_(std::forward<Decoder>(decoder)) {}
|
||||||
|
template <typename CallbackSignature> AdapterBuilder<Encoder, Decoder, CallbackSignature> withCallbackSignature() {
|
||||||
|
return AdapterBuilder<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(m_decoder_ref_));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Encoder &&m_encoder_ref_;
|
||||||
|
Decoder &&m_decoder_ref_;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilder<Encoder, Decoder, CallbackSignature> {
|
||||||
|
public:
|
||||||
|
AdapterBuilder(Encoder &&encoder, Decoder &&decoder) : m_encoder_ref_(std::forward<Encoder>(encoder)), m_decoder_ref_(std::forward<Decoder>(decoder)) {}
|
||||||
|
AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> withName(const std::string &name) {
|
||||||
|
return AdapterBuilderNamed<Encoder, Decoder, CallbackSignature>(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(m_decoder_ref_), name);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Encoder &&m_encoder_ref_;
|
||||||
|
Decoder &&m_decoder_ref_;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Encoder, typename Decoder, typename CallbackSignature> class AdapterBuilderNamed<Encoder, Decoder, CallbackSignature> {
|
||||||
|
public:
|
||||||
|
AdapterBuilderNamed(Encoder &&encoder, Decoder &&decoder, const std::string &name)
|
||||||
|
: m_encoder_ref_(std::forward<Encoder>(encoder)), m_decoder_ref_(std::forward<Decoder>(decoder)), mc_name_(name) {}
|
||||||
|
|
||||||
|
auto finalize() {
|
||||||
|
return std::make_unique<Adapter<Encoder, Decoder, tag_s<return_type_t<CallbackSignature>>, typename tp::tuple_tail<args_t<CallbackSignature>>::type>>(
|
||||||
|
mc_name_, std::make_tuple(std::forward<Encoder>(m_encoder_ref_), std::forward<Decoder>(m_decoder_ref_), tag_s<return_type_t<CallbackSignature>>{}));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const std::string &mc_name_;
|
||||||
|
Encoder &&m_encoder_ref_;
|
||||||
|
Decoder &&m_decoder_ref_;
|
||||||
|
};
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <zmq.hpp>
|
|
||||||
|
|
||||||
#include "port_base.hpp"
|
#include "port_base.hpp"
|
||||||
#include "tuple.hpp"
|
#include "tuple.hpp"
|
||||||
|
|
|
||||||
105
src/port.hpp
105
src/port.hpp
|
|
@ -5,18 +5,32 @@
|
||||||
#include <boost/callable_traits/return_type.hpp>
|
#include <boost/callable_traits/return_type.hpp>
|
||||||
#include <cstddef>
|
#include <cstddef>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
#include <tuple>
|
||||||
|
#include <type_traits>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
#include "port_base.hpp"
|
#include "port_base.hpp"
|
||||||
#include "ports.hpp"
|
#include "ports.hpp"
|
||||||
|
#include "src/port_types.hpp"
|
||||||
#include "tuple.hpp"
|
#include "tuple.hpp"
|
||||||
|
|
||||||
using namespace boost::callable_traits;
|
using namespace boost::callable_traits;
|
||||||
|
|
||||||
template <typename...> class Port;
|
template <typename...> class Port;
|
||||||
template <typename... Adapters, typename... Args>
|
template <typename... Adapters, typename... Args>
|
||||||
|
requires(
|
||||||
|
(std::is_same_v<typename Adapters::base_t::callback_type_t::result_type, typename std::tuple_element_t<0, std::tuple<Adapters...>>::base_t::callback_type_t::result_type> &&
|
||||||
|
...) &&
|
||||||
|
((std::tuple_size_v<args_t<typename Adapters::base_t::callback_type_t::signature_type>> ==
|
||||||
|
std::tuple_size_v<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>) &&
|
||||||
|
...))
|
||||||
|
|
||||||
class Port<std::tuple<Adapters...>, std::tuple<Args...>> : public PortBase<std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>> {
|
class Port<std::tuple<Adapters...>, std::tuple<Args...>> : public PortBase<std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>> {
|
||||||
public:
|
public:
|
||||||
|
static constexpr auto callback_args_num = std::tuple_size_v<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>;
|
||||||
|
using this_t = Port<std::tuple<Adapters...>, std::tuple<Args...>>;
|
||||||
using port_data_type_t = std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>;
|
using port_data_type_t = std::tuple_element_t<0, std::tuple<return_type_t<typename Adapters::encoder_type_t>...>>;
|
||||||
|
using callback_aargs_t = tp::tuple_tail<args_t<typename std::tuple_element_t<0, std::tuple<Adapters...>>::callback_type_t::signature_type>>::type;
|
||||||
|
|
||||||
Port(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Adapters>...> &&adapters,
|
Port(enum port_types_e pt, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx, std::tuple<std::unique_ptr<Adapters>...> &&adapters,
|
||||||
std::tuple<Args...> &&args)
|
std::tuple<Args...> &&args)
|
||||||
|
|
@ -26,39 +40,48 @@ public:
|
||||||
mc_adapters_([&]<size_t... Ids>(std::index_sequence<Ids...>) {
|
mc_adapters_([&]<size_t... Ids>(std::index_sequence<Ids...>) {
|
||||||
return std::make_tuple([&]<size_t Idx>() {
|
return std::make_tuple([&]<size_t Idx>() {
|
||||||
using adapter_type_t = std::remove_cvref_t<decltype(*std::get<Idx>(adapters))>;
|
using adapter_type_t = std::remove_cvref_t<decltype(*std::get<Idx>(adapters))>;
|
||||||
using adapter_input_type_t = return_type_t<typename adapter_type_t::decoder_type_t>;
|
using adapter_input_type_t = std::remove_cvref_t<return_type_t<typename adapter_type_t::decoder_type_t>>;
|
||||||
|
using adapter_callback_type_t = std::remove_cvref_t<typename adapter_type_t::base_t::callback_type_t>;
|
||||||
|
|
||||||
return std::make_tuple(std::get<Idx>(adapters)->name(), std::hash<std::string>()(std::get<Idx>(adapters)->name()),
|
// fmt::print("Adding callback: name: {}, namehash: {}, typehash: {}, cbk_typehash: {}, cbk_type: {}\r\n", std::get<Idx>(adapters)->name(),
|
||||||
typeid(std::remove_cvref_t<adapter_input_type_t>).hash_code(), std::forward<std::unique_ptr<adapter_type_t>>(std::get<Idx>(adapters)));
|
// std::hash<std::string>()(std::get<Idx>(adapters)->name()), typeid(adapter_input_type_t).hash_code(), typeid(adapter_callback_type_t).hash_code(),
|
||||||
|
// type_name<adapter_callback_type_t>());
|
||||||
|
return std::make_tuple(std::get<Idx>(adapters)->name(), std::hash<std::string>()(std::get<Idx>(adapters)->name()), typeid(adapter_input_type_t).hash_code(),
|
||||||
|
typeid(adapter_callback_type_t).hash_code(), std::forward<std::unique_ptr<adapter_type_t>>(std::get<Idx>(adapters)));
|
||||||
}.template operator()<Ids>()...);
|
}.template operator()<Ids>()...);
|
||||||
}(std::make_index_sequence<sizeof...(Adapters)>{})) {
|
}(std::make_index_sequence<sizeof...(Adapters)>{})) {
|
||||||
std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoint), std::forward<Args>(args)...); }, std::forward<decltype(args)>(args));
|
std::apply([&, this](auto &&...args) { init_impl_(pt, zmq_ctx, std::move(endpoint), std::forward<Args>(args)...); }, std::forward<decltype(args)>(args));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
~Port() override { stop(); }
|
||||||
|
|
||||||
void stop() const override {
|
void stop() const override {
|
||||||
this->stop__();
|
stop__();
|
||||||
mc_impl_->close();
|
m_impl__->close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline const auto &adapters() const { return mc_adapters_; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
void stop__() const override { m_impl__->stop_source().request_stop(); }
|
||||||
void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override {
|
void send__(const std::string &addr, const void *data, size_t size, size_t hash) const override {
|
||||||
const void *adapter_ptr = nullptr;
|
const void *adapter_ptr = nullptr;
|
||||||
|
|
||||||
tp::for_each(mc_adapters_, [&](const auto &e) {
|
tp::for_each(mc_adapters_, [&](const auto &e) {
|
||||||
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e;
|
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
|
||||||
|
|
||||||
if (adapter_typehash == hash) {
|
if (adapter_typehash == hash) {
|
||||||
using adapter_in_type_t = std::remove_cvref_t<return_type_t<decltype(adapter->decoder())>>;
|
using adapter_in_type_t = std::remove_cvref_t<return_type_t<decltype(adapter->decoder())>>;
|
||||||
adapter_ptr = &adapter;
|
adapter_ptr = &adapter;
|
||||||
|
|
||||||
typename PortImplBase<cbk_type_t_>::port_payload_s payload = {
|
typename PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>::port_payload_s payload = {
|
||||||
.typehash = hash,
|
.typehash = hash,
|
||||||
.data = adapter->encoder()(*reinterpret_cast<const adapter_in_type_t *>(data)),
|
.data = {adapter->encoder()(*reinterpret_cast<const adapter_in_type_t *>(data))},
|
||||||
};
|
};
|
||||||
|
|
||||||
msgpack::sbuffer buf;
|
msgpack::sbuffer buf;
|
||||||
msgpack::pack(buf, payload);
|
msgpack::pack(buf, payload);
|
||||||
mc_impl_->send(addr, buf);
|
m_impl__->send(addr, buf);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -67,20 +90,21 @@ protected:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const override final {
|
void *get_adapter__(const std::string &name, size_t namehash, size_t typehash, size_t cbk_typehash) const override final {
|
||||||
void *ret = nullptr;
|
void *ret = nullptr;
|
||||||
|
|
||||||
tp::for_each(mc_adapters_, [&](auto &a) {
|
tp::for_each(mc_adapters_, [&](auto &a) {
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = a;
|
auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = a;
|
||||||
if (adapter_typehash == typehash && adapter_namehash == namehash) {
|
if (adapter_typehash == typehash && adapter_namehash == namehash && adapter_cbk_typehash == cbk_typehash) {
|
||||||
ret = reinterpret_cast<void *>(adapter.get());
|
ret = reinterpret_cast<void *>(adapter.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
throw std::runtime_error(fmt::format("No adapter '{}' in port '{}'\r\n", name, this->name()));
|
throw std::runtime_error(
|
||||||
|
fmt::format("No such callback in adapter '{}' in port '{}' (namehash: #{}, typehash: #{}, cbk_typehash: #{})\r\n", name, this->name(), namehash, typehash, cbk_typehash));
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
@ -88,20 +112,47 @@ protected:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using base_t_ = PortBase<port_data_type_t>;
|
using base_t_ = PortBase<port_data_type_t>;
|
||||||
using cbk_type_t_ = std::function<void(const port_data_type_t &, size_t)>;
|
using cbk_return_type_t_ = typename std::tuple_element_t<0, std::tuple<Adapters...>>::base_t::callback_type_t::result_type;
|
||||||
|
|
||||||
mutable std::tuple<std::tuple<std::string, size_t, size_t, std::unique_ptr<Adapters>>...> mc_adapters_;
|
template <typename...> class PortImplCallback_;
|
||||||
mutable std::unique_ptr<PortImplBase<cbk_type_t_>> mc_impl_;
|
template <typename... Aargs> class PortImplCallback_<std::tuple<Aargs...>> {
|
||||||
|
public:
|
||||||
|
PortImplCallback_(const Port *port) : mc_port_(port) {}
|
||||||
|
using type_t = std::function<cbk_return_type_t_(const port_data_type_t &, size_t, Aargs &&...)>;
|
||||||
|
|
||||||
void listen__(std::stop_token st) const override { mc_impl_->listen(st); }
|
cbk_return_type_t_ operator()(const port_data_type_t &data, size_t hash, Aargs &&...callback_args) const {
|
||||||
|
std::conditional_t<!std::is_void_v<cbk_return_type_t_>, cbk_return_type_t_, std::false_type> ret;
|
||||||
|
|
||||||
|
tp::for_each(mc_port_->mc_adapters_, [&](const auto &e) {
|
||||||
|
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
|
||||||
|
if (adapter_typehash == hash) {
|
||||||
|
if constexpr (std::is_void_v<std::invoke_result_t<std::remove_cvref_t<decltype(adapter->callback())>, decltype(adapter->decoder()(data)), Aargs &&...>>) {
|
||||||
|
adapter->callback()(adapter->decoder()(data), std::forward<Aargs>(callback_args)...);
|
||||||
|
} else {
|
||||||
|
ret = adapter->callback()(adapter->decoder()(data), std::forward<Aargs>(callback_args)...);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if constexpr (!std::is_void_v<cbk_return_type_t_>) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const Port *mc_port_;
|
||||||
|
};
|
||||||
|
|
||||||
|
mutable std::unique_ptr<PortImplBase<this_t, PortImplCallback_<callback_aargs_t>>> m_impl__;
|
||||||
|
mutable std::tuple<std::tuple<std::string, size_t, size_t, size_t, std::unique_ptr<Adapters>>...> mc_adapters_;
|
||||||
|
|
||||||
template <typename... ImplArgs> void init_impl_(enum port_types_e pt, ImplArgs &&...args) const {
|
template <typename... ImplArgs> void init_impl_(enum port_types_e pt, ImplArgs &&...args) const {
|
||||||
using enum port_types_e;
|
using enum port_types_e;
|
||||||
static constexpr auto make_null_impl_pair = []<enum port_types_e port_type>() consteval {
|
static constexpr auto make_null_impl_pair = []<enum port_types_e port_type>() consteval {
|
||||||
if constexpr (port_type == UNKNOWN) {
|
if constexpr (port_type == UNKNOWN) {
|
||||||
return std::make_pair(port_type, static_cast<PortImplBase<cbk_type_t_> *>(nullptr));
|
return std::make_pair(port_type, static_cast<PortImplBase<this_t, PortImplCallback_<callback_aargs_t>> *>(nullptr));
|
||||||
} else {
|
} else {
|
||||||
return std::make_pair(port_type, static_cast<PortImpl<port_type, cbk_type_t_> *>(nullptr));
|
return std::make_pair(port_type, static_cast<PortImpl<port_type, this_t, PortImplCallback_<callback_aargs_t>> *>(nullptr));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -116,20 +167,8 @@ private:
|
||||||
if (type == pt) {
|
if (type == pt) {
|
||||||
using impl_type_t = std::remove_pointer_t<decltype(null_pimpl)>;
|
using impl_type_t = std::remove_pointer_t<decltype(null_pimpl)>;
|
||||||
|
|
||||||
if constexpr (std::is_constructible_v<impl_type_t, const ImplArgs &..., cbk_type_t_ &&>) {
|
if constexpr (std::is_constructible_v<impl_type_t, const this_t *, const ImplArgs &..., PortImplCallback_<callback_aargs_t> &&>) {
|
||||||
mc_impl_ = std::make_unique<impl_type_t>(
|
m_impl__ = std::make_unique<impl_type_t>(this, std::forward<ImplArgs>(args)..., PortImplCallback_<callback_aargs_t>(this));
|
||||||
// Args
|
|
||||||
std::forward<ImplArgs>(args)...,
|
|
||||||
|
|
||||||
// Callback
|
|
||||||
[this](const port_data_type_t &data, size_t hash) {
|
|
||||||
tp::for_each(mc_adapters_, [&](const auto &e) {
|
|
||||||
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter] = e;
|
|
||||||
if (adapter_typehash == hash) {
|
|
||||||
adapter->callback()(adapter->decoder()(data));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,12 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <cstdint>
|
#include "src/tuple.hpp"
|
||||||
#include <future>
|
#include <boost/callable_traits/args.hpp>
|
||||||
|
#include <boost/callable_traits/return_type.hpp>
|
||||||
#include <msgpack.hpp>
|
#include <msgpack.hpp>
|
||||||
|
#include <tuple>
|
||||||
|
|
||||||
|
#define ZMQ_BUILD_DRAFT_API
|
||||||
#include <zmq.hpp>
|
#include <zmq.hpp>
|
||||||
|
|
||||||
#define FMT_HEADER_ONLY
|
#define FMT_HEADER_ONLY
|
||||||
|
|
@ -13,6 +17,8 @@
|
||||||
#include "port_types.hpp"
|
#include "port_types.hpp"
|
||||||
#include "utils.hpp"
|
#include "utils.hpp"
|
||||||
|
|
||||||
|
using namespace boost::callable_traits;
|
||||||
|
|
||||||
template <typename EncodedType> class PortBase {
|
template <typename EncodedType> class PortBase {
|
||||||
class AddressedPort_ {
|
class AddressedPort_ {
|
||||||
public:
|
public:
|
||||||
|
|
@ -31,6 +37,8 @@ public:
|
||||||
PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx)
|
PortBase(enum port_types_e port_type, const std::string &name, const std::string &endpoint, zmq::context_t &zmq_ctx)
|
||||||
: mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash<std::string>()(name)), mc_endpoint_hash_(std::hash<std::string>()(endpoint)) {}
|
: mc_type_(port_type), mc_name_(name), mc_endpoint_(endpoint), mc_name_hash_(std::hash<std::string>()(name)), mc_endpoint_hash_(std::hash<std::string>()(endpoint)) {}
|
||||||
|
|
||||||
|
virtual ~PortBase() = default;
|
||||||
|
|
||||||
inline const auto &type() const { return mc_type_; }
|
inline const auto &type() const { return mc_type_; }
|
||||||
inline const auto &name() const { return mc_name_; }
|
inline const auto &name() const { return mc_name_; }
|
||||||
inline const auto &endpoint() const { return mc_endpoint_; }
|
inline const auto &endpoint() const { return mc_endpoint_; }
|
||||||
|
|
@ -44,23 +52,38 @@ public:
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
void listen() const { listen__(m_ss_.get_token()); };
|
// void listen() const { listen__(m_ss_.get_token()); };
|
||||||
virtual void stop() const = 0;
|
virtual void stop() const = 0;
|
||||||
|
|
||||||
template <typename AdapterInType> auto &callback(const std::string &name) const {
|
template <typename Signature> auto &callback(const std::string &name) const {
|
||||||
return (*static_cast<AdapterBase<AdapterInType> *>(get_adapter__(name, std::hash<std::string>()(name), typeid(std::remove_cvref_t<AdapterInType>).hash_code()))).callback();
|
return GetCallbackHelper_<typename tp::tuple_tail<args_t<Signature>>::type>(this).template operator()<Signature>(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void stop__() const { m_ss_.request_stop(); }
|
virtual void stop__() const = 0;
|
||||||
|
|
||||||
virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0;
|
virtual void send__(const std::string &addr, const void *data, size_t size, size_t type_hash) const = 0;
|
||||||
virtual void listen__(std::stop_token) const = 0;
|
virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash, size_t cbk_typehash) const = 0;
|
||||||
virtual void *get_adapter__(const std::string &name, size_t namehash, size_t typehash) const = 0;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const enum port_types_e mc_type_;
|
const enum port_types_e mc_type_;
|
||||||
const std::string mc_name_, mc_endpoint_;
|
const std::string mc_name_, mc_endpoint_;
|
||||||
const size_t mc_name_hash_, mc_endpoint_hash_;
|
const size_t mc_name_hash_, mc_endpoint_hash_;
|
||||||
mutable std::stop_source m_ss_;
|
|
||||||
|
template <typename...> class GetCallbackHelper_;
|
||||||
|
template <typename... Aargs> class GetCallbackHelper_<std::tuple<Aargs...>> {
|
||||||
|
public:
|
||||||
|
GetCallbackHelper_(const PortBase *port) : mc_port_(port) {}
|
||||||
|
|
||||||
|
template <typename Signature> auto &operator()(const std::string &name) const {
|
||||||
|
using ret_type_t = std::remove_cvref_t<return_type_t<std::function<Signature>>>;
|
||||||
|
using arg_type_t = std::tuple_element_t<0, args_t<std::function<Signature>>>;
|
||||||
|
return (*static_cast<AdapterBase<arg_type_t, tag_s<ret_type_t>, Aargs...> *>(
|
||||||
|
mc_port_->get_adapter__(name, std::hash<std::string>()(name), typeid(arg_type_t).hash_code(),
|
||||||
|
typeid(typename AdapterBase<arg_type_t, tag_s<ret_type_t>, Aargs...>::callback_type_t).hash_code())))
|
||||||
|
.callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const PortBase *mc_port_;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -7,16 +7,18 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::DEALER, Callback> : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::DEALER, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
void listen(std::stop_token st) const override {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
||||||
|
|
||||||
|
// Send to socket depending on implementation
|
||||||
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override {
|
||||||
while (!st.stop_requested()) {
|
while (!st.stop_requested()) {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,23 +4,24 @@
|
||||||
#include <future>
|
#include <future>
|
||||||
#include <msgpack.hpp>
|
#include <msgpack.hpp>
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
|
|
||||||
|
#define ZMQ_BUILD_DRAFT_API
|
||||||
#include <zmq.hpp>
|
#include <zmq.hpp>
|
||||||
|
|
||||||
template <typename Callback> class PortImplBase {
|
template <typename Port, typename Callback> class PortImplBase {
|
||||||
using port_data_type_t_ = std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<Callback>>>;
|
using port_data_type_t_ = std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<Callback>>>;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
struct port_payload_s {
|
struct port_payload_s {
|
||||||
size_t typehash;
|
size_t typehash;
|
||||||
port_data_type_t_ data;
|
std::vector<port_data_type_t_> data;
|
||||||
MSGPACK_DEFINE(typehash, data);
|
MSGPACK_DEFINE(typehash, data);
|
||||||
};
|
};
|
||||||
|
|
||||||
PortImplBase(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : mc_endpoint__(endpoint), m_ctx__(zmq_ctx), mc_cbk__(callback) {}
|
PortImplBase(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
|
: mc_endpoint__(endpoint), m_ctx__(zmq_ctx), mc_cbk__(callback), mc_port__(port) {}
|
||||||
virtual ~PortImplBase() = default;
|
virtual ~PortImplBase() = default;
|
||||||
|
|
||||||
virtual void listen(std::stop_token st) const = 0;
|
|
||||||
virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0;
|
virtual void send(const std::string &addr, const msgpack::sbuffer &data) const = 0;
|
||||||
void close() {
|
void close() {
|
||||||
m_sock__.close();
|
m_sock__.close();
|
||||||
|
|
@ -30,11 +31,18 @@ public:
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
inline auto &stop_source() { return m_ss_; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
static constexpr auto sc_recv_timeout_ms__ = 1'000u;
|
||||||
mutable std::future<void> m_listener_thread__;
|
mutable std::future<void> m_listener_thread__;
|
||||||
mutable zmq::socket_t m_sock__;
|
mutable zmq::socket_t m_sock__;
|
||||||
zmq::context_t &m_ctx__;
|
zmq::context_t &m_ctx__;
|
||||||
const std::string mc_endpoint__;
|
const std::string mc_endpoint__;
|
||||||
const Callback mc_cbk__;
|
const Callback mc_cbk__;
|
||||||
|
const Port *mc_port__;
|
||||||
|
virtual void listen__(std::stop_token st) const = 0;
|
||||||
|
|
||||||
|
private:
|
||||||
|
mutable std::stop_source m_ss_;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -7,15 +7,18 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::PAIR, Callback> : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PAIR, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
void listen(std::stop_token st) const override {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
||||||
|
|
||||||
|
// Send to socket depending on implementation
|
||||||
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override {
|
||||||
while (!st.stop_requested()) {
|
while (!st.stop_requested()) {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -7,17 +7,16 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::PUB, Callback> final : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PUB, Port, Callback> final : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {
|
||||||
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub);
|
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pub);
|
||||||
this->m_sock__.bind(this->mc_endpoint__);
|
this->m_sock__.bind(this->mc_endpoint__);
|
||||||
}
|
}
|
||||||
|
|
||||||
~PortImpl() override {}
|
~PortImpl() override {}
|
||||||
|
|
||||||
void listen(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); }
|
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
|
||||||
try {
|
try {
|
||||||
|
|
@ -27,4 +26,7 @@ public:
|
||||||
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUBLISHER pattern socket"); }
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -2,20 +2,58 @@
|
||||||
|
|
||||||
#include "port_impl_base.hpp"
|
#include "port_impl_base.hpp"
|
||||||
#include "port_types.hpp"
|
#include "port_types.hpp"
|
||||||
|
#include <chrono>
|
||||||
|
#include <zmq.hpp>
|
||||||
|
|
||||||
#define FMT_HEADER_ONLY
|
#define FMT_HEADER_ONLY
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::PULL, Callback> : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PULL, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
void listen(std::stop_token st) const override {
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
while (!st.stop_requested()) {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::pull);
|
||||||
}
|
this->m_sock__.connect(this->mc_endpoint__);
|
||||||
|
|
||||||
|
listen__(this->stop_source().get_token());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on PULL pattern socket"); };
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override {
|
||||||
|
this->m_listener_thread__ = std::async(
|
||||||
|
std::launch::async,
|
||||||
|
[this](std::stop_token st) {
|
||||||
|
zmq::poller_t poller;
|
||||||
|
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
||||||
|
|
||||||
|
while (!st.stop_requested()) {
|
||||||
|
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
|
||||||
|
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < num_events; ++i) {
|
||||||
|
zmq::message_t msg;
|
||||||
|
|
||||||
|
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
|
||||||
|
typename base_t::port_payload_s payload;
|
||||||
|
|
||||||
|
msgpack::sbuffer buf;
|
||||||
|
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
||||||
|
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
||||||
|
|
||||||
|
for (const auto &data : batch) {
|
||||||
|
this->mc_cbk__(data, typehash);
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::optional(res);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
st);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -7,15 +7,23 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::PUSH, Callback> : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::PUSH, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
void listen(std::stop_token st) const override {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {
|
||||||
while (!st.stop_requested()) {
|
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::push);
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
this->m_sock__.bind(this->mc_endpoint__);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
|
||||||
|
try {
|
||||||
|
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
|
||||||
|
} catch (const zmq::error_t &err) {
|
||||||
|
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on PUSH pattern socket"); }
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -1,22 +1,90 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "port_impl_base.hpp"
|
#include <boost/callable_traits/return_type.hpp>
|
||||||
#include "port_types.hpp"
|
#include <optional>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <tuple>
|
||||||
|
|
||||||
#define FMT_HEADER_ONLY
|
#define FMT_HEADER_ONLY
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::REP, Callback> : public PortImplBase<Callback> {
|
#include "port_impl_base.hpp"
|
||||||
|
#include "port_types.hpp"
|
||||||
|
#include "tuple.hpp"
|
||||||
|
|
||||||
|
template <typename Port, typename Callback> class PortImpl<port_types_e::REP, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
void listen(std::stop_token st) const override {
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
while (!st.stop_requested()) {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::rep);
|
||||||
}
|
this->m_sock__.bind(this->mc_endpoint__);
|
||||||
|
|
||||||
|
listen__(this->stop_source().get_token());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override { throw std::runtime_error("Can't send anything on REPLY pattern socket"); };
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override {
|
||||||
|
this->m_listener_thread__ = std::async(
|
||||||
|
std::launch::async,
|
||||||
|
[this](std::stop_token st) {
|
||||||
|
zmq::poller_t poller;
|
||||||
|
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
||||||
|
|
||||||
|
while (!st.stop_requested()) {
|
||||||
|
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
|
||||||
|
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < num_events; ++i) {
|
||||||
|
zmq::message_t msg;
|
||||||
|
|
||||||
|
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
|
||||||
|
typename base_t::port_payload_s payload;
|
||||||
|
|
||||||
|
msgpack::sbuffer buf;
|
||||||
|
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
||||||
|
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
||||||
|
|
||||||
|
for (const auto &data : batch) {
|
||||||
|
if constexpr (!std::is_void_v<boost::callable_traits::return_type_t<std::remove_cvref_t<decltype(this->mc_cbk__)>>>) {
|
||||||
|
auto reply_data = this->mc_cbk__(data, typehash);
|
||||||
|
for (const auto &d : reply_data) {
|
||||||
|
using adapter_in_type_t = std::remove_cvref_t<decltype(d)>;
|
||||||
|
|
||||||
|
size_t typehash = typeid(adapter_in_type_t).hash_code();
|
||||||
|
|
||||||
|
tp::for_each(this->mc_port__->adapters(), [&](const auto &e) {
|
||||||
|
const auto &[adapter_name, adapter_namehash, adapter_typehash, adapter_cbk_typehash, adapter] = e;
|
||||||
|
if constexpr (std::is_same_v<std::remove_cvref_t<std::tuple_element_t<0, boost::callable_traits::args_t<decltype(adapter->encoder())>>>,
|
||||||
|
adapter_in_type_t>) {
|
||||||
|
if (adapter_typehash == typehash) {
|
||||||
|
|
||||||
|
typename base_t::port_payload_s payload = {
|
||||||
|
.typehash = typeid(d).hash_code(),
|
||||||
|
.data = {adapter->encoder()(d)},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
msgpack::sbuffer buf;
|
||||||
|
msgpack::pack(buf, payload);
|
||||||
|
this->m_sock__.send(zmq::message_t(buf.data(), buf.size()), zmq::send_flags::none);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
std::runtime_error("Callback of REPLY pattern socket should return non-void value");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::optional(res);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
st);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
|
||||||
|
|
@ -7,16 +7,38 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::REQ, Callback> : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::REQ, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
void listen(std::stop_token st) const override {
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
while (!st.stop_requested()) {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::req);
|
||||||
}
|
this->m_sock__.connect(this->mc_endpoint__);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override {
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
try {
|
||||||
|
zmq::message_t reply;
|
||||||
|
this->m_sock__.send(zmq::message_t(data.data(), data.size()), zmq::send_flags::none);
|
||||||
|
|
||||||
|
this->m_sock__.recv(reply, zmq::recv_flags::none).and_then([&](const auto &res) {
|
||||||
|
typename base_t::port_payload_s payload;
|
||||||
|
|
||||||
|
msgpack::sbuffer buf;
|
||||||
|
buf.write(reinterpret_cast<const char *>(reply.data()), reply.size());
|
||||||
|
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
||||||
|
|
||||||
|
for (const auto &data : batch) {
|
||||||
|
this->mc_cbk__(data, typehash);
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::optional(res);
|
||||||
|
});
|
||||||
|
} catch (const zmq::error_t &err) {
|
||||||
|
fmt::print("ZMQ error: {1} ({0})\r\n", err.num(), err.what());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override { throw std::runtime_error("Can't listen on REQUEST pattern socket"); }
|
||||||
|
};
|
||||||
|
|
|
||||||
|
|
@ -7,16 +7,18 @@
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::ROUTER, Callback> : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::ROUTER, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback) : PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, Callback &&callback)
|
||||||
void listen(std::stop_token st) const override {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)) {}
|
||||||
|
|
||||||
|
// Send to socket depending on implementation
|
||||||
|
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
||||||
|
|
||||||
|
private:
|
||||||
|
void listen__(std::stop_token st) const override {
|
||||||
while (!st.stop_requested()) {
|
while (!st.stop_requested()) {
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
std::this_thread::sleep_for(std::chrono::milliseconds(100u));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
|
||||||
void send(const std::string &addr, const msgpack::sbuffer &data) const override {};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,48 +2,30 @@
|
||||||
|
|
||||||
#include "port_impl_base.hpp"
|
#include "port_impl_base.hpp"
|
||||||
#include "port_types.hpp"
|
#include "port_types.hpp"
|
||||||
|
#include <chrono>
|
||||||
|
#include <ranges>
|
||||||
|
#include <stop_token>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <utility>
|
||||||
|
#include <zmq.hpp>
|
||||||
|
|
||||||
#define FMT_HEADER_ONLY
|
#define FMT_HEADER_ONLY
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
template <typename Callback> class PortImpl<port_types_e::SUB, Callback> : public PortImplBase<Callback> {
|
template <typename Port, typename Callback> class PortImpl<port_types_e::SUB, Port, Callback> : public PortImplBase<Port, Callback> {
|
||||||
public:
|
public:
|
||||||
using base_t = PortImplBase<Callback>;
|
using base_t = PortImplBase<Port, Callback>;
|
||||||
PortImpl(zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list<std::string> &topics, Callback &&callback)
|
PortImpl(const Port *port, zmq::context_t &zmq_ctx, const std::string &endpoint, const std::list<std::string> &topics, Callback &&callback)
|
||||||
: PortImplBase<Callback>(zmq_ctx, endpoint, std::forward<Callback>(callback)), mc_topics_(topics) {
|
: PortImplBase<Port, Callback>(port, zmq_ctx, endpoint, std::forward<Callback>(callback)), mc_topics_(topics) {
|
||||||
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::sub);
|
this->m_sock__ = zmq::socket_t(this->m_ctx__, zmq::socket_type::sub);
|
||||||
this->m_sock__.connect(this->mc_endpoint__);
|
this->m_sock__.connect(this->mc_endpoint__);
|
||||||
|
|
||||||
for (const auto &topic : mc_topics_) {
|
for (const auto &topic : mc_topics_) {
|
||||||
this->m_sock__.set(zmq::sockopt::subscribe, topic);
|
this->m_sock__.set(zmq::sockopt::subscribe, topic);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
void listen(std::stop_token st) const override {
|
listen__(this->stop_source().get_token());
|
||||||
this->m_listener_thread__ = std::async(
|
|
||||||
std::launch::async,
|
|
||||||
[this](std::stop_token st) {
|
|
||||||
while (!st.stop_requested()) {
|
|
||||||
zmq::message_t msg;
|
|
||||||
|
|
||||||
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
|
|
||||||
this->m_sock__.recv(msg).and_then([&](const auto &res) {
|
|
||||||
typename base_t::port_payload_s payload;
|
|
||||||
|
|
||||||
msgpack::sbuffer buf;
|
|
||||||
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
|
||||||
const auto [typehash, data] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
|
||||||
|
|
||||||
this->mc_cbk__(data, typehash);
|
|
||||||
return std::optional(res);
|
|
||||||
});
|
|
||||||
|
|
||||||
return std::optional(res);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
},
|
|
||||||
st);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to socket depending on implementation
|
// Send to socket depending on implementation
|
||||||
|
|
@ -51,4 +33,43 @@ public:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const std::list<std::string> mc_topics_;
|
const std::list<std::string> mc_topics_;
|
||||||
|
|
||||||
|
void listen__(std::stop_token st) const override {
|
||||||
|
this->m_listener_thread__ = std::async(
|
||||||
|
std::launch::async,
|
||||||
|
[this](std::stop_token st) {
|
||||||
|
zmq::poller_t poller;
|
||||||
|
poller.add(this->m_sock__, zmq::event_flags::pollin);
|
||||||
|
|
||||||
|
while (!st.stop_requested()) {
|
||||||
|
std::vector<zmq::poller_event<zmq::no_user_data>> events(1u);
|
||||||
|
size_t num_events = poller.wait_all(events, std::chrono::milliseconds(base_t::sc_recv_timeout_ms__));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < num_events; ++i) {
|
||||||
|
zmq::message_t msg;
|
||||||
|
|
||||||
|
this->m_sock__.recv(msg, zmq::recv_flags::dontwait).and_then([&](const auto &res) {
|
||||||
|
std::string envelope = std::string(static_cast<const char *>(msg.data()), msg.size());
|
||||||
|
|
||||||
|
this->m_sock__.recv(msg).and_then([&](const auto &res) {
|
||||||
|
typename base_t::port_payload_s payload;
|
||||||
|
|
||||||
|
msgpack::sbuffer buf;
|
||||||
|
buf.write(reinterpret_cast<const char *>(msg.data()), msg.size());
|
||||||
|
const auto &[typehash, batch] = msgpack::unpack(buf.data(), buf.size()).get().convert(payload);
|
||||||
|
|
||||||
|
for (const auto &data : batch) {
|
||||||
|
this->mc_cbk__(data, typehash, envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::optional(res);
|
||||||
|
});
|
||||||
|
|
||||||
|
return std::optional(res);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
st);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -1,18 +1,17 @@
|
||||||
#include <boost/signals2/signal_type.hpp>
|
#include <boost/signals2/signal_type.hpp>
|
||||||
#include <charconv>
|
#include <boost/signals2/variadic_signal.hpp>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <tuple>
|
#include <tuple>
|
||||||
|
|
||||||
#include <zmq.hpp>
|
|
||||||
|
|
||||||
#define FMT_HEADER_ONLY
|
#define FMT_HEADER_ONLY
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <fmt/ranges.h>
|
#include <fmt/ranges.h>
|
||||||
|
|
||||||
#include "adapter_factory.hpp"
|
#include "adapter_factory.hpp"
|
||||||
|
#include "codecs.hpp"
|
||||||
#include "module_factory.hpp"
|
#include "module_factory.hpp"
|
||||||
#include "port_factory.hpp"
|
#include "port_factory.hpp"
|
||||||
|
|
||||||
|
|
@ -21,19 +20,22 @@ using env_data_type_t = std::vector<uint8_t>;
|
||||||
void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
void subscriber_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port
|
const auto &subscriber = *ports.at("subscriber_port"); // Get subscriber port
|
||||||
|
|
||||||
auto &int_cbk = subscriber.callback<int32_t>("int-vector<uint8_t>-int");
|
auto &int_cbk = subscriber.callback<void(const int32_t &, const std::string &)>("int-vector<uint8_t>-int");
|
||||||
auto &string_cbk = subscriber.callback<std::string>("string-vector<uint8_t>-string");
|
auto &string_cbk = subscriber.callback<void(const std::string &, const std::string &)>("string-vector<uint8_t>-string");
|
||||||
auto &double_cbk = subscriber.callback<double>("double-vector<uint8_t>-double");
|
auto &double_cbk = subscriber.callback<void(const double &, const std::string &)>("double-vector<uint8_t>-double");
|
||||||
|
|
||||||
// Connect callbacks
|
// Connect callbacks
|
||||||
int_cbk.connect([](const int32_t &i) -> void { fmt::print("SUBSCRIBER: got data: {} of integer type\r\n", i); });
|
int_cbk.connect([](const int32_t &i, const std::string &addr) {
|
||||||
string_cbk.connect([](const std::string &s) -> void { fmt::print("SUBSCRIBER: got data: {} of string type\r\n", s); });
|
fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>(), addr);
|
||||||
double_cbk.connect([](const double &d) -> void { fmt::print("SUBSCRIBER: got data: {} of double type\r\n", d); });
|
});
|
||||||
|
|
||||||
// Listen for 1 second
|
string_cbk.connect([](const std::string &s, const std::string &addr) {
|
||||||
subscriber.listen();
|
fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>(), addr);
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000u));
|
});
|
||||||
subscriber.stop();
|
|
||||||
|
double_cbk.connect([](const double &d, const std::string &addr) {
|
||||||
|
fmt::print("SUBSCRIBER socket: got data: {} of {} type from address: {}\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>(), addr);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publisher module entrypoint
|
// Publisher module entrypoint
|
||||||
|
|
@ -47,67 +49,126 @@ void publisher_entry(int32_t argc, char **argv, char **envp, const std::unordere
|
||||||
publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test");
|
publisher["topic3"] << 1 << 2 << double{3.f} << std::string("test");
|
||||||
}
|
}
|
||||||
|
|
||||||
struct codecs_s {
|
// Publisher module entrypoint
|
||||||
struct encoders_s {
|
void pusher_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
static inline const auto from_int = [](const int32_t &i) -> env_data_type_t {
|
const auto &publisher = *ports.at("push_port"); // Get publisher port
|
||||||
auto str = std::to_string(i); // Convert to string first
|
publisher << 1 << 2 << double{3.f} << std::string("test");
|
||||||
return {str.begin(), str.end()}; // String to byte array
|
|
||||||
};
|
|
||||||
|
|
||||||
static inline const auto from_string = [](const std::string &s) -> env_data_type_t { return {s.begin(), s.end()}; };
|
|
||||||
static inline const auto from_double = [](const double &d) -> env_data_type_t {
|
|
||||||
auto str = std::to_string(d);
|
|
||||||
return {str.begin(), str.end()};
|
|
||||||
};
|
|
||||||
} encoders;
|
|
||||||
|
|
||||||
struct decoders_s {
|
|
||||||
static inline const auto to_int = [](const env_data_type_t &s) -> int32_t {
|
|
||||||
int32_t ret;
|
|
||||||
auto str = std::string(s.begin(), s.end());
|
|
||||||
if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) {
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
throw std::runtime_error(fmt::format("Invalid convert from {} to integer type", str));
|
void puller_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
}; // Byte array to int
|
const auto &puller = *ports.at("pull_port"); // Get subscriber port
|
||||||
|
|
||||||
static inline const auto to_string = [](const env_data_type_t &i) -> std::string { return std::string(i.begin(), i.end()); };
|
auto &int_cbk = puller.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
|
||||||
static inline const auto to_double = [](const env_data_type_t &s) -> double {
|
auto &string_cbk = puller.callback<void(const std::string &)>("string-vector<uint8_t>-string");
|
||||||
double ret;
|
auto &double_cbk = puller.callback<void(const double &)>("double-vector<uint8_t>-double");
|
||||||
auto str = std::string(s.begin(), s.end());
|
|
||||||
if (std::from_chars(str.c_str(), str.c_str() + str.size(), ret).ec == std::errc{}) {
|
// Connect callbacks
|
||||||
return ret;
|
int_cbk.connect([](const int32_t &i) { fmt::print("PULL socket: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>()); });
|
||||||
|
string_cbk.connect([](const std::string &s) { fmt::print("PULL socket: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>()); });
|
||||||
|
double_cbk.connect([](const double &d) { fmt::print("PULL socket: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>()); });
|
||||||
}
|
}
|
||||||
|
|
||||||
throw std::runtime_error(fmt::format("Invalid convert from {} to double type", str));
|
void req_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
};
|
const auto &req = *ports.at("req_port"); // Get publisher port
|
||||||
} decoders;
|
// static_assert(std::is_same_v<typename std::remove_cvref_t<decltype(req)>, void>, "");
|
||||||
} codecs;
|
auto &int_cbk = req.callback<void(const int32_t &)>("int-vector<uint8_t>-int");
|
||||||
|
auto &string_cbk = req.callback<void(const std::string &)>("string-vector<uint8_t>-string");
|
||||||
|
auto &double_cbk = req.callback<void(const double &)>("double-vector<uint8_t>-double");
|
||||||
|
|
||||||
|
// Connect callbacks
|
||||||
|
int_cbk.connect([](const int32_t &i) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>()); });
|
||||||
|
string_cbk.connect([](const std::string &s) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>()); });
|
||||||
|
double_cbk.connect([](const double &d) { fmt::print("REQUEST socket: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>()); });
|
||||||
|
|
||||||
|
req << 1 << 2 << double{3.f} << std::string("test");
|
||||||
|
}
|
||||||
|
|
||||||
|
void rep_entry(int32_t argc, char **argv, char **envp, const std::unordered_map<std::string, const PortBase<env_data_type_t> *> &ports) {
|
||||||
|
const auto &rep = *ports.at("rep_port"); // Get subscriber port
|
||||||
|
|
||||||
|
auto &int_cbk = rep.callback<std::string(const int32_t &)>("int-vector<uint8_t>-int");
|
||||||
|
auto &string_cbk = rep.callback<std::string(const std::string &)>("string-vector<uint8_t>-string");
|
||||||
|
auto &double_cbk = rep.callback<std::string(const double &)>("double-vector<uint8_t>-double");
|
||||||
|
|
||||||
|
// Connect callbacks
|
||||||
|
int_cbk.connect([](const int32_t &i) -> std::string {
|
||||||
|
fmt::print("REPLY socket: got data: {} of {} type\r\n", i, type_name<std::remove_cvref_t<decltype(i)>>());
|
||||||
|
// Handle data ...
|
||||||
|
return fmt::format("'Handle request: {} of type: {}'", i, type_name<std::remove_cvref_t<decltype(i)>>());
|
||||||
|
});
|
||||||
|
|
||||||
|
string_cbk.connect([](const std::string &s) -> std::string {
|
||||||
|
fmt::print("REPLY socket: got data: {} of {} type\r\n", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
||||||
|
// Handle data ...
|
||||||
|
return fmt::format("'Handle request: {} of type: {}'", s, type_name<std::remove_cvref_t<decltype(s)>>());
|
||||||
|
});
|
||||||
|
|
||||||
|
double_cbk.connect([](const double &d) -> std::string {
|
||||||
|
fmt::print("REPLY socket: got data: {} of {} type\r\n", d, type_name<std::remove_cvref_t<decltype(d)>>());
|
||||||
|
// Handle data ...
|
||||||
|
return fmt::format("'Handle request: {} of type: {}'", d, type_name<std::remove_cvref_t<decltype(d)>>());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[], char *envp[]) {
|
int main(int argc, char *argv[], char *envp[]) {
|
||||||
using enum port_types_e;
|
using enum port_types_e;
|
||||||
|
codecs_s<env_data_type_t> codecs;
|
||||||
zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads
|
zmq::context_t zmq_ctx; // Use common context because both modules have in-process ports, but working in different threads
|
||||||
|
|
||||||
// Make module that contains only 1 port working on PUBLISHER pattern
|
// Make module that contains only 1 port working on PUBLISHER pattern
|
||||||
|
// TODO: merge all builders to one complex "Module" builder
|
||||||
auto publisher_module = makeModule(argc, argv, envp, "publisher_module", zmq_ctx,
|
auto publisher_module = makeModule(argc, argv, envp, "publisher_module", zmq_ctx,
|
||||||
std::tuple{
|
std::tuple{
|
||||||
makePort(PUB, "publisher_port", "inproc://point" /* This port will publish messages here */, zmq_ctx,
|
makePort(PUB, "publisher_port", "inproc://PUB-SUB" /* This port will publish messages here */, zmq_ctx,
|
||||||
std::tuple{
|
std::tuple{
|
||||||
makeAdapter("int-vector<uint8_t>-int", std::pair{codecs.encoders.from_int, codecs.decoders.to_int}),
|
AdapterBuilder()
|
||||||
makeAdapter("string-vector<uint8_t>-string", std::pair{codecs.encoders.from_string, codecs.decoders.to_string}),
|
.encodeDataBy(&codecs.encoders.from_int)
|
||||||
makeAdapter("double-vector<uint8_t>-double", std::pair{codecs.encoders.from_double, codecs.decoders.to_double}),
|
.decodeDataBy(&codecs.decoders.to_int)
|
||||||
|
.withCallbackSignature<void(const int32_t &)>()
|
||||||
|
.withName("int-vector<uint8_t>-int")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_string)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_string)
|
||||||
|
.withCallbackSignature<void(const std::string)>()
|
||||||
|
.withName("string-vector<uint8_t>-string")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_double)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_double)
|
||||||
|
.withCallbackSignature<void(const double &)>()
|
||||||
|
.withName("double-vector<uint8_t>-double")
|
||||||
|
.finalize(),
|
||||||
}),
|
}),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Make module that contains only 1 port working on SUBSCRIBER pattern
|
// Make module that contains only 1 port working on SUBSCRIBER pattern
|
||||||
auto subscriber_module = makeModule(argc, argv, envp, "subscriber_module", zmq_ctx,
|
auto subscriber_module = makeModule(argc, argv, envp, "subscriber_module", zmq_ctx,
|
||||||
std::tuple{
|
std::tuple{
|
||||||
makePort(SUB, "subscriber_port", "inproc://point" /* this port will read data here */, zmq_ctx,
|
makePort(SUB, "subscriber_port", "inproc://PUB-SUB" /* this port will read data here */, zmq_ctx,
|
||||||
std::tuple{
|
std::tuple{
|
||||||
makeAdapter("int-vector<uint8_t>-int", std::pair{codecs.encoders.from_int, codecs.decoders.to_int}),
|
AdapterBuilder()
|
||||||
makeAdapter("string-vector<uint8_t>-string", std::pair{codecs.encoders.from_string, codecs.decoders.to_string}),
|
.encodeDataBy(&codecs.encoders.from_int)
|
||||||
makeAdapter("double-vector<uint8_t>-double", std::pair{codecs.encoders.from_double, codecs.decoders.to_double}),
|
.decodeDataBy(&codecs.decoders.to_int)
|
||||||
|
.withCallbackSignature<void(const int32_t &, const std::string &)>()
|
||||||
|
.withName("int-vector<uint8_t>-int")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_string)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_string)
|
||||||
|
.withCallbackSignature<void(const std::string &, const std::string &)>()
|
||||||
|
.withName("string-vector<uint8_t>-string")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_double)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_double)
|
||||||
|
.withCallbackSignature<void(const double &, const std::string &)>()
|
||||||
|
.withName("double-vector<uint8_t>-double")
|
||||||
|
.finalize(),
|
||||||
},
|
},
|
||||||
|
|
||||||
// This type of port requires arguments - topics to subscribe
|
// This type of port requires arguments - topics to subscribe
|
||||||
|
|
@ -117,7 +178,129 @@ int main(int argc, char *argv[], char *envp[]) {
|
||||||
}),
|
}),
|
||||||
});
|
});
|
||||||
|
|
||||||
publisher_module->run(publisher_entry); // Publish data
|
auto pusher_module = makeModule(argc, argv, envp, "pusher_module", zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
makePort(PUSH, "push_port", "inproc://PUSH-PULL" /* This port will publish messages here */, zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_int)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_int)
|
||||||
|
.withCallbackSignature<void(const int32_t &)>()
|
||||||
|
.withName("int-vector<uint8_t>-int")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_string)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_string)
|
||||||
|
.withCallbackSignature<void(const std::string)>()
|
||||||
|
.withName("string-vector<uint8_t>-string")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_double)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_double)
|
||||||
|
.withCallbackSignature<void(const double &)>()
|
||||||
|
.withName("double-vector<uint8_t>-double")
|
||||||
|
.finalize(),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
auto puller_module = makeModule(argc, argv, envp, "puller_module", zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
makePort(PULL, "pull_port", "inproc://PUSH-PULL" /* This port will publish messages here */, zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_int)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_int)
|
||||||
|
.withCallbackSignature<void(const int32_t &)>()
|
||||||
|
.withName("int-vector<uint8_t>-int")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_string)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_string)
|
||||||
|
.withCallbackSignature<void(const std::string)>()
|
||||||
|
.withName("string-vector<uint8_t>-string")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_double)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_double)
|
||||||
|
.withCallbackSignature<void(const double &)>()
|
||||||
|
.withName("double-vector<uint8_t>-double")
|
||||||
|
.finalize(),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
auto req_module = makeModule(argc, argv, envp, "req_module", zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
makePort(REQ, "req_port", "inproc://REQ-REP" /* This port will publish messages here */, zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_int)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_int)
|
||||||
|
.withCallbackSignature<void(const int32_t &)>()
|
||||||
|
.withName("int-vector<uint8_t>-int")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_string)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_string)
|
||||||
|
.withCallbackSignature<void(const std::string)>()
|
||||||
|
.withName("string-vector<uint8_t>-string")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_double)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_double)
|
||||||
|
.withCallbackSignature<void(const double &)>()
|
||||||
|
.withName("double-vector<uint8_t>-double")
|
||||||
|
.finalize(),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
auto rep_module = makeModule(argc, argv, envp, "rep_module", zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
makePort(REP, "rep_port", "inproc://REQ-REP" /* This port will publish messages here */, zmq_ctx,
|
||||||
|
std::tuple{
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_int)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_int)
|
||||||
|
.withCallbackSignature<std::string(const int32_t &)>()
|
||||||
|
.withName("int-vector<uint8_t>-int")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_string)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_string)
|
||||||
|
.withCallbackSignature<std::string(const std::string)>()
|
||||||
|
.withName("string-vector<uint8_t>-string")
|
||||||
|
.finalize(),
|
||||||
|
|
||||||
|
AdapterBuilder()
|
||||||
|
.encodeDataBy(&codecs.encoders.from_double)
|
||||||
|
.decodeDataBy(&codecs.decoders.to_double)
|
||||||
|
.withCallbackSignature<std::string(const double &)>()
|
||||||
|
.withName("double-vector<uint8_t>-double")
|
||||||
|
.finalize(),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
fmt::print("\r\nPUB-SUB test:\r\n");
|
||||||
subscriber_module->run(subscriber_entry); // Subscribe and get data
|
subscriber_module->run(subscriber_entry); // Subscribe and get data
|
||||||
|
publisher_module->run(publisher_entry); // Publish data
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||||
|
|
||||||
|
fmt::print("\r\nPUSH-PULL test:\r\n");
|
||||||
|
pusher_module->run(pusher_entry);
|
||||||
|
puller_module->run(puller_entry);
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||||
|
|
||||||
|
fmt::print("\r\nREQ-REP test:\r\n");
|
||||||
|
rep_module->run(rep_entry);
|
||||||
|
req_module->run(req_entry);
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||||
|
fmt::print("DONE!\r\n");
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -117,4 +117,10 @@ template <typename Head, typename... Tail>
|
||||||
auto tuple_zip(Head &&head, Tail &&...tail) {
|
auto tuple_zip(Head &&head, Tail &&...tail) {
|
||||||
return detail::tuple_zip_impl<Head, Tail...>(std::forward<Head>(head), std::forward<Tail>(tail)..., std::make_index_sequence<std::tuple_size_v<std::decay_t<Head>>>());
|
return detail::tuple_zip_impl<Head, Tail...>(std::forward<Head>(head), std::forward<Tail>(tail)..., std::make_index_sequence<std::tuple_size_v<std::decay_t<Head>>>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TAIL
|
||||||
|
template <typename T> struct tuple_tail;
|
||||||
|
template <typename Head, typename... Tail> struct tuple_tail<std::tuple<Head, Tail...>> {
|
||||||
|
using type = std::tuple<Tail...>;
|
||||||
|
}; //
|
||||||
}; // namespace tp
|
}; // namespace tp
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,9 @@
|
||||||
|
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
|
|
||||||
|
template <class T, template <class...> class Template> struct is_specialization : std::false_type {};
|
||||||
|
template <template <class...> class Template, class... Args> struct is_specialization<Template<Args...>, Template> : std::true_type {};
|
||||||
|
|
||||||
template <class T> constexpr std::string_view type_name() {
|
template <class T> constexpr std::string_view type_name() {
|
||||||
using namespace std;
|
using namespace std;
|
||||||
#ifdef __clang__
|
#ifdef __clang__
|
||||||
|
|
@ -19,3 +22,7 @@ template <class T> constexpr std::string_view type_name() {
|
||||||
return string_view(p.data() + 84, p.size() - 84 - 7);
|
return string_view(p.data() + 84, p.size() - 84 - 7);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T> struct tag_s {
|
||||||
|
using type = T;
|
||||||
|
};
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue