zeek/auxil/broker/caf/libcaf_net/test/net/consumer_adapter.cpp
Patrick Kelley 8fd444092b initial
2025-05-07 15:35:15 -04:00

219 lines
5.9 KiB
C++

// This file is part of CAF, the C++ Actor Framework. See the file LICENSE in
// the main distribution directory for license terms and copyright or visit
// https://github.com/actor-framework/actor-framework/blob/master/LICENSE.
#define CAF_SUITE net.consumer_adapter
#include "caf/net/consumer_adapter.hpp"
#include "net-test.hpp"
#include "caf/async/spsc_buffer.hpp"
#include "caf/net/middleman.hpp"
#include "caf/net/socket_guard.hpp"
#include "caf/net/stream_socket.hpp"
#include "caf/scheduled_actor/flow.hpp"
using namespace caf;
namespace {
class reader {
public:
reader(net::stream_socket fd, size_t n) : sg_(fd) {
buf_.resize(n);
}
auto fd() {
return sg_.socket();
}
void read_some() {
if (rd_pos_ < buf_.size()) {
auto res = read(fd(), make_span(buf_).subspan(rd_pos_));
if (res > 0) {
rd_pos_ += static_cast<size_t>(res);
MESSAGE(rd_pos_ << " bytes received");
} else if (res < 0 && !net::last_socket_error_is_temporary()) {
FAIL("failed to read: " << net::last_socket_error_as_string());
}
}
}
size_t remaining() const noexcept {
return buf_.size() - rd_pos_;
}
bool done() const noexcept {
return remaining() == 0;
}
auto& buf() const {
return buf_;
}
private:
size_t rd_pos_ = 0;
std::vector<byte> buf_;
net::socket_guard<net::stream_socket> sg_;
};
class app_t {
public:
using input_tag = tag::stream_oriented;
using resource_type = async::consumer_resource<int32_t>;
using buffer_type = resource_type::buffer_type;
using adapter_ptr = net::consumer_adapter_ptr<buffer_type>;
using adapter_type = adapter_ptr::element_type;
explicit app_t(resource_type input) : input(std::move(input)) {
// nop
}
template <class LowerLayerPtr>
error init(net::socket_manager* mgr, LowerLayerPtr, const settings&) {
if (auto ptr = adapter_type::try_open(mgr, std::move(input))) {
adapter = std::move(ptr);
return none;
} else {
FAIL("unable to open the resource");
}
}
template <class LowerLayerPtr>
struct send_helper {
app_t* thisptr;
LowerLayerPtr down;
bool on_next_called;
bool aborted;
void on_next(int32_t val) {
thisptr->written_values.emplace_back(val);
auto offset = thisptr->written_bytes.size();
binary_serializer sink{nullptr, thisptr->written_bytes};
if (!sink.apply(val))
FAIL("sink.apply failed: " << sink.get_error());
auto bytes = make_span(thisptr->written_bytes).subspan(offset);
down->begin_output();
auto& buf = down->output_buffer();
buf.insert(buf.end(), bytes.begin(), bytes.end());
down->end_output();
}
void on_complete() {
// nop
}
void on_error(const error&) {
aborted = true;
}
};
template <class LowerLayerPtr>
bool prepare_send(LowerLayerPtr down) {
if (done)
return true;
auto helper = send_helper<LowerLayerPtr>{this, down, false, false};
while (down->can_send_more()) {
auto [ok, consumed] = adapter->pull(async::delay_errors, 1, helper);
if (!ok) {
MESSAGE("adapter signaled end-of-buffer");
done = true;
break;
} else if (consumed == 0) {
break;
}
}
MESSAGE(written_bytes.size() << " bytes written");
return true;
}
template <class LowerLayerPtr>
bool done_sending(LowerLayerPtr) {
return done || !adapter->has_data();
}
template <class LowerLayerPtr>
void continue_reading(LowerLayerPtr) {
CAF_FAIL("continue_reading called");
}
template <class LowerLayerPtr>
void abort(LowerLayerPtr, const error& reason) {
FAIL("app::abort called: " << reason);
}
template <class LowerLayerPtr>
ptrdiff_t consume(LowerLayerPtr, byte_span, byte_span) {
FAIL("app::consume called: unexpected data");
}
bool done = false;
std::vector<int32_t> written_values;
std::vector<byte> written_bytes;
adapter_ptr adapter;
resource_type input;
};
struct fixture : test_coordinator_fixture<>, host_fixture {
fixture() : mm(sys) {
mm.mpx().set_thread_id();
if (auto err = mm.mpx().init())
CAF_FAIL("mpx.init() failed: " << err);
}
bool handle_io_event() override {
return mm.mpx().poll_once(false);
}
net::middleman mm;
};
} // namespace
BEGIN_FIXTURE_SCOPE(fixture)
SCENARIO("subscriber adapters wake up idle socket managers") {
GIVEN("an actor pushing into a buffer resource") {
static constexpr size_t num_items = 79;
auto [rd, wr] = async::make_spsc_buffer_resource<int32_t>(8, 2);
sys.spawn([wr{wr}](event_based_actor* self) {
self->make_observable().repeat(42).take(num_items).subscribe(wr);
});
WHEN("draining the buffer resource and sending its items over a socket") {
auto [fd1, fd2] = unbox(net::make_stream_socket_pair());
if (auto err = nonblocking(fd1, true))
FAIL("nonblocking(fd1) returned an error: " << err);
if (auto err = nonblocking(fd2, true))
FAIL("nonblocking(fd2) returned an error: " << err);
auto mgr = net::make_socket_manager<app_t, net::stream_transport>(
fd1, mm.mpx_ptr(), std::move(rd));
auto& app = mgr->top_layer();
if (auto err = mgr->init(content(cfg)))
FAIL("mgr->init() failed: " << err);
THEN("the reader receives all items before the connection closes") {
auto remaining = num_items * sizeof(int32_t);
reader rd{fd2, remaining};
while (!rd.done()) {
if (auto new_val = rd.remaining(); remaining != new_val) {
remaining = new_val;
MESSAGE("want " << remaining << " more bytes");
}
run();
rd.read_some();
}
CHECK_EQ(app.written_values, std::vector<int32_t>(num_items, 42));
CHECK_EQ(app.written_bytes.size(), num_items * sizeof(int32_t));
CHECK_EQ(rd.buf().size(), num_items * sizeof(int32_t));
CHECK_EQ(app.written_bytes, rd.buf());
}
}
}
}
END_FIXTURE_SCOPE()