.. _communication:

Communication
=============

Broker's primary objective is to facilitate efficient communication through a
publish/subscribe model. In this model, entities send data by publishing to a
specific topic, and receive data by subscribing to topics of interest. The
asynchronous nature of publish/subscribe makes it a popular choice for loosely
coupled, distributed systems.

Broker is the successor to Broccoli. Broker enables arbitrary applications to
communicate in Zeek's data model. In this chapter, we first describe generic
Broker communication between peers that don't assume any specific message
layout. Afterwards, we show how to exchange events with Zeek through an
additional Zeek-specific shim on top of Broker's generic messages.

Exchanging Broker Messages
--------------------------

We start with a discussion of generic message exchange between Broker clients.
At the Broker level, messages are just arbitrary values that have no further
semantics attached. It's up to senders and receivers to agree on a specific
layout of messages (e.g., a set of doubles for a measurement series).

.. _endpoint:

Endpoints
~~~~~~~~~

Broker encapsulates its entire peering setup in an ``endpoint`` object.
Multiple instances of an ``endpoint`` can exist in the same process, but each
``endpoint`` features a thread pool and a (configurable) scheduler, which
determine how Broker's components execute. Using a single ``endpoint`` per OS
process therefore typically makes the most efficient use of the available
hardware resources. Nonetheless, multiple Broker applications can operate side
by side when linked into the same process, because the library has no global
state.

.. note::

   Instances of type ``endpoint`` have reference semantics: that is, they
   behave like a reference in that it's impossible to obtain an invalid one
   (unlike a null pointer). An ``endpoint`` can also be copied around cheaply,
   but is not safe against access from concurrent threads.
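
The note's notion of reference semantics can be illustrated with a small,
self-contained sketch. The class ``endpoint_handle`` below is hypothetical and
is not how Broker implements ``endpoint``; it merely models the three
properties the note lists: every handle refers to valid state (there is no
null handle), copies are cheap because they share that state, and nothing
synchronizes access to it.

.. code-block:: cpp

   #include <cassert>
   #include <memory>
   #include <string>
   #include <utility>

   // Hypothetical type for illustration only; this is NOT Broker's endpoint.
   // It models reference semantics with a shared_ptr to common state.
   class endpoint_handle {
   public:
     // A handle always starts out referring to valid state; there is no
     // default constructor and hence no "null" handle.
     explicit endpoint_handle(std::string name)
       : state_(std::make_shared<state>(state{std::move(name), 0})) {}

     // Copies are cheap: they only bump a reference count and share the state.
     // Declaring the copy operations suppresses the implicit move operations,
     // so "moving" falls back to copying and never leaves an empty handle.
     endpoint_handle(const endpoint_handle&) = default;
     endpoint_handle& operator=(const endpoint_handle&) = default;

     // No locking: calling this on copies from several threads at once is a
     // data race, mirroring the thread-safety caveat in the note.
     void count_message() { ++state_->messages; }

     int messages() const { return state_->messages; }
     const std::string& name() const { return state_->name; }

     // True if both handles refer to the same underlying state.
     bool same_as(const endpoint_handle& other) const {
       return state_ == other.state_;
     }

   private:
     struct state {
       std::string name;
       int messages;
     };
     std::shared_ptr<state> state_;
   };

In this model, ``endpoint_handle b = a;`` yields a second handle to the same
state, so a call such as ``b.count_message()`` is visible through ``a`` as
well.
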

Peerings
~~~~~~~~

In order to publish or receive messages, an endpoint needs to peer with other
endpoints. A peering is a bidirectional relationship between two endpoints.
Peering endpoints exchange subscriptions and then forward messages
accordingly. This allows for creating flexible communication topologies that
use topic-based message routing.

An endpoint can either initiate a peering itself by connecting to remote
locations, or wait for an incoming request:

.. literalinclude:: _examples/comm.cc
   :start-after: --peering-start
   :end-before: --peering-end

Sending Data
~~~~~~~~~~~~

In Broker, a message consists of a *topic*-*data* pair. That is, endpoints
*publish* values as ``data`` instances along with a *topic* that steers them
to interested subscribers:

.. literalinclude:: _examples/comm.cc
   :start-after: --publish-start
   :end-before: --publish-end

.. note::

   Publishing a message can be a no-op if no subscriber exists. Because Broker
   has fire-and-forget messaging semantics, the runtime does not generate a
   notification if no subscribers exist.

One can also explicitly create a dedicated ``publisher`` for a specific topic
first, and then use that to send subsequent messages. This approach is better
suited for high-volume streams, as it leverages the demand management of CAF
(the C++ Actor Framework) internally:

.. literalinclude:: _examples/comm.cc
   :start-after: --publisher-start
   :end-before: --publisher-end

Finally, there is also a streaming version of the publisher that pulls
messages from a producer as capacity becomes available on the output channel;
see ``endpoint::publish_all`` and ``endpoint::publish_all_no_sync``.

See :ref:`data-model` for a detailed discussion of how to construct values for
messages in the form of various types of ``data`` instances.

Receiving Data
~~~~~~~~~~~~~~

Endpoints receive data by creating a ``subscriber`` attached to the topics of
interest. Subscriptions are prefix-based, matching all topics that start with
a given string.
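
The prefix rule can be sketched as follows. The functions ``matches`` and
``any_match`` are hypothetical helpers written for illustration, not part of
Broker's API, and they assume the literal reading of the sentence above: a
topic matches a subscription if the topic string starts with the subscription
string.

.. code-block:: cpp

   #include <cassert>
   #include <string>
   #include <string_view>
   #include <vector>

   // Hypothetical helpers for illustration; not part of Broker's API.
   // A topic matches a subscription if the topic starts with the subscription
   // string (plain string-prefix comparison).
   bool matches(std::string_view subscription, std::string_view topic) {
     // substr clamps the length, so a subscription longer than the topic
     // yields the whole (shorter) topic and the comparison fails.
     return topic.substr(0, subscription.size()) == subscription;
   }

   // Assuming a subscriber holds a list of prefixes, a message reaches it if
   // any of those prefixes matches the message's topic.
   bool any_match(const std::vector<std::string>& prefixes,
                  std::string_view topic) {
     for (const auto& prefix : prefixes)
       if (matches(prefix, topic))
         return true;
     return false;
   }

Under this literal rule, a subscription to ``/foo`` also matches ``/foobar``;
in the sketch, ending the subscription with the separator (``/foo/``)
restricts it to topics below ``/foo/``.
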
A ``subscriber`` can either retrieve incoming messages explicitly by calling
``get`` or ``poll`` (synchronous API), or spawn a background worker to process
messages as they come in (asynchronous API).

Synchronous API
***************

The synchronous API exists for applications that want to poll for messages
explicitly. Once a subscriber is registered for topics, calling ``get`` will
wait for a new message:

.. literalinclude:: _examples/comm.cc
   :start-after: --get-start
   :end-before: --get-end

By default, the function ``get`` blocks until the subscriber has at least one
message available, which it then returns. Each retrieved message consists of
the same two elements that the publisher passed along: the topic that the
message has been published to, and the message's payload in the form of an
arbitrary Broker value (i.e., a ``data`` instance). The example just prints
them both out.

Blocking indefinitely until messages arrive often won't work well, in
particular not in combination with existing event loops or polling.
Therefore, ``get`` takes an additional optional timeout parameter to wait only
for a certain amount of time. Alternatively, one can also use ``available`` to
explicitly check for available messages, or ``poll`` to extract all currently
pending messages (which may be none):

.. literalinclude:: _examples/comm.cc
   :start-after: --poll-start
   :end-before: --poll-end

For integration into event loops, ``subscriber`` also provides a file
descriptor that signals whether messages are available:

.. literalinclude:: _examples/comm.cc
   :start-after: --fd-start
   :end-before: --fd-end

Asynchronous API
****************

TODO: Document.

.. todo: Add docs for asynchronous API.

.. If your application does not require a blocking API, the non-blocking API
.. offers an asynchronous alternative. Unlike the blocking API, non-blocking
.. endpoints take a callback for each topic they subscribe to:
..
.. .. code-block:: cpp
..
..   context ctx;
..   auto ep = ctx.spawn();
..   ep.subscribe("/foo", [=](const topic& t, const data& d) {
..     std::cout << t << " -> " << d << std::endl;
..   });
..   ep.subscribe("/bar", [=](const topic& t, const data& d) {
..     std::cout << t << " -> " << d << std::endl;
..   });
..
.. When a new message matching the subscription arrives, Broker dispatches it to
.. the callback without blocking.
..
.. .. warning::
..
..   The function ``subscribe`` returns immediately. Capturing variables *by
..   reference* introduces a dangling reference once the outer frame returns.
..   Therefore, only capture locals *by value*.

.. _status-error-messages:

Status and Error Messages
~~~~~~~~~~~~~~~~~~~~~~~~~

Broker informs clients about any communication errors (and optionally also
about non-critical connectivity changes) through separate ``status`` messages.
To get access to that information, one creates a ``status_subscriber``, which
provides a synchronous ``get``/``available``/``poll`` API similar to that of
the standard message subscriber. By default, a ``status_subscriber`` returns
only errors:

.. literalinclude:: _examples/comm.cc
   :start-after: --status-subscriber-err-start
   :end-before: --status-subscriber-err-end

Errors reflect failures that may impact the correctness of operation.
``err.code()`` returns a value of the enum ``ec``, which codifies the existing
error codes:

.. literalinclude:: ../libbroker/broker/error.hh
   :language: cpp
   :start-after: --ec-enum-start
   :end-before: --ec-enum-end

To receive non-critical status messages as well, specify that when creating
the ``status_subscriber``:

.. literalinclude:: _examples/comm.cc
   :start-after: --status-subscriber-all-start
   :end-before: --status-subscriber-all-end

Status messages represent non-critical changes to the topology. For example,
after a successful peering, both endpoints receive a ``peer_added`` status
message. The concrete semantics of a status depend on its embedded code, which
the enum ``sc`` codifies:

.. literalinclude:: ../libbroker/broker/status.hh
   :language: cpp
   :start-after: --sc-enum-start
   :end-before: --sc-enum-end

Status messages have an optional *context* and an optional descriptive
*message*. The member function ``context`` returns a ``const T*`` if the
context is available. The type of the available context information depends
on the status code enum ``sc``. For example, all ``sc::peer_*`` status codes
include an ``endpoint_info`` context as well as a message.

.. _zeek_events_cpp:

Exchanging Zeek Events
----------------------

The communication model discussed so far remains generic for all Broker
clients in that it doesn't associate any semantics with the values exchanged
through messages. In practice, however, senders and receivers will need to
agree on a specific data layout for the values exchanged, so that they
interpret them in the same way. This is in particular true for exchanging
events with Zeek, which is one of the main applications for Broker in the
first place.

To that end, Broker provides built-in support for sending and receiving Zeek
events through a small Zeek-specific shim on top of the generic message model.
The shim encapsulates Zeek events and takes care of converting them into the
expected lower-level message layout that gets transmitted. This way, Zeek
events can be exchanged between an external Broker client and Zeek itself, and
even between Broker clients without any Zeek instances at all.

Here's a complete ping/pong example between a C++ Broker client and Zeek:

.. literalinclude:: _examples/ping.zeek

.. literalinclude:: _examples/ping.cc

.. code-block:: bash

  # g++ -std=c++17 -o ping ping.cc -lbroker
  # zeek ping.zeek &
  # ./ping
  received pong[0]
  received pong[1]
  received pong[2]
  received pong[3]
  received pong[4]

Gateways
--------

Broker was designed with peer-to-peer communication in mind. All endpoints in
the network form a single publish/subscribe layer.
This implies that each endpoint is aware of every other endpoint in the
network as well as of the topics they have subscribed to. This level of
transparency enables source routing, but it comes at a cost. Endpoints flood
subscriptions and topology changes to the entire network. The volume of
flooded messages remains small as long as mostly endpoints with high
availability and a stable set of subscriptions join the network. However,
short-lived or unstable endpoints can quickly increase the number of messages
in the network. Furthermore, the more endpoints join the network, the more
state and bookkeeping overhead accumulates.

The overhead becomes especially prominent on endpoints that join the network
only to publish data but sit at the edges of the network. Such endpoints
usually end up sending all, or nearly all, of their messages to another,
well-connected endpoint that distributes the messages. Nevertheless, these
producing endpoints still flood their subscriptions to the entire network and
occupy entries in every routing table. In the Zeek ecosystem, the
`Zeek Agent `_ fits this exact role. Agents run at the edge of the network and
ideally should not consume more network bandwidth and CPU cycles than
necessary.

Gateways provide a way to separate the well-connected "core" endpoints from
endpoints at the edges that generally cannot contribute to the overall
connectivity of the network but still incur messaging and bookkeeping
overhead.

Topology
~~~~~~~~

Gateways separate the overlay into two domains: *external* and *internal*. The
external domain consists of stable, well-connected endpoints that build the
core of the publish/subscribe layer. The internal domain consists of endpoints
that need no knowledge of the entire overlay, because all of their traffic
passes through the gateway anyway. In other words, the gateway is the only way
in or out for endpoints in the internal domain, as illustrated in the figure
below.

.. figure:: _images/gateway.png
   :align: center

Aside from forwarding messages between the two domains, gateways render all
endpoints of the internal domain *completely opaque* to endpoints in the
external domain and vice versa.

To endpoints in the external domain, a gateway appears as the regular endpoint
``E``. It subscribes to all topics that any endpoint in the internal domain
has subscribed to, and all messages published in the internal domain appear
as if ``E`` were the publisher. The endpoint in the internal domain, ``I``, is
the mirror image of ``E``: it hides all endpoints of the external domain.

The two endpoints ``E`` and ``I`` actually exist: the gateway starts both
endpoints in the same process and creates a "shortcut" between them. Every
subscription or published event on one gets forwarded to the other. However,
``E`` and ``I`` are not aware of each other; to each of them, the forwarded
events and subscriptions appear as if they came from a local ``subscriber`` or
``publisher``.

.. warning::

   The endpoints ``E`` and ``I`` use the *same ID*. When setting up a gateway,
   make sure that no other endpoint provides connectivity between the internal
   and the external domain. Otherwise, ``E`` could receive messages from ``I``
   and vice versa. Since they share one ID, endpoints in the network would
   receive contradictory messages from what appears to be the same endpoint.

Setup
~~~~~

Broker includes the standalone tool ``broker-gateway``. When started, it
creates the two endpoints ``E`` and ``I`` in the same process. Each of the two
endpoints listens on its own port for incoming peerings. A minimal setup would
only set the two ports, as shown below.

.. code-block:: none

   broker-gateway --internal.port=8080 --external.port=9090

Users can also configure the gateway to connect to a list of predefined peers
on startup. For example:

.. code-block:: none

   broker-gateway --internal.port=8080 \
                  --internal.peers="[tcp://mars:1234, tcp://venus:2345]" \
                  --external.port=9090 \
                  --external.peers="[tcp://proxima-centauri:3456]"

The invocation above would listen on port 8080 for incoming peerings in the
internal domain and try to connect to ``mars`` on port 1234 as well as to
``venus`` on port 2345. In the external domain, the gateway would listen on
port 9090 and try to connect to ``proxima-centauri`` on port 3456.

Instead of using the command line, users could also provide a ``broker.conf``
file with the following content:

.. code-block:: none

   internal {
     port = 8080
     peers = [
       ,
       ,
     ]
   }
   external {
     port = 9090
     peers = [
       ,
     ]
   }

There is also a third parameter for the domains: ``disable-forwarding``. In
particular, setting ``internal.disable-forwarding`` to ``true`` causes the
gateway to isolate endpoints in the internal domain not only from endpoints in
the external domain, but also from each other. In setups where all endpoints
of the internal domain connect only to the gateway and do not need to interact
with each other, setting this flag reduces messaging to the bare minimum by
leading each endpoint in the internal domain to believe that there is exactly
one other endpoint in the network: the gateway.
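
The visibility rules of this section can be summarized in a toy model. The
``gateway_model`` struct below is hypothetical and not part of Broker or
``broker-gateway``. Its endpoint names (``agent-1``, ``gateway/I``) are made up,
and it only captures what the text states: ``E`` advertises the union of all
internal subscriptions to the external domain, internal endpoints never see
external ones, and with ``internal.disable-forwarding`` set, each internal
endpoint sees only the gateway.

.. code-block:: cpp

   #include <cassert>
   #include <map>
   #include <set>
   #include <string>

   // Toy model of the gateway visibility rules; NOT Broker code. All endpoint
   // names used with it are hypothetical.
   struct gateway_model {
     // Subscriptions of each endpoint in the internal domain, keyed by name.
     std::map<std::string, std::set<std::string>> internal_subscriptions;

     // Models internal.disable-forwarding = true.
     bool internal_disable_forwarding = false;

     // What the external domain sees: a single endpoint E subscribed to the
     // union of all internal subscriptions.
     std::set<std::string> advertised_by_E() const {
       std::set<std::string> result;
       for (const auto& entry : internal_subscriptions)
         result.insert(entry.second.begin(), entry.second.end());
       return result;
     }

     // Endpoints that the internal endpoint `self` knows about. External
     // endpoints always stay hidden behind I; with forwarding disabled, the
     // other internal endpoints are hidden as well.
     std::set<std::string> visible_to(const std::string& self) const {
       std::set<std::string> result{"gateway/I"};
       if (!internal_disable_forwarding)
         for (const auto& entry : internal_subscriptions)
           if (entry.first != self)
             result.insert(entry.first);
       return result;
     }
   };

For example, with two internal agents and forwarding enabled, each agent sees
the gateway plus the other agent; after setting
``internal_disable_forwarding = true``, each agent sees only ``gateway/I``,
while the external domain still sees one endpoint ``E`` either way.
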