#include "broker/configuration.hh"
#include "broker/endpoint.hh"
#include "broker/message.hh"

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <deque>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <vector>

#ifndef BROKER_WINDOWS
#  include <sys/types.h>
#  include <unistd.h>
#endif

using namespace broker;
using namespace std::literals;

// -- constants and utility types ----------------------------------------------

/// Terminal color and font face options.
enum class term_color {
  /// Resets the color to the default color and the font weight to normal.
  reset,
  /// Sets the terminal color to black.
  black,
  /// Sets the terminal color to red.
  red,
  /// Sets the terminal color to green.
  green,
  /// Sets the terminal color to yellow.
  yellow,
  /// Sets the terminal color to blue.
  blue,
  /// Sets the terminal color to magenta.
  magenta,
  /// Sets the terminal color to cyan.
  cyan,
  /// Sets the terminal color to white.
  white,
};

namespace {

/// Default number of simulated peers.
constexpr uint64_t default_peer_count = 10;

/// Default number of messages to publish.
constexpr uint64_t default_message_count = 10'000;

/// Default payload size per message.
constexpr uint64_t default_payload_size = 1'000;

/// Escape sequences for the terminal, indexed by the numeric value of the
/// corresponding `term_color` enumerator. Must stay in declaration order.
constexpr std::string_view tty_codes[] = {
  "\033[0m",  // reset
  "\033[30m", // black
  "\033[31m", // red
  "\033[32m", // green
  "\033[33m", // yellow
  "\033[34m", // blue
  "\033[35m", // magenta
  "\033[36m", // cyan
  "\033[37m", // white
};

} // namespace

/// Checks whether `out` is one of the standard streams and whether the file
/// descriptor behind it refers to a terminal.
/// @param out The stream to inspect; only compared by address.
/// @returns `false` when building with `BROKER_WINDOWS` defined or when `out`
///          is neither `std::cout`, `std::cerr` nor `std::clog`; otherwise the
///          result of `isatty` for the matching descriptor.
bool is_tty([[maybe_unused]] const std::ostream& out) {
#ifdef BROKER_WINDOWS
  return false;
#else
  if (&out == &std::cout)
    return isatty(STDOUT_FILENO) != 0;
  else if (&out == &std::cerr || &out == &std::clog)
    return isatty(STDERR_FILENO) != 0;
  else
    return false;
#endif
}

/// Writes the escape sequence for `x` to `out`, but only if `out` is a
/// terminal. Non-terminal streams receive no output at all.
std::ostream& operator<<(std::ostream& out, term_color x) {
  if (is_tty(out))
    out << tty_codes[static_cast<size_t>(x)];
  return out;
}

/// Wraps a string view for printing it in double quotes.
struct quoted {
  std::string_view str;
};

/// Prints `x.str` enclosed in double quotes. The content is not escaped.
std::ostream& operator<<(std::ostream& out, const quoted& x) {
  return out << '"' << x.str << '"';
}

