#include "broker/configuration.hh" #include "broker/convert.hh" #include "broker/data.hh" #include "broker/endpoint.hh" #include "broker/internal/configuration_access.hh" #include "broker/internal/endpoint_access.hh" #include "broker/internal/type_id.hh" #include "broker/publisher.hh" #include "broker/status.hh" #include "broker/subscriber.hh" #include "broker/topic.hh" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace broker; using namespace std::literals; // -- program options ---------------------------------------------------------- namespace { std::string mode; // "master" or "clone" size_t clone_index; // when in "clone" mode: the index of this clone (0-based) size_t num_clones; } // namespace void extend_config(broker::configuration& broker_cfg) { auto& cfg = broker::internal::configuration_access(&broker_cfg).cfg(); caf::config_option_adder{cfg.custom_options(), "global"} .add(mode, "mode,m", "'master' or 'clone'") .add(clone_index, "clone-index,c", "0-based index of the clone") .add(num_clones, "num-clones,n", "number of clones"); } uint16_t rd_env_port(const char* env_var) { // Pick up the environment variable. if (auto env = getenv(env_var)) { // We accept plain port numbers and Zeek-style "/" notation. caf::config_value val{env}; if (auto port_num = caf::get_as(val)) { return *port_num; } else if (auto port_obj = caf::get_as(val)) { return port_obj->number(); } else { fprintf(stderr, "invalid value for %s: (expected a non-zero port number)\n", env_var); ::abort(); } } else { fprintf(stderr, "need environment variable %s\n", env_var); ::abort(); } } void master_mode(endpoint& ep, std::vector cl_ports) { error err; auto port = ep.listen({}, rd_env_port("MASTER_PORT"), &err); if (port == 0) { auto err_str = to_string(err); fprintf(stderr, "endpiont::listen failed: %s\n", err_str.c_str()); ::abort(); } auto sub = ep.make_status_subscriber(true); for (auto cl_port : cl_ports) ep.peer_nosync("localhost", cl_port, 1s); unsigned connected = 0; store foobar; bool running = true; while (running) { auto f = [&](const auto& ev) { printf("event: %s\n", to_string(ev).c_str()); using ev_t = std::decay_t; if constexpr (std::is_same_v) { auto err_str = to_string(ev); fprintf(stderr, "ERROR: %s\n", err_str.c_str()); } else if constexpr (std::is_same_v) { auto code = ev.code(); if (code == sc::peer_added) { ++connected; printf("connected: %u\n", connected); if (connected == num_clones) { foobar = *ep.attach_master("foobar", backend::memory); puts("attached master for 'foobar'"); } } else if (code == sc::peer_removed || code == sc::peer_lost) { --connected; printf("connected: %u\n", connected); if (connected == 0) { puts("no clone left: done"); running = false; } } } }; std::visit(f, sub.get()); } } void run_puts(store::proxy px) { // For random delays between the clones. std::random_device rd; auto sleep_some = [&rd] { std::uniform_int_distribution dist(1, 250); std::this_thread::sleep_for(std::chrono::milliseconds{dist(rd)}); }; // Put the values and wait for responds. std::map entries; entries.emplace("key1", "value1"); entries.emplace("key2", 42); entries.emplace("key3", 1.2); entries.emplace("key4", "hello world"); for (auto& [key, val] : entries) { sleep_some(); auto req_id = px.put_unique(data{key}, val); printf("%s has request ID %d\n", key.c_str(), static_cast(req_id)); } for (auto i = 0; i < 4; ++i) { auto res = px.receive(); auto answer_str = to_string(res.answer); printf("%d -> %s\n", static_cast(res.id), answer_str.c_str()); } } void clone_mode(endpoint& ep, std::vector cl_ports) { error err; auto port = ep.listen({}, cl_ports[clone_index], &err); if (port == 0) { auto err_str = to_string(err); fprintf(stderr, "endpiont::listen failed: %s\n", err_str.c_str()); ::abort(); } for (size_t index = 0; index < num_clones; ++index) if (index != clone_index) ep.peer_nosync("localhost", cl_ports[index], 1s); ep.peer_nosync("localhost", rd_env_port("MASTER_PORT"), 1s); store foobar; foobar = *ep.attach_clone("foobar"); puts("attached clone for 'foobar'"); run_puts(store::proxy{foobar}); } int main(int argc, char** argv) { broker::endpoint::system_guard sys_guard; // Initialize global state. setvbuf(stdout, NULL, _IOLBF, 0); // Always line-buffer stdout. // Parse CLI parameters using our config. broker::configuration cfg{broker::skip_init}; extend_config(cfg); try { cfg.init(argc, argv); } catch (std::exception& ex) { fprintf(stderr, "%s\n", ex.what()); return EXIT_FAILURE; } if (cfg.cli_helptext_printed()) return EXIT_SUCCESS; // Parse environment variables. std::vector cl_ports; for (size_t i = 0; i < num_clones; ++i) { std::string env_var = "CL_"; env_var += std::to_string(i); env_var += "_PORT"; cl_ports.push_back(rd_env_port(env_var.c_str())); } // Run. endpoint ep{std::move(cfg)}; if (mode == "master") { master_mode(ep, std::move(cl_ports)); } else if (mode == "clone") { clone_mode(ep, std::move(cl_ports)); } else { fprintf(stderr, "expected mode 'master' or 'clone'\n"); return EXIT_FAILURE; } puts("fin"); return EXIT_SUCCESS; }