Commit 7c5ef061 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

asio: Add configurable connect/read timeout for client

This commit includes backward incompatible change, since we change
private field in public API class.
parent 1ee1122d
...@@ -39,12 +39,16 @@ using boost::asio::ip::tcp; ...@@ -39,12 +39,16 @@ using boost::asio::ip::tcp;
session::session(boost::asio::io_service &io_service, const std::string &host, session::session(boost::asio::io_service &io_service, const std::string &host,
const std::string &service) const std::string &service)
: impl_(make_unique<session_tcp_impl>(io_service, host, service)) {} : impl_(std::make_shared<session_tcp_impl>(io_service, host, service)) {
impl_->start_resolve(host, service);
}
session::session(boost::asio::io_service &io_service, session::session(boost::asio::io_service &io_service,
boost::asio::ssl::context &tls_ctx, const std::string &host, boost::asio::ssl::context &tls_ctx, const std::string &host,
const std::string &service) const std::string &service)
: impl_(make_unique<session_tls_impl>(io_service, tls_ctx, host, service)) { : impl_(std::make_shared<session_tls_impl>(io_service, tls_ctx, host,
service)) {
impl_->start_resolve(host, service);
} }
session::~session() {} session::~session() {}
...@@ -93,6 +97,14 @@ const request *session::submit(boost::system::error_code &ec, ...@@ -93,6 +97,14 @@ const request *session::submit(boost::system::error_code &ec,
return impl_->submit(ec, method, uri, std::move(cb), std::move(h)); return impl_->submit(ec, method, uri, std::move(cb), std::move(h));
} }
void session::connect_timeout(const boost::posix_time::time_duration &t) {
impl_->connect_timeout(t);
}
void session::read_timeout(const boost::posix_time::time_duration &t) {
impl_->read_timeout(t);
}
} // namespace client } // namespace client
} // namespace asio_http2 } // namespace asio_http2
} // nghttp2 } // nghttp2
...@@ -40,8 +40,10 @@ namespace client { ...@@ -40,8 +40,10 @@ namespace client {
session_impl::session_impl(boost::asio::io_service &io_service) session_impl::session_impl(boost::asio::io_service &io_service)
: wblen_(0), io_service_(io_service), resolver_(io_service), : wblen_(0), io_service_(io_service), resolver_(io_service),
session_(nullptr), data_pending_(nullptr), data_pendinglen_(0), deadline_(io_service), connect_timeout_(boost::posix_time::seconds(60)),
writing_(false), inside_callback_(false) {} read_timeout_(boost::posix_time::seconds(60)), session_(nullptr),
data_pending_(nullptr), data_pendinglen_(0), writing_(false),
inside_callback_(false), stopped_(false) {}
session_impl::~session_impl() { session_impl::~session_impl() {
// finish up all active stream // finish up all active stream
...@@ -56,9 +58,13 @@ session_impl::~session_impl() { ...@@ -56,9 +58,13 @@ session_impl::~session_impl() {
void session_impl::start_resolve(const std::string &host, void session_impl::start_resolve(const std::string &host,
const std::string &service) { const std::string &service) {
deadline_.expires_from_now(connect_timeout_);
auto self = this->shared_from_this();
resolver_.async_resolve({host, service}, resolver_.async_resolve({host, service},
[this](const boost::system::error_code &ec, [this, self](const boost::system::error_code &ec,
tcp::resolver::iterator endpoint_it) { tcp::resolver::iterator endpoint_it) {
if (ec) { if (ec) {
not_connected(ec); not_connected(ec);
return; return;
...@@ -66,6 +72,25 @@ void session_impl::start_resolve(const std::string &host, ...@@ -66,6 +72,25 @@ void session_impl::start_resolve(const std::string &host,
start_connect(endpoint_it); start_connect(endpoint_it);
}); });
deadline_.async_wait(std::bind(&session_impl::handle_deadline, self));
}
void session_impl::handle_deadline() {
if (stopped_) {
return;
}
if (deadline_.expires_at() <=
boost::asio::deadline_timer::traits_type::now()) {
call_error_cb(boost::asio::error::timed_out);
stop();
deadline_.expires_at(boost::posix_time::pos_infin);
return;
}
deadline_.async_wait(
std::bind(&session_impl::handle_deadline, this->shared_from_this()));
} }
void session_impl::connected(tcp::resolver::iterator endpoint_it) { void session_impl::connected(tcp::resolver::iterator endpoint_it) {
...@@ -86,6 +111,7 @@ void session_impl::connected(tcp::resolver::iterator endpoint_it) { ...@@ -86,6 +111,7 @@ void session_impl::connected(tcp::resolver::iterator endpoint_it) {
void session_impl::not_connected(const boost::system::error_code &ec) { void session_impl::not_connected(const boost::system::error_code &ec) {
call_error_cb(ec); call_error_cb(ec);
stop();
} }
void session_impl::on_connect(connect_cb cb) { connect_cb_ = std::move(cb); } void session_impl::on_connect(connect_cb cb) { connect_cb_ = std::move(cb); }
...@@ -97,6 +123,9 @@ const connect_cb &session_impl::on_connect() const { return connect_cb_; } ...@@ -97,6 +123,9 @@ const connect_cb &session_impl::on_connect() const { return connect_cb_; }
const error_cb &session_impl::on_error() const { return error_cb_; } const error_cb &session_impl::on_error() const { return error_cb_; }
void session_impl::call_error_cb(const boost::system::error_code &ec) { void session_impl::call_error_cb(const boost::system::error_code &ec) {
if (stopped_) {
return;
}
auto &error_cb = on_error(); auto &error_cb = on_error();
if (!error_cb) { if (!error_cb) {
return; return;
...@@ -350,12 +379,20 @@ int session_impl::write_trailer(stream &strm, header_map h) { ...@@ -350,12 +379,20 @@ int session_impl::write_trailer(stream &strm, header_map h) {
} }
void session_impl::cancel(stream &strm, uint32_t error_code) { void session_impl::cancel(stream &strm, uint32_t error_code) {
if (stopped_) {
return;
}
nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE, strm.stream_id(), nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE, strm.stream_id(),
error_code); error_code);
signal_write(); signal_write();
} }
void session_impl::resume(stream &strm) { void session_impl::resume(stream &strm) {
if (stopped_) {
return;
}
nghttp2_session_resume_data(session_, strm.stream_id()); nghttp2_session_resume_data(session_, strm.stream_id());
signal_write(); signal_write();
} }
...@@ -396,6 +433,11 @@ const request *session_impl::submit(boost::system::error_code &ec, ...@@ -396,6 +433,11 @@ const request *session_impl::submit(boost::system::error_code &ec,
header_map h) { header_map h) {
ec.clear(); ec.clear();
if (stopped_) {
ec = make_error_code(static_cast<nghttp2_error>(NGHTTP2_INTERNAL_ERROR));
return nullptr;
}
http_parser_url u{}; http_parser_url u{};
// TODO Handle CONNECT method // TODO Handle CONNECT method
if (http_parser_parse_url(uri.c_str(), uri.size(), 0, &u) != 0) { if (http_parser_parse_url(uri.c_str(), uri.size(), 0, &u) != 0) {
...@@ -485,6 +527,10 @@ const request *session_impl::submit(boost::system::error_code &ec, ...@@ -485,6 +527,10 @@ const request *session_impl::submit(boost::system::error_code &ec,
} }
void session_impl::shutdown() { void session_impl::shutdown() {
if (stopped_) {
return;
}
nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR); nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR);
signal_write(); signal_write();
} }
...@@ -522,14 +568,21 @@ void session_impl::leave_callback() { ...@@ -522,14 +568,21 @@ void session_impl::leave_callback() {
} }
void session_impl::do_read() { void session_impl::do_read() {
read_socket([this](const boost::system::error_code &ec, if (stopped_) {
std::size_t bytes_transferred) { return;
}
deadline_.expires_from_now(read_timeout_);
auto self = this->shared_from_this();
read_socket([this, self](const boost::system::error_code &ec,
std::size_t bytes_transferred) {
if (ec) { if (ec) {
if (!should_stop()) { if (!should_stop()) {
call_error_cb(ec); call_error_cb(ec);
} }
shutdown_socket(); stop();
return; return;
} }
...@@ -542,7 +595,7 @@ void session_impl::do_read() { ...@@ -542,7 +595,7 @@ void session_impl::do_read() {
if (rv != static_cast<ssize_t>(bytes_transferred)) { if (rv != static_cast<ssize_t>(bytes_transferred)) {
call_error_cb(make_error_code( call_error_cb(make_error_code(
static_cast<nghttp2_error>(rv < 0 ? rv : NGHTTP2_ERR_PROTO))); static_cast<nghttp2_error>(rv < 0 ? rv : NGHTTP2_ERR_PROTO)));
shutdown_socket(); stop();
return; return;
} }
} }
...@@ -550,7 +603,7 @@ void session_impl::do_read() { ...@@ -550,7 +603,7 @@ void session_impl::do_read() {
do_write(); do_write();
if (should_stop()) { if (should_stop()) {
shutdown_socket(); stop();
return; return;
} }
...@@ -559,6 +612,10 @@ void session_impl::do_read() { ...@@ -559,6 +612,10 @@ void session_impl::do_read() {
} }
void session_impl::do_write() { void session_impl::do_write() {
if (stopped_) {
return;
}
if (writing_) { if (writing_) {
return; return;
} }
...@@ -580,7 +637,7 @@ void session_impl::do_write() { ...@@ -580,7 +637,7 @@ void session_impl::do_write() {
auto n = nghttp2_session_mem_send(session_, &data); auto n = nghttp2_session_mem_send(session_, &data);
if (n < 0) { if (n < 0) {
call_error_cb(make_error_code(static_cast<nghttp2_error>(n))); call_error_cb(make_error_code(static_cast<nghttp2_error>(n)));
shutdown_socket(); stop();
return; return;
} }
...@@ -602,23 +659,51 @@ void session_impl::do_write() { ...@@ -602,23 +659,51 @@ void session_impl::do_write() {
} }
if (wblen_ == 0) { if (wblen_ == 0) {
if (should_stop()) {
stop();
}
return; return;
} }
writing_ = true; writing_ = true;
write_socket([this](const boost::system::error_code &ec, std::size_t n) { // Reset read deadline here, because normally client is sending
if (ec) { // something, it does not expect timeout while doing it.
call_error_cb(ec); deadline_.expires_from_now(read_timeout_);
shutdown_socket();
return;
}
wblen_ = 0; auto self = this->shared_from_this();
writing_ = false;
do_write(); write_socket(
}); [this, self](const boost::system::error_code &ec, std::size_t n) {
if (ec) {
call_error_cb(ec);
stop();
return;
}
wblen_ = 0;
writing_ = false;
do_write();
});
}
void session_impl::stop() {
if (stopped_) {
return;
}
shutdown_socket();
deadline_.cancel();
stopped_ = true;
}
void session_impl::connect_timeout(const boost::posix_time::time_duration &t) {
connect_timeout_ = t;
}
void session_impl::read_timeout(const boost::posix_time::time_duration &t) {
read_timeout_ = t;
} }
} // namespace client } // namespace client
......
...@@ -41,7 +41,7 @@ class stream; ...@@ -41,7 +41,7 @@ class stream;
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
class session_impl { class session_impl : public std::enable_shared_from_this<session_impl> {
public: public:
session_impl(boost::asio::io_service &io_service); session_impl(boost::asio::io_service &io_service);
virtual ~session_impl(); virtual ~session_impl();
...@@ -91,6 +91,11 @@ public: ...@@ -91,6 +91,11 @@ public:
void do_read(); void do_read();
void do_write(); void do_write();
void connect_timeout(const boost::posix_time::time_duration &t);
void read_timeout(const boost::posix_time::time_duration &t);
void stop();
protected: protected:
boost::array<uint8_t, 8_k> rb_; boost::array<uint8_t, 8_k> rb_;
boost::array<uint8_t, 64_k> wb_; boost::array<uint8_t, 64_k> wb_;
...@@ -100,6 +105,7 @@ private: ...@@ -100,6 +105,7 @@ private:
bool should_stop() const; bool should_stop() const;
bool setup_session(); bool setup_session();
void call_error_cb(const boost::system::error_code &ec); void call_error_cb(const boost::system::error_code &ec);
void handle_deadline();
boost::asio::io_service &io_service_; boost::asio::io_service &io_service_;
tcp::resolver resolver_; tcp::resolver resolver_;
...@@ -109,6 +115,10 @@ private: ...@@ -109,6 +115,10 @@ private:
connect_cb connect_cb_; connect_cb connect_cb_;
error_cb error_cb_; error_cb error_cb_;
boost::asio::deadline_timer deadline_;
boost::posix_time::time_duration connect_timeout_;
boost::posix_time::time_duration read_timeout_;
nghttp2_session *session_; nghttp2_session *session_;
const uint8_t *data_pending_; const uint8_t *data_pending_;
...@@ -116,6 +126,7 @@ private: ...@@ -116,6 +126,7 @@ private:
bool writing_; bool writing_;
bool inside_callback_; bool inside_callback_;
bool stopped_;
}; };
} // namespace client } // namespace client
......
...@@ -31,9 +31,7 @@ namespace client { ...@@ -31,9 +31,7 @@ namespace client {
session_tcp_impl::session_tcp_impl(boost::asio::io_service &io_service, session_tcp_impl::session_tcp_impl(boost::asio::io_service &io_service,
const std::string &host, const std::string &host,
const std::string &service) const std::string &service)
: session_impl(io_service), socket_(io_service) { : session_impl(io_service), socket_(io_service) {}
start_resolve(host, service);
}
session_tcp_impl::~session_tcp_impl() {} session_tcp_impl::~session_tcp_impl() {}
...@@ -62,7 +60,10 @@ void session_tcp_impl::write_socket( ...@@ -62,7 +60,10 @@ void session_tcp_impl::write_socket(
boost::asio::async_write(socket_, boost::asio::buffer(wb_, wblen_), h); boost::asio::async_write(socket_, boost::asio::buffer(wb_, wblen_), h);
} }
void session_tcp_impl::shutdown_socket() { socket_.close(); } void session_tcp_impl::shutdown_socket() {
boost::system::error_code ignored_ec;
socket_.close(ignored_ec);
}
} // namespace client } // namespace client
} // namespace asio_http2 } // namespace asio_http2
......
...@@ -38,8 +38,6 @@ session_tls_impl::session_tls_impl(boost::asio::io_service &io_service, ...@@ -38,8 +38,6 @@ session_tls_impl::session_tls_impl(boost::asio::io_service &io_service,
// ssl::context::set_verify_mode(boost::asio::ssl::verify_peer) is // ssl::context::set_verify_mode(boost::asio::ssl::verify_peer) is
// not used, which is what we want. // not used, which is what we want.
socket_.set_verify_callback(boost::asio::ssl::rfc2818_verification(host)); socket_.set_verify_callback(boost::asio::ssl::rfc2818_verification(host));
start_resolve(host, service);
} }
session_tls_impl::~session_tls_impl() {} session_tls_impl::~session_tls_impl() {}
...@@ -85,7 +83,8 @@ void session_tls_impl::write_socket( ...@@ -85,7 +83,8 @@ void session_tls_impl::write_socket(
} }
void session_tls_impl::shutdown_socket() { void session_tls_impl::shutdown_socket() {
socket_.async_shutdown([](const boost::system::error_code &ec) {}); boost::system::error_code ignored_ec;
socket_.lowest_layer().close(ignored_ec);
} }
} // namespace client } // namespace client
......
...@@ -144,6 +144,12 @@ public: ...@@ -144,6 +144,12 @@ public:
// and session is terminated. // and session is terminated.
void on_error(error_cb cb) const; void on_error(error_cb cb) const;
// Sets connect timeout, which defaults to 60 seconds.
void connect_timeout(const boost::posix_time::time_duration &t);
// Sets read timeout, which defaults to 60 seconds.
void read_timeout(const boost::posix_time::time_duration &t);
// Shutdowns connection. // Shutdowns connection.
void shutdown() const; void shutdown() const;
...@@ -177,7 +183,7 @@ public: ...@@ -177,7 +183,7 @@ public:
generator_cb cb, header_map h = header_map{}) const; generator_cb cb, header_map h = header_map{}) const;
private: private:
std::unique_ptr<session_impl> impl_; std::shared_ptr<session_impl> impl_;
}; };
// configure |tls_ctx| for client use. Currently, we just set NPN // configure |tls_ctx| for client use. Currently, we just set NPN
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment