Patrick Kelley 8fd444092b initial
2025-05-07 15:35:15 -04:00

376 lines
13 KiB
C++

/******************************************************************************
* This example *
* - emulates a client launching a request every 10-300ms *
* - uses a CURL-backend consisting of a master and 10 workers *
* - runs until it is shut down by a CTRL+C signal *
* *
* *
* Schematic view: *
* *
* client | client_job | curl_master | curl_worker *
* /--------------|*|-------------\ /-------------|*| *
* /---------------|*|--------------\ / *
* /----------------|*|---------------\ / *
* |*| ----------------|*|----------------|*|----------------|*| *
* \________________|*|_______________/ \ *
* \_______________|*|______________/ \ *
* \______________|*|_____________/ \-------------|*| *
* *
* *
* Communication pattern: *
* *
* client_job curl_master curl_worker *
* | | | *
* | ----(read)-----> | | *
* | | --(forward)----> | *
* | |---\ *
* | | | *
* | |<--/ *
* | <-------------(reply)-------------- | *
* X *
******************************************************************************/
// C includes
#include <csignal>
#include <cstdlib>
#include <ctime>
// C++ includes
#include <iostream>
#include <random>
#include <string>
#include <vector>
// CAF
#include "caf/all.hpp"
#include "caf/io/all.hpp"
CAF_PUSH_WARNINGS
#include <curl/curl.h>
CAF_POP_WARNINGS
// disable some clang warnings here caused by CURL macros
#ifdef __clang__
# pragma clang diagnostic ignored "-Wshorten-64-to-32"
# pragma clang diagnostic ignored "-Wdisabled-macro-expansion"
# pragma clang diagnostic ignored "-Wunused-const-variable"
#endif // __clang__
CAF_BEGIN_TYPE_ID_BLOCK(curl_fuse, first_custom_type_id)
CAF_ADD_TYPE_ID(curl_fuse, (std::vector<char>) )
CAF_ADD_ATOM(curl_fuse, read_atom)
CAF_ADD_ATOM(curl_fuse, fail_atom)
CAF_ADD_ATOM(curl_fuse, next_atom)
CAF_ADD_ATOM(curl_fuse, reply_atom)
CAF_ADD_ATOM(curl_fuse, finished_atom)
CAF_END_TYPE_ID_BLOCK(curl_fuse)
using namespace caf;
using buffer_type = std::vector<char>;
namespace color {
// UNIX terminal color codes
constexpr char reset[] = "\033[0m";
constexpr char reset_endl[] = "\033[0m\n";
constexpr char black[] = "\033[30m";
constexpr char red[] = "\033[31m";
constexpr char green[] = "\033[32m";
constexpr char yellow[] = "\033[33m";
constexpr char blue[] = "\033[34m";
constexpr char magenta[] = "\033[35m";
constexpr char cyan[] = "\033[36m";
constexpr char white[] = "\033[37m";
constexpr char bold_black[] = "\033[1m\033[30m";
constexpr char bold_red[] = "\033[1m\033[31m";
constexpr char bold_green[] = "\033[1m\033[32m";
constexpr char bold_yellow[] = "\033[1m\033[33m";
constexpr char bold_blue[] = "\033[1m\033[34m";
constexpr char bold_magenta[] = "\033[1m\033[35m";
constexpr char bold_cyan[] = "\033[1m\033[36m";
constexpr char bold_white[] = "\033[1m\033[37m";
} // namespace color
// number of HTTP workers
constexpr size_t num_curl_workers = 10;
// minimum delay between HTTP requests
constexpr int min_req_interval = 10;
// maximum delay between HTTP requests
constexpr int max_req_interval = 300;
// put everything into anonymous namespace (except main)
namespace {
// provides print utility, a name, and a parent
struct base_state {
base_state(local_actor* thisptr) : self(thisptr) {
// nop
}
actor_ostream print() {
return aout(self) << color << self->name() << " (id = " << self->id()
<< "): ";
}
virtual bool init(std::string m_color) {
color = std::move(m_color);
print() << "started" << color::reset_endl;
return true;
}
virtual ~base_state() {
print() << "done" << color::reset_endl;
}
local_actor* self;
std::string color;
};
struct client_job_state : base_state {
static inline const char* name = "curl.client-job";
using base_state::base_state;
};
// encapsulates an HTTP request
behavior client_job(stateful_actor<client_job_state>* self,
const actor& parent) {
if (!self->state.init(color::blue))
return {}; // returning an empty behavior terminates the actor
self->send(parent, read_atom_v, "http://www.example.com/index.html",
uint64_t{0}, uint64_t{4095});
return {
[=](reply_atom, const buffer_type& buf) {
self->state.print() << "successfully received " << buf.size() << " bytes"
<< color::reset_endl;
self->quit();
},
[=](fail_atom) {
self->state.print() << "failure" << color::reset_endl;
self->quit();
},
};
}
struct client_state : base_state {
client_state(local_actor* selfptr)
: base_state(selfptr),
count(0),
re(rd()),
dist(min_req_interval, max_req_interval) {
// nop
}
size_t count;
std::random_device rd;
std::default_random_engine re;
std::uniform_int_distribution<int> dist;
static inline const char* name = "curl.client";
};
// spawns HTTP requests
behavior client(stateful_actor<client_state>* self, const actor& parent) {
using std::chrono::milliseconds;
self->link_to(parent);
if (!self->state.init(color::green))
return {}; // returning an empty behavior terminates the actor
self->send(self, next_atom_v);
return {
[=](next_atom) {
auto& st = self->state;
st.print() << "spawn new client_job (nr. " << ++st.count << ")"
<< color::reset_endl;
// client_job will use IO
// and should thus be spawned in a separate thread
self->spawn<detached + linked>(client_job, parent);
// compute random delay until next job is launched
auto delay = st.dist(st.re);
self->delayed_send(self, milliseconds(delay), next_atom_v);
},
};
}
struct curl_state : base_state {
curl_state(local_actor* selfptr) : base_state(selfptr) {
// nop
}
~curl_state() override {
if (curl != nullptr)
curl_easy_cleanup(curl);
}
static size_t callback(void* data, size_t bsize, size_t nmemb, void* userp) {
size_t size = bsize * nmemb;
auto& buf = reinterpret_cast<curl_state*>(userp)->buf;
auto first = reinterpret_cast<char*>(data);
auto last = first + bsize;
buf.insert(buf.end(), first, last);
return size;
}
bool init(std::string m_color) override {
curl = curl_easy_init();
if (curl == nullptr)
return false;
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &curl_state::callback);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
return base_state::init(std::move(m_color));
}
CURL* curl = nullptr;
buffer_type buf;
static inline const char* name = "curl.worker";
};
// manages a CURL session
behavior curl_worker(stateful_actor<curl_state>* self, const actor& parent) {
if (!self->state.init(color::yellow))
return {}; // returning an empty behavior terminates the actor
return {[=](read_atom, const std::string& fname, uint64_t offset,
uint64_t range) -> message {
auto& st = self->state;
st.print() << "read" << color::reset_endl;
for (;;) {
st.buf.clear();
// set URL
curl_easy_setopt(st.curl, CURLOPT_URL, fname.c_str());
// set range
std::ostringstream oss;
oss << offset << "-" << range;
curl_easy_setopt(st.curl, CURLOPT_RANGE, oss.str().c_str());
// set curl callback
curl_easy_setopt(st.curl, CURLOPT_WRITEDATA,
reinterpret_cast<void*>(&st));
// launch file transfer
auto res = curl_easy_perform(st.curl);
if (res != CURLE_OK) {
st.print() << "curl_easy_perform() failed: " << curl_easy_strerror(res)
<< color::reset_endl;
} else {
long hc = 0; // http return code
curl_easy_getinfo(st.curl, CURLINFO_RESPONSE_CODE, &hc);
switch (hc) {
default:
st.print() << "http error: download failed with "
<< "'HTTP RETURN CODE': " << hc << color::reset_endl;
break;
case 200: // ok
case 206: // partial content
st.print() << "received " << st.buf.size()
<< " bytes with 'HTTP RETURN CODE': " << hc
<< color::reset_endl;
// tell parent that this worker is done
self->send(parent, finished_atom_v);
return make_message(reply_atom_v, std::move(st.buf));
case 404: // file does not exist
st.print() << "http error: download failed with "
<< "'HTTP RETURN CODE': 404 (file does "
<< "not exist!)" << color::reset_endl;
}
}
// avoid 100% cpu utilization if remote side is not accessible
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}};
}
struct master_state : base_state {
master_state(local_actor* selfptr) : base_state(selfptr) {
// nop
}
std::vector<actor> idle;
std::vector<actor> busy;
static inline const char* name = "curl.master";
};
behavior curl_master(stateful_actor<master_state>* self) {
if (!self->state.init(color::magenta))
return {}; // returning an empty behavior terminates the actor
// spawn workers
for (size_t i = 0; i < num_curl_workers; ++i)
self->state.idle.push_back(
self->spawn<detached + linked>(curl_worker, self));
auto worker_finished = [=] {
auto sender = self->current_sender();
auto last = self->state.busy.end();
auto i = std::find(self->state.busy.begin(), last, sender);
if (i == last)
return;
self->state.idle.push_back(*i);
self->state.busy.erase(i);
self->state.print() << "worker is done" << color::reset_endl;
};
self->state.print() << "spawned " << self->state.idle.size() << " worker(s)"
<< color::reset_endl;
return {
[=](read_atom rd, std::string& str, uint64_t x, uint64_t y) {
auto& st = self->state;
st.print() << "received {'read'}" << color::reset_endl;
// forward job to an idle worker
actor worker = st.idle.back();
st.idle.pop_back();
st.busy.push_back(worker);
self->delegate(worker, rd, std::move(str), x, y);
st.print() << st.busy.size() << " active jobs" << color::reset_endl;
if (st.idle.empty()) {
// wait until at least one worker finished its job
self->become(keep_behavior, [=](finished_atom) {
worker_finished();
self->unbecome();
});
}
},
[=](finished_atom) { worker_finished(); },
};
}
// signal handling for ctrl+c
std::atomic<bool> shutdown_flag{false};
} // namespace
void caf_main(actor_system& system) {
// install signal handler
struct sigaction act;
act.sa_handler = [](int) { shutdown_flag = true; };
auto set_sighandler = [&] {
if (sigaction(SIGINT, &act, nullptr) != 0) {
std::cerr << "fatal: cannot set signal handler" << std::endl;
abort();
}
};
set_sighandler();
// initialize CURL
curl_global_init(CURL_GLOBAL_DEFAULT);
// get a scoped actor for the communication with our CURL actors
scoped_actor self{system};
// spawn client and curl_master
auto master = self->spawn<detached>(curl_master);
self->spawn<detached>(client, master);
// poll CTRL+C flag every second
while (!shutdown_flag)
std::this_thread::sleep_for(std::chrono::seconds(1));
aout(self) << color::cyan << "received CTRL+C" << color::reset_endl;
// shutdown actors
anon_send_exit(master, exit_reason::user_shutdown);
// await actors
act.sa_handler = [](int) { abort(); };
set_sighandler();
aout(self) << color::cyan
<< "await CURL; this may take a while "
"(press CTRL+C again to abort)"
<< color::reset_endl;
self->await_all_other_actors_done();
// shutdown CURL
curl_global_cleanup();
}
CAF_MAIN(id_block::curl_fuse, io::middleman)