#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "broker/builder.hh" #include "broker/configuration.hh" #include "broker/convert.hh" #include "broker/data.hh" #include "broker/endpoint.hh" #include "broker/endpoint_id.hh" #include "broker/internal/configuration_access.hh" #include "broker/internal/endpoint_access.hh" #include "broker/internal/println.hh" #include "broker/internal/type_id.hh" #include "broker/publisher.hh" #include "broker/status.hh" #include "broker/subscriber.hh" #include "broker/topic.hh" #include "broker/variant.hh" #include "broker/variant_list.hh" using namespace std::literals; using std::string; using broker::count; using broker::data; using broker::topic; using broker::internal::native; namespace atom = broker::internal::atom; namespace err = broker::internal::err; namespace out = broker::internal::out; namespace verbose = broker::internal::verbose; // -- additional message and atom types ---------------------------------------- #define BROKER_NODE_ADD_ATOM(name, text) \ CAF_ADD_ATOM(broker_node, broker::atom, name, text) CAF_BEGIN_TYPE_ID_BLOCK(broker_node, id_block::broker_internal::end) BROKER_NODE_ADD_ATOM(blocking, "blocking") BROKER_NODE_ADD_ATOM(relay, "relay") BROKER_NODE_ADD_ATOM(stream, "stream") CAF_END_TYPE_ID_BLOCK(broker_node) // -- process-wide state ------------------------------------------------------- namespace { string node_name; } // namespace // -- URI support for Broker --------------------------------------------------- namespace broker { bool convert(const caf::uri& from, network_info& to) { if (from.empty()) return false; if (from.scheme() != "tcp") return false; const auto& auth = from.authority(); if (auth.empty()) return false; to.address = caf::visit( [](const auto& what) { using what_t = std::decay_t; if constexpr (std::is_same_v) return what; else return to_string(what); }, auth.host); to.port = auth.port; return true; } } // namespace broker // -- CAF setup ---------------------------------------------------------------- using namespace caf; namespace { // -- constants ---------------------------------------------------------------- size_t default_payload_size = 0; timespan default_rendezvous_retry = std::chrono::milliseconds(250); size_t default_ping_count = 100; // -- type aliases ------------------------------------------------------------- using uri_list = std::vector; using topic_list = std::vector; using string_list = std::vector; using mode_fun = void (*)(broker::endpoint&, topic_list); // -- constants ---------------------------------------------------------------- constexpr size_t max_cap = std::numeric_limits::max(); // -- program options ---------------------------------------------------------- void extend_config(broker::configuration& broker_cfg) { auto& cfg = broker::internal::configuration_access(&broker_cfg).cfg(); caf::config_option_adder{cfg.custom_options(), "global"} .add("verbose,v", "print status and debug output") .add("rate,r", "print rate once per second ('relay' mode only)") .add("name,N", "set node name in verbose output") .add("topics,t", "topics for sending/receiving messages") .add("mode,m", "'relay' (default), 'ping', or 'pong'") .add("payload-size,s", "additional number of bytes for the ping message") .add("rendezvous-retry", "timeout before repeating the first rendezvous ping " "message (default: 50ms)") .add("num-messages,n", "number of pings (default: 100, 'ping' mode only)") .add("peers", "list of peers we connect to on startup in " " notation") .add("port,p", "port to listen for incoming Broker peerings") .add("endpoint-id", "forces the endpoint to use this ID") .add("master-stores", "list of stores to attach as masters") .add("clone-stores", "list of stores to attach as clones"); } // -- convenience get_or and get_if overloads for enpoint ---------------------- template auto get_or(broker::endpoint& d, string_view key, const T& default_value) { auto& cfg = broker::internal::endpoint_access(&d).cfg(); return caf::get_or(cfg, key, default_value); } template auto get_as(broker::endpoint& d, string_view key) { auto& cfg = broker::internal::endpoint_access(&d).cfg(); return caf::get_as(cfg, key); } // -- message creation and introspection --------------------------------------- /// @pre `is_ping_msg(x) || is_pong_msg(x)` count msg_id(const broker::variant& x) { return x.to_list().at(1).to_count(); } bool is_ping_msg(const broker::variant& x) { auto&& xs = x.to_list(); return xs.size() == 3 && xs[0].to_string() == "ping" && xs[1].is_count() && xs[2].is_string(); } bool is_pong_msg(const broker::variant& x) { auto&& xs = x.to_list(); return xs.size() == 2 && xs[0].to_string() == "pong" && xs[1].is_count(); } bool is_pong_msg(const broker::variant& x, count id) { return is_pong_msg(x) && msg_id(x) == id; } bool is_stop_msg(const broker::variant& x) { return x.to_string() == "stop"; } broker::variant make_ping_msg(count id, size_t payload_size) { return broker::list_builder{} .add("ping"sv) .add(id) .add(std::string(payload_size, 'x')) .build(); } broker::variant make_pong_msg(count id) { return broker::list_builder{}.add("pong"sv).add(id).build(); } broker::variant make_stop_msg() { return broker::list_builder{}.add("stop"sv).build().to_list().at(0); } // -- mode implementations ----------------------------------------------------- void relay_mode(broker::endpoint& ep, topic_list topics) { verbose::println("relay messages"); auto handle_message = [&](const broker::data_message& x) { auto&& val = get_data(x); if (is_ping_msg(val)) { verbose::println("received ping ", msg_id(val)); } else if (is_pong_msg(val)) { verbose::println("received pong ", msg_id(val)); } else if (is_stop_msg(val)) { verbose::println("received stop"); return false; } return true; }; auto in = ep.make_subscriber(std::move(topics)); auto& cfg = broker::internal::endpoint_access{&ep}.cfg(); if (get_or(cfg, "verbose", false) && get_or(cfg, "rate", false)) { auto timeout = std::chrono::system_clock::now(); timeout += std::chrono::seconds(1); size_t received = 0; for (;;) { if (auto maybe_msg = in.get(timeout)) { auto msg = std::move(*maybe_msg); if (!handle_message(msg)) return; ++received; } else { verbose::println(received, "/s"); timeout += std::chrono::seconds(1); received = 0; } } } else { for (;;) { auto x = in.get(); if (!handle_message(x)) return; } } } void ping_mode(broker::endpoint& ep, topic_list topics) { assert(topics.size() > 0); auto topic = topics[0]; verbose::println("send pings to topic ", topic); std::vector xs; auto n = get_or(ep, "num-messages", default_ping_count); auto s = get_or(ep, "payload-size", default_payload_size); if (n == 0) { err::println("send no pings: n = 0"); return; } auto in = ep.make_subscriber({topic}); // Rendezvous between ping and pong. The first ping (id 0) is not part of our // measurement. We repeat this initial message until we receive a pong to // make sure all broker nodes are up and running. bool connected = false; auto retry_timeout = get_or(ep, "rendezvous-retry", default_rendezvous_retry); ep.publish(topic, make_ping_msg(0, 0)); while (!connected) { auto x = in.get(retry_timeout); if (x) { auto msg = std::move(*x); if (is_pong_msg(get_data(msg), 0)) connected = true; } else { ep.publish(topic, make_ping_msg(0, 0)); } } // Measurement. timespan total_time{0}; for (count i = 1; i <= n; ++i) { bool done = false; auto t0 = std::chrono::system_clock::now(); ep.publish(topic, make_ping_msg(i, s)); do { auto x = in.get(); done = is_pong_msg(get_data(x), i); } while (!done); auto t1 = std::chrono::system_clock::now(); auto roundtrip = std::chrono::duration_cast(t1 - t0); total_time += roundtrip; out::println(roundtrip.count()); } verbose::println("AVG: ", total_time / n); } void pong_mode(broker::endpoint& ep, topic_list topics) { assert(topics.size() > 0); verbose::println("receive pings from topics ", topics); auto in = ep.make_subscriber(std::move(topics)); for (;;) { auto x = in.get(); auto val = get_data(x); if (is_ping_msg(val)) { verbose::println("received ping ", msg_id(val)); ep.publish(get_topic(x), make_pong_msg(msg_id(val))); } else if (is_stop_msg(val)) { verbose::println("received stop"); return; } } } } // namespace // Converts plain port numbers and Zeek-style "/" notation into a // 16-bit port number. std::optional to_port(std::string str) { caf::config_value val{std::move(str)}; if (auto port_num = caf::get_as(val)) return *port_num; if (auto port_obj = caf::get_as(val)) return port_obj->number(); return std::nullopt; } // -- main function ------------------------------------------------------------ int main(int argc, char** argv) try { broker::endpoint::system_guard sys_guard; // Initialize global state. setvbuf(stdout, nullptr, _IOLBF, 0); // Always line-buffer stdout. // Parse CLI parameters using our config. broker::configuration cfg{broker::skip_init}; extend_config(cfg); try { cfg.init(argc, argv); } catch (std::exception& ex) { err::println(ex.what()); return EXIT_FAILURE; } if (cfg.cli_helptext_printed()) return EXIT_SUCCESS; // Pick up BROKER_PORT environment variable. if (auto env = getenv("BROKER_PORT")) { cfg.set("global.port", string{env}); } // Construct the endpoint. broker::endpoint_id eid; if (auto eid_str = get_as(cfg, "endpoint-id")) { if (!convert(*eid_str, eid)) { err::println("endpoint-id must be a valid UUID"); return EXIT_FAILURE; } } else { eid = broker::endpoint_id::random(); } broker::endpoint ep{std::move(cfg), eid}; // Get mode. auto mode = get_or(ep, "mode", "relay"); // Get process name, using the mode name as fallback. node_name = get_or(ep, "name", mode); // Get topics (mandatory) and make sure this endpoint at least forwards them. topic_list topics; { // Lifetime scope of temporary variables. auto topic_names = get_or(ep, "topics", string_list{}); if (topic_names.empty()) { err::println("no topics specified"); return EXIT_FAILURE; } for (auto& topic_name : topic_names) topics.emplace_back(std::move(topic_name)); } ep.forward(topics); // Enable verbose output if demanded by user. actor verbose_logger; if (get_or(ep, "verbose", false)) { verbose::enabled(true); // Launch background worker that prints status and error events when running // in verbose mode. ep.subscribe({topic::errors(), topic::statuses()}, [](const broker::data_message& msg) { verbose::println(msg); }); } // Publish endpoint at demanded port. if (auto port = get_as(ep, "port")) { verbose::println("listen for peers on port ", port->number()); ep.listen({}, port->number()); } // Select function f based on the mode. mode_fun f = nullptr; if (mode == "relay") { f = relay_mode; } else if (mode == "ping") { f = ping_mode; } else if (mode == "pong") { f = pong_mode; } else { err::println("invalid mode: ", mode); return EXIT_FAILURE; } // Attach master stores. std::vector stores; for (const auto& name : get_or(ep, "master-stores", string_list{})) { if (auto maybe_store = ep.attach_master(name, broker::backend::memory)) { stores.emplace_back(std::move(*maybe_store)); } else { err::println("failed to attach master store for ", name, ": ", maybe_store.error()); return EXIT_FAILURE; } } // Attach clone stores. for (const auto& name : get_or(ep, "clone-stores", string_list{})) { if (auto maybe_store = ep.attach_clone(name)) { stores.emplace_back(std::move(*maybe_store)); } else { err::println("failed to attach clone store for ", name, ": ", maybe_store.error()); return EXIT_FAILURE; } } // Connect to peers. auto peers = get_or(ep, "peers", uri_list{}); for (auto& peer : peers) { if (auto info = broker::to(peer)) { verbose::println("connect to ", info->address, " on port ", info->port, " ..."); if (!ep.peer(*info)) err::println("unable to connect to <", peer, '>'); } else { err::println("unrecognized scheme (expected tcp) or no authority in: <", peer, '>'); } } f(ep, std::move(topics)); // Disconnect from peers. for (auto& peer : peers) { auto& auth = peer.authority(); if (peer.scheme() == "tcp" && !auth.empty()) { auto host = caf::deep_to_string(auth.host); auto port = auth.port; verbose::println("diconnect from ", host, " on port ", port, " ..."); ep.unpeer_nosync(host, port); } } // Stop utility actors. anon_send_exit(verbose_logger, exit_reason::user_shutdown); } catch (std::exception& ex) { std::cerr << "*** exception: " << ex.what() << "\n"; return EXIT_FAILURE; }