116 lines
4.9 KiB
Diff
diff --git a/src/rmq/rmqio/rmqio_asioconnection.cpp b/src/rmq/rmqio/rmqio_asioconnection.cpp
|
|
index bdd0e97..ce874ec 100644
|
|
--- a/src/rmq/rmqio/rmqio_asioconnection.cpp
|
|
+++ b/src/rmq/rmqio/rmqio_asioconnection.cpp
|
|
@@ -444,21 +444,16 @@ bool AsioConnection<SocketType>::doRead(bsl::size_t bytes_transferred)
|
|
bsl::size_t bytes_decoded = 0;
|
|
boost::asio::streambuf::const_buffers_type bufs = d_inbound->data();
|
|
bsl::vector<rmqamqpt::Frame> readFrames;
|
|
- for (boost::asio::streambuf::const_buffers_type::const_iterator i =
|
|
- bufs.begin();
|
|
- i != bufs.end();
|
|
- ++i) {
|
|
- boost::asio::const_buffer buf(*i);
|
|
- Decoder::ReturnCode rcode =
|
|
- d_frameDecoder->appendBytes(&readFrames, buf.data(), buf.size());
|
|
- if (rcode != Decoder::OK) {
|
|
- BALL_LOG_WARN << "Bad rcode from decoder: " << rcode;
|
|
- // Fail but we still want to process frames we were able to decode
|
|
- success = false;
|
|
- break;
|
|
- };
|
|
- bytes_decoded += buf.size();
|
|
- }
|
|
+
|
|
+    boost::asio::const_buffer buf(bufs);
|
|
+    Decoder::ReturnCode rcode =
|
|
+        d_frameDecoder->appendBytes(&readFrames, buf.data(), buf.size());
|
|
+    if (rcode != Decoder::OK) {
|
|
+        BALL_LOG_WARN << "Bad rcode from decoder: " << rcode;
|
|
+        // Fail, but still process decoded frames; count no bytes (as before)
|
|
+        success = false;
|
|
+    }
|
|
+    bytes_decoded += (rcode == Decoder::OK) ? buf.size() : 0;
|
|
|
|
if (bytes_decoded != bytes_transferred) {
|
|
BALL_LOG_WARN << "bytes_decoded (" << bytes_decoded
|
|
diff --git a/src/rmq/rmqio/rmqio_asioeventloop.cpp b/src/rmq/rmqio/rmqio_asioeventloop.cpp
|
|
index ed8f176..307fd20 100644
|
|
--- a/src/rmq/rmqio/rmqio_asioeventloop.cpp
|
|
+++ b/src/rmq/rmqio/rmqio_asioeventloop.cpp
|
|
@@ -108,8 +108,17 @@ void AsioEventLoop::onThreadStarted()
|
|
d_condition.broadcast();
|
|
}
|
|
|
|
-void AsioEventLoop::postImpl(const Item& item) { d_context.post(item); }
|
|
-void AsioEventLoop::dispatchImpl(const Item& item) { d_context.dispatch(item); }
|
|
+void AsioEventLoop::postImpl(const Item& item)
|
|
+{
|
|
+    // Queue 'item' on 'd_context' through the free function 'post'.
|
|
+    boost::asio::post(d_context, item);
|
|
+}
|
|
+
|
|
+void AsioEventLoop::dispatchImpl(const Item& item)
|
|
+{
|
|
+    // Hand 'item' to 'd_context' through the free function 'dispatch'.
|
|
+    boost::asio::dispatch(d_context, item);
|
|
+}
|
|
|
|
bsl::shared_ptr<rmqio::Resolver>
|
|
AsioEventLoop::resolver(bool shuffleConnectionEndpoints)
|
|
diff --git a/src/tests/rmqamqp/rmqamqp_connection.t.cpp b/src/tests/rmqamqp/rmqamqp_connection.t.cpp
|
|
index 2c99b0b..97621f5 100644
|
|
--- a/src/tests/rmqamqp/rmqamqp_connection.t.cpp
|
|
+++ b/src/tests/rmqamqp/rmqamqp_connection.t.cpp
|
|
@@ -147,7 +147,8 @@ class MockConnection : public rmqio::Connection {
|
|
|
|
BSLS_ASSERT_OPT(rc == Frame::OK);
|
|
|
|
- d_eventLoop.post(
|
|
+ boost::asio::post(
|
|
+ d_eventLoop,
|
|
bdlf::BindUtil::bind(d_connectionCallbacks.onRead, decoded));
|
|
}
|
|
}
|
|
@@ -156,7 +157,8 @@ class MockConnection : public rmqio::Connection {
|
|
{
|
|
BALL_LOG_TRACE << "MockConnection close";
|
|
|
|
- d_eventLoop.post(bdlf::BindUtil::bind(cb, GRACEFUL_DISCONNECT));
|
|
+ boost::asio::post(d_eventLoop,
|
|
+ bdlf::BindUtil::bind(cb, GRACEFUL_DISCONNECT));
|
|
}
|
|
|
|
void asyncWriteImpl(
|
|
@@ -175,7 +177,7 @@ class MockConnection : public rmqio::Connection {
|
|
rmqamqpt::Method(
|
|
rmqamqpt::ConnectionMethod(rmqamqpt::ConnectionCloseOk())));
|
|
|
|
- d_eventLoop.post(callback);
|
|
+ boost::asio::post(d_eventLoop, callback);
|
|
|
|
if (!closeOk) {
|
|
feedNextFrame();
|
|
@@ -301,7 +303,7 @@ ACTION_P3(ConnectMockConnection, mockConnectPtrPtr, replayFrame, eventLoop)
|
|
|
|
ON_CALL(**mockConnectPtrPtr, isConnected()).WillByDefault(Return(true));
|
|
|
|
- eventLoop.get().post(arg4);
|
|
+ boost::asio::post(eventLoop.get(), arg4);
|
|
|
|
return *mockConnectPtrPtr;
|
|
}
|
|
diff --git a/src/tests/rmqio/rmqio_asioresolver.t.cpp b/src/tests/rmqio/rmqio_asioresolver.t.cpp
|
|
index e5c2c7e..ad14d5e 100644
|
|
--- a/src/tests/rmqio/rmqio_asioresolver.t.cpp
|
|
+++ b/src/tests/rmqio/rmqio_asioresolver.t.cpp
|
|
@@ -98,7 +98,7 @@ TEST_F(ResolverTests, ShufflesResolverResults)
|
|
for (int i = 0; i < 5; i++) {
|
|
bsl::string ip = bsl::to_string(i) + ".0.0.0";
|
|
entry_type::endpoint_type endpoint(
|
|
- boost::asio::ip::address::from_string(std::string(ip)), 1);
|
|
+ boost::asio::ip::make_address(std::string(ip)), 1);
|
|
entries.push_back(entry_type(endpoint, host, port));
|
|
}
|
|
AsioResolver::results_type resolverResults =
|
|
@@ -140,7 +140,7 @@ TEST_F(ResolverTests, NoShuffleDoesNotReorderResolverResults)
|
|
for (int i = 0; i < 5; i++) {
|
|
bsl::string ip = bsl::to_string(i) + ".0.0.0";
|
|
entry_type::endpoint_type endpoint(
|
|
- boost::asio::ip::address::from_string(std::string(ip)), 1);
|
|
+ boost::asio::ip::make_address(std::string(ip)), 1);
|
|
entries.push_back(entry_type(endpoint, host, port));
|
|
}
|
|
AsioResolver::results_type resolverResults =
|