/// Bundles all user-defined settings of the benchmark.
struct parameters {
  /// Number of simulated peers.
  uint64_t peer_count = default_peer_count;
  /// Number of messages.
  uint64_t message_count = default_message_count;
  /// Bytes per message.
  uint64_t payload_size = default_payload_size;
  /// Seed for the random-number generator; drawn from `std::random_device`
  /// unless overridden.
  uint64_t seed = std::random_device{}();
  /// Selects `endpoint::publish` instead of `publish_all` for publishing.
  bool naive_publish = false;
};
// -- I/O utility -------------------------------------------------------------- namespace { std::mutex ostream_mtx; } // namespace template void do_println(std::ostream& out, Ts&&... xs) { std::unique_lock guard{ostream_mtx}; (out << ... << xs); out << term_color::reset << '\n'; } namespace out { template void println(Ts&&... xs) { do_println(std::cout, std::forward(xs)...); } } // namespace out namespace err { template void println(Ts&&... xs) { do_println(std::cerr, term_color::red, std::forward(xs)..., term_color::reset); } } // namespace err namespace warn { template void println(Ts&&... xs) { do_println(std::cerr, term_color::yellow, std::forward(xs)..., term_color::reset); } } // namespace warn namespace verbose { namespace { bool is_enabled; } // namespace bool enabled() { return is_enabled; } template void println(Ts&&... xs) { if (is_enabled) do_println(std::clog, term_color::blue, std::forward(xs)..., term_color::reset); } } // namespace verbose // -- other utility ------------------------------------------------------------ template std::string random_string(RandomEngine& rng, size_t size) { std::string_view charset = "0123456789" "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"; auto dis = std::uniform_int_distribution{0, charset.size() - 1}; std::string result; result.reserve(size); for (size_t i = 0; i < size; ++i) result.push_back(charset[dis(rng)]); return result; } void add_options(configuration& cfg, parameters& ps) { cfg.add_option(&ps.peer_count, "peer-count,p", "number of simulated peers"); cfg.add_option(&ps.message_count, "message-count,m", "number of messages"); cfg.add_option(&ps.payload_size, "payload-size,s", "bytes per message"); cfg.add_option(&ps.seed, "seed", "seed for the random-number generator"); cfg.add_option(&verbose::is_enabled, "verbose", "enables more console output"); cfg.add_option(&ps.naive_publish, "naive-publish", "publish data via endpoint::publish instead of publish_all"); } struct generator { explicit 
generator(parameters ps) : params(ps), rng(static_cast(ps.seed)) { // nop } parameters params; std::minstd_rand rng; data_message next() { return make_data_message("/benchmark/fan-out"s, data{random_string(rng, params.payload_size)}); } }; class barrier { public: explicit barrier(ptrdiff_t num_threads) : num_threads_(num_threads), count_(0) { // nop } void arrive_and_wait() { std::unique_lock guard{mx_}; if (++count_ == num_threads_) { cv_.notify_all(); return; } cv_.wait(guard, [this] { return count_.load() == num_threads_; }); } private: size_t num_threads_; std::mutex mx_; std::atomic count_; std::condition_variable cv_; }; struct padded_id { endpoint_id id; char padding[BROKER_CONSTRUCTIVE_INTERFERENCE_SIZE - sizeof(endpoint_id)]; }; // -- actual program logic ----------------------------------------------------- void run_subscriber(barrier* sync, padded_id* id_slot, uint16_t port, parameters ps, size_t index) { barrier worker_sync{2}; endpoint ep; id_slot->id = ep.node_id(); verbose::println("started new subscriber endpoint: ", ep.node_id()); ep.subscribe( {"/benchmark"_t}, [] { // Init: nop. }, [&worker_sync, ps, index, n = 0u, &ep](const data_message&) mutable { ++n; if (index == 0 && n % 1000 == 0) verbose::println("subscriber 1 received ", n, " items ..."); if (n == ps.message_count) { verbose::println("Broker endpoint ", ep.node_id(), " is done receiving"); worker_sync.arrive_and_wait(); } }, [](const error&) { // Cleanup: nop. 
}); if (!ep.peer("localhost", port)) { std::cerr << "ep.peer failed!\n"; abort(); } sync->arrive_and_wait(); worker_sync.arrive_and_wait(); } void run_publisher(endpoint& ep, parameters ps) { if (ps.naive_publish) { generator gen{ps}; for (size_t i = 0; i < ps.message_count; ++i) { ep.publish(gen.next()); if (i % 1000 == 0) verbose::println("publisher emitted ", i, " items ..."); } } else { auto gen = std::make_shared(ps); auto worker = ep.publish_all( [](size_t& count) { count = 0; }, [ps, gen](size_t& count, std::deque& out, size_t hint) { if (auto n = std::min(static_cast(ps.message_count) - count, hint); n > 0) { for (size_t i = 0; i < n; ++i) { out.push_back(gen->next()); if (++count % 1000 == 0) verbose::println("publisher emitted ", count, " items ..."); } } }, [n = ps.message_count](const size_t& count) { return count >= n; }); ep.wait_for(worker); } } int main(int argc, char** argv) { // Parse CLI / config file. configuration cfg{skip_init}; parameters params; add_options(cfg, params); try { cfg.init(argc, argv); } catch (std::exception& ex) { std::cerr << ex.what() << "\n\n"; return EXIT_FAILURE; } if (cfg.cli_helptext_printed()) return EXIT_SUCCESS; if (cfg.remainder().size() > 0) { std::cerr << "*** too many arguments (did not expect any)\n\n"; return EXIT_FAILURE; } // Spin up the "main" endpoint. endpoint ep{std::move(cfg)}; auto port = ep.listen(); verbose::println("started publisher endpoint: ", ep.node_id()); // Spin up N peers, as requested and wait for all of them to connect. barrier sync{static_cast(params.peer_count + 1)}; std::vector ls; ls.resize(params.peer_count); std::vector threads; threads.resize(params.peer_count); for (size_t i = 0; i < params.peer_count; ++i) threads[i] = std::thread{run_subscriber, &sync, &ls[i], port, params, i}; sync.arrive_and_wait(); verbose::println("started ", params.peer_count, " subscriber endpoints"); // Wait for all peers to complete their handshake. 
for (auto& x : ls) { if (!ep.await_peer(x.id)) { std::cerr << "*** peers failed to connect\n"; return EXIT_FAILURE; } } verbose::println("received all ", params.peer_count, " handshakes -> run!"); // Light, camera, action! run_publisher(ep, params); // Tear down. verbose::println("tear down -> wait for ", params.peer_count, " threads"); for (auto& thread : threads) thread.join(); verbose::println("all threads have terminated, bye"); }