.. _data_flows:

Data Flows
==========

Data flows, or *streams*, are potentially unbounded sequences of values. The
flow API in CAF makes it easy to generate, transform, and consume observable
sequences with a ReactiveX-style interface.

Flows that pass through two or more actors use backpressure to make sure that
fast senders cannot overwhelm receivers.

We do not assume prior experience with ReactiveX for this chapter. There are
some key differences to ReactiveX implementations that we point out in
`Key Differences to ReactiveX`_.

Introduction
------------

The fundamental building blocks of the flow API are ``observable``,
``observer`` and ``subscription``.

``observable``
  Emits a potentially unbounded sequence of values to observers that have
  subscribed to the observable. Offers the single member function
  ``subscribe`` to add more observers.

``observer``
  Subscribes to and consumes values from an observable. This interface bundles
  callbacks for the observable, namely ``on_subscribe``, ``on_next``,
  ``on_complete`` and ``on_error``.

``subscription``
  Manages the flow of items between an observable and an observer. An observer
  calls ``request`` to ask for more items or ``cancel`` to stop receiving data.

When working with data flows, the primitive building blocks usually remain in
the background. For example, the code snippet below illustrates a trivial data
flow for integers inside a single actor that only uses the high-level
composition API without any manual setup for observers or subscriptions:

.. literalinclude:: /examples/flow/from-callable.cpp
   :language: C++
   :start-after: --(rst-main-begin)--
   :end-before: --(rst-main-end)--

Concurrent Processing
---------------------

Flows that only run inside a single actor are of little use outside of toy
examples. For running different parts of a data flow on different actors, CAF
offers two APIs: one for setting up a processing chain declaratively and one
for setting up processing chains dynamically.


Declarative Setup: ``observe_on``
+++++++++++++++++++++++++++++++++

If the entire processing chain is known at coding time, ``observe_on`` provides
the easiest way to assign work to individual actors. The following example
revisits our first example, but this time generates the numbers on one actor
and then prints them on another.

.. literalinclude:: /examples/flow/observe-on.cpp
   :language: C++
   :start-after: --(rst-main-begin)--
   :end-before: --(rst-main-end)--

Please note that calling ``observe_on`` *requires* that the target actor is
inactive. Otherwise, this function call results in unsynchronized state access.

Dynamic Setup: Asynchronous Buffers
+++++++++++++++++++++++++++++++++++

Our second option for spanning data flows across multiple actors is using SPSC
(Single Producer Single Consumer) buffers. This option is more general. In
fact, ``observe_on`` internally uses these buffers for connecting the actors.
Further, the buffers allow bridging flows between actor and non-actor code.

While one could use SPSC buffers directly, they usually remain hidden behind
another abstraction: asynchronous resources. The resources in CAF usually come
in pairs and users may create new ones by calling
``make_spsc_buffer_resource``. This function returns a producer resource and a
consumer resource. With these two resources, we can then spawn actors that open
the resources for either reading or writing.

To illustrate how the API pieces fit together, we revisit our example a third
time. This time, we spawn the actors individually and connect them via the
buffer resources:

.. literalinclude:: /examples/flow/spsc-buffer-resource.cpp
   :language: C++
   :start-after: --(rst-main-begin)--
   :end-before: --(rst-main-end)--

In this iteration of our example, we have moved the implementation for the
source and sink actors to their own functions. The source once again creates
the data, only this time we subscribe the buffer to the generated sequence:

.. literalinclude:: /examples/flow/spsc-buffer-resource.cpp
   :language: C++
   :start-after: --(rst-source-begin)--
   :end-before: --(rst-source-end)--

For the sink, we generate an observable from the consumer resource and then
once more call ``for_each``:

.. literalinclude:: /examples/flow/spsc-buffer-resource.cpp
   :language: C++
   :start-after: --(rst-sink-begin)--
   :end-before: --(rst-sink-end)--

Building and Transforming Observables
-------------------------------------

When building processing pipelines, CAF fuses as many processing steps as
possible into a single C++ object. In our examples, we composed the source part
like this: ``self->make_observable().from_callable(...).take(...)...``.

The first bit, ``self->make_observable()``, returns an ``observable_builder``.
This class implements factory functions for creating observable sequences from
containers, repeated values, and so on. However, most functions do not actually
return an ``observable``. Instead, they return a ``generation<...>`` object.
The ``generation`` class is a variadic template that allows CAF to
incrementally define consecutive processing steps. In our example, we call
``from_callable`` on the builder object, which returns a ``generation<...>``.
The generation is meant as a temporary object only. Hence, most member
functions may only get called on an rvalue. After calling ``.take(...)`` on the
returned ``generation``, we get a new temporary object of type
``generation<..., limit_step<...>>``.

The ``generation`` class also mimics the interface of ``observable``. When
calling a member function that requires an actual ``observable``, CAF uses the
blueprint stored in the ``generation`` to create an actual observable object
and then forwards the member function call. For example, calling ``for_each``
on a ``generation`` internally constructs the ``observable`` and then calls
``for_each`` on that new object.
Users can also call ``as_observable`` on a ``generation`` explicitly to turn
the blueprint into an actual observable sequence.

By delaying the construction of actual ``observable`` instances, CAF can fuse
consecutive steps into single objects. This reduces the number of heap
allocations and also accelerates processing, since the fused processing steps
result in simple function call chains without subscriptions and backpressure
between them.

Analogous to the ``generation`` class for creating new observables from inputs,
CAF uses a template called ``transformation`` that represents a blueprint for
applying operators to existing observables.

Operators
---------

Most operators transform an observable by applying one or more processing steps
on all observed values and then emit the result as a new observable. Since the
result of a transformation usually is a new observable, these operators compose
into complex data stream operations.

The operators presented here are available on the template classes
``observable``, ``generation`` and ``transformation``.

Filter
++++++

The ``filter`` operator re-emits items from its input observable that pass a
predicate test.

.. image:: filter.png
   :alt: Filter operator.

Map
+++

The ``map`` operator applies a unary operation to all items of the input
observable and re-emits the resulting items. Similar to ``std::transform``.

.. image:: map.png
   :alt: Map operator.

Take
++++

The ``take`` operator re-emits the first ``n`` items from its input observable.

.. image:: take.png
   :alt: Take operator.

Merge
+++++

The ``merge`` operator takes multiple input observables and re-emits the
observed items as a single sequence of items as soon as they appear.

.. image:: merge.png
   :alt: Merge operator.

Concat
++++++

The ``concat`` operator takes multiple input observables and re-emits the
observed items as a single sequence of items without interleaving them.

.. image:: concat.png
   :alt: Concat operator.

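
The semantics of ``filter``, ``map``, ``take`` and ``concat`` described above
can be sketched on finite ``std::vector`` inputs using only the C++ standard
library. This is a simplified illustration and not the CAF API: the names
``sketch_filter``, ``sketch_map``, ``sketch_take``, ``sketch_concat`` and
``run_example`` are hypothetical, and the sketch ignores asynchrony,
subscriptions and backpressure. ``merge`` is left out because its output order
depends on when items arrive.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <iterator>
#include <vector>

// Hypothetical helpers that mirror the operator semantics on finite vectors.
// None of these names are part of CAF.
using ints = std::vector<int>;

// filter: keep only the items that pass the predicate.
ints sketch_filter(const ints& in, const std::function<bool(int)>& pred) {
  ints out;
  std::copy_if(in.begin(), in.end(), std::back_inserter(out), pred);
  return out;
}

// map: apply a unary operation to every item.
ints sketch_map(const ints& in, const std::function<int(int)>& fn) {
  ints out;
  std::transform(in.begin(), in.end(), std::back_inserter(out), fn);
  return out;
}

// take: keep at most the first n items.
ints sketch_take(const ints& in, std::size_t n) {
  auto count = static_cast<std::ptrdiff_t>(std::min(n, in.size()));
  return ints(in.begin(), in.begin() + count);
}

// concat: all items of the first input, then all items of the second.
ints sketch_concat(const ints& first, const ints& second) {
  ints out = first;
  out.insert(out.end(), second.begin(), second.end());
  return out;
}

// Even numbers from 1..10, squared, limited to three items, followed by {0}.
ints run_example() {
  ints input{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
  ints evens = sketch_filter(input, [](int x) { return x % 2 == 0; });
  ints squares = sketch_map(evens, [](int x) { return x * x; });
  ints first3 = sketch_take(squares, 3);
  assert(first3.size() <= 3); // take never yields more than n items
  return sketch_concat(first3, ints{0});
}
```

Calling ``run_example()`` in this sketch yields the sequence 4, 16, 36, 0. In
CAF itself, the corresponding operators are chained on an ``observable``,
``generation`` or ``transformation`` and process items as they arrive.
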

Key Differences to ReactiveX
----------------------------

Observables are not thread-safe. They describe a flow of data *within* an actor
and are thus considered private to an actor. CAF is more "opinionated" than
ReactiveX when it comes to concurrency and ownership. The intended way for
connecting concurrent parts of the system is by creating buffer resources and
turning them into observables at the observing actor.

Furthermore, CAF does not support the scheduler interface from ReactiveX. Data
flows are usually managed by an actor. Hence, there is no analog for operators
such as ``SubscribeOn``. That being said, the flow API does not tie observables
or observers to actor types. The interface ``caf::flow::coordinator`` manages
scheduling of flow-related work and can be implemented to run CAF flows without
actors, e.g., to integrate them into a custom event loop.

Observers and observables use non-blocking backpressure by default. The
protocol used between observers and observables to signal demand is similar to
the Reactive Streams specification.
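
The idea of demand signaling can be illustrated with a minimal, single-threaded
sketch in standard C++. It is not the CAF implementation: ``toy_source`` and
its members are hypothetical names, and the ``sink`` vector merely stands in
for an observer's ``on_next`` callback. The sketch only shows the core rule
that a source emits no more items than the observer has asked for via
``request``, and that ``cancel`` stops the flow.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical toy type for illustration; not a CAF class.
struct toy_source {
  std::vector<int> items;           // values that are ready to be emitted
  std::size_t pos = 0;              // index of the next item to emit
  std::size_t demand = 0;           // requested but not yet delivered items
  bool canceled = false;            // set once the observer cancels
  std::vector<int>* sink = nullptr; // stands in for the observer's on_next

  // Called by the observer to signal that it can handle n more items.
  void request(std::size_t n) {
    assert(n > 0);
    demand += n;
    drain();
  }

  // Called by the observer to stop receiving data.
  void cancel() {
    canceled = true;
    demand = 0;
  }

  // Emits items only while there is outstanding demand.
  void drain() {
    assert(sink != nullptr);
    while (!canceled && demand > 0 && pos < items.size()) {
      sink->push_back(items[pos++]);
      --demand;
    }
  }
};
```

With five items available, a call to ``request(2)`` in this sketch delivers
exactly two items, and the remaining items stay with the source until the
observer signals more demand. A fast source therefore cannot overwhelm a slow
observer.
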