Commit 60bbb5ca authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

h2load: Perform sampling for request timings to reduce memory consumption

parent da859100
......@@ -98,11 +98,15 @@ Config config;
RequestStat::RequestStat() : data_offset(0), completed(false) {}
constexpr size_t MAX_STATS = 1000000;
Stats::Stats(size_t req_todo, size_t nclients)
: req_todo(0), req_started(0), req_done(0), req_success(0),
req_status_success(0), req_failed(0), req_error(0), req_timedout(0),
bytes_total(0), bytes_head(0), bytes_head_decomp(0), bytes_body(0),
status(), req_stats(req_todo), client_stats(nclients) {}
status(), client_stats(nclients) {
req_stats.reserve(std::min(req_todo, MAX_STATS));
}
Stream::Stream() : status_success(-1) {}
......@@ -420,9 +424,8 @@ void Client::disconnect() {
}
int Client::submit_request() {
auto req_stat = &worker->stats.req_stats[worker->stats.req_started++];
if (session->submit_request(req_stat) != 0) {
++worker->stats.req_started;
if (session->submit_request() != 0) {
return -1;
}
......@@ -607,8 +610,12 @@ void Client::on_status_code(int32_t stream_id, uint16_t status) {
}
}
void Client::on_stream_close(int32_t stream_id, bool success,
RequestStat *req_stat, bool final) {
void Client::on_stream_close(int32_t stream_id, bool success, bool final) {
auto req_stat = get_req_stat(stream_id);
if (!req_stat) {
return;
}
req_stat->stream_close_time = std::chrono::steady_clock::now();
if (success) {
req_stat->completed = true;
......@@ -628,6 +635,12 @@ void Client::on_stream_close(int32_t stream_id, bool success,
++worker->stats.req_failed;
++worker->stats.req_error;
}
if (req_stat->completed &&
(worker->stats.req_done % worker->request_times_sampling_step) == 0) {
worker->sample_req_stat(req_stat);
}
report_progress();
streams.erase(stream_id);
if (req_done == req_todo) {
......@@ -645,6 +658,15 @@ void Client::on_stream_close(int32_t stream_id, bool success,
}
}
RequestStat *Client::get_req_stat(int32_t stream_id) {
auto it = streams.find(stream_id);
if (it == std::end(streams)) {
return nullptr;
}
return &(*it).second.req_stat;
}
int Client::connection_made() {
if (ssl) {
report_tls_info();
......@@ -750,7 +772,6 @@ int Client::connection_made() {
if (!config.timing_script) {
auto nreq =
std::min(req_todo - req_started, (size_t)config.max_concurrent_streams);
for (; nreq > 0; --nreq) {
if (submit_request() != 0) {
process_request_failure();
......@@ -1060,6 +1081,10 @@ Worker::Worker(uint32_t id, SSL_CTX *ssl_ctx, size_t req_todo, size_t nclients,
clients.push_back(make_unique<Client>(next_client_id++, this, req_todo));
}
}
auto request_times_max_stats = std::min(req_todo, MAX_STATS);
request_times_sampling_step =
(req_todo + request_times_max_stats - 1) / request_times_max_stats;
}
Worker::~Worker() {
......@@ -1088,6 +1113,11 @@ void Worker::run() {
ev_run(loop, 0);
}
void Worker::sample_req_stat(RequestStat *req_stat) {
stats.req_stats.push_back(*req_stat);
assert(stats.req_stats.size() <= MAX_STATS);
}
namespace {
// Returns percentage of number of samples within mean +/- sd.
double within_sd(const std::vector<double> &samples, double mean, double sd) {
......@@ -1106,12 +1136,15 @@ double within_sd(const std::vector<double> &samples, double mean, double sd) {
namespace {
// Computes statistics using |samples|. The min, max, mean, sd, and
// percentage of number of samples within mean +/- sd are computed.
SDStat compute_time_stat(const std::vector<double> &samples) {
// If |sampling| is true, this computes sample variance. Otherwise,
// population variance.
SDStat compute_time_stat(const std::vector<double> &samples,
bool sampling = false) {
if (samples.empty()) {
return {0.0, 0.0, 0.0, 0.0, 0.0};
}
// standard deviation calculated using Rapid calculation method:
// http://en.wikipedia.org/wiki/Standard_deviation#Rapid_calculation_methods
// https://en.wikipedia.org/wiki/Standard_deviation#Rapid_calculation_methods
double a = 0, q = 0;
size_t n = 0;
double sum = 0;
......@@ -1130,7 +1163,7 @@ SDStat compute_time_stat(const std::vector<double> &samples) {
assert(n > 0);
res.mean = sum / n;
res.sd = sqrt(q / n);
res.sd = sqrt(q / (sampling && n > 1 ? n - 1 : n));
res.within_sd = within_sd(samples, res.mean, res.sd);
return res;
......@@ -1140,9 +1173,13 @@ SDStat compute_time_stat(const std::vector<double> &samples) {
namespace {
SDStats
process_time_stats(const std::vector<std::unique_ptr<Worker>> &workers) {
auto request_times_sampling = false;
size_t nrequest_times = 0;
for (const auto &w : workers) {
nrequest_times += w->stats.req_stats.size();
if (w->request_times_sampling_step != 1) {
request_times_sampling = true;
}
}
std::vector<double> request_times;
......@@ -1195,8 +1232,9 @@ process_time_stats(const std::vector<std::unique_ptr<Worker>> &workers) {
}
}
return {compute_time_stat(request_times), compute_time_stat(connect_times),
compute_time_stat(ttfb_times), compute_time_stat(rps_values)};
return {compute_time_stat(request_times, request_times_sampling),
compute_time_stat(connect_times), compute_time_stat(ttfb_times),
compute_time_stat(rps_values)};
}
} // namespace
......
......@@ -226,6 +226,9 @@ struct Worker {
// at most nreqs_rem clients get an extra request
size_t nreqs_rem;
size_t rate;
// every successful request_times_sampling_step-th request's
// req_stat will get sampled.
size_t request_times_sampling_step;
ev_timer timeout_watcher;
// The next client ID this worker assigns
uint32_t next_client_id;
......@@ -235,9 +238,11 @@ struct Worker {
~Worker();
Worker(Worker &&o) = default;
void run();
void sample_req_stat(RequestStat *req_stat);
};
struct Stream {
RequestStat req_stat;
int status_success;
Stream();
};
......@@ -318,8 +323,12 @@ struct Client {
// |success| == true means that the request/response was exchanged
// |successfully, but it does not mean response carried successful
// |HTTP status code.
void on_stream_close(int32_t stream_id, bool success, RequestStat *req_stat,
bool final = false);
void on_stream_close(int32_t stream_id, bool success, bool final = false);
// Returns RequestStat for |stream_id|. This function must be
// called after on_request(stream_id), and before
// on_stream_close(stream_id, ...). Otherwise, this will return
// nullptr.
RequestStat *get_req_stat(int32_t stream_id);
void record_request_time(RequestStat *req_stat);
void record_connect_start_time();
......
......@@ -80,9 +80,11 @@ int htp_msg_completecb(http_parser *htp) {
auto client = session->get_client();
auto final = http_should_keep_alive(htp) == 0;
client->on_stream_close(session->stream_resp_counter_, true,
session->req_stats_[session->stream_resp_counter_],
final);
auto req_stat = client->get_req_stat(session->stream_resp_counter_);
assert(req_stat);
client->on_stream_close(session->stream_resp_counter_, true, final);
session->stream_resp_counter_ += 2;
......@@ -150,7 +152,7 @@ http_parser_settings htp_hooks = {
void Http1Session::on_connect() { client_->signal_write(); }
int Http1Session::submit_request(RequestStat *req_stat) {
int Http1Session::submit_request() {
auto config = client_->worker->config;
const auto &req = config->h1reqs[client_->reqidx];
client_->reqidx++;
......@@ -159,13 +161,13 @@ int Http1Session::submit_request(RequestStat *req_stat) {
client_->reqidx = 0;
}
assert(req_stat);
client_->on_request(stream_req_counter_);
auto req_stat = client_->get_req_stat(stream_req_counter_);
client_->record_request_time(req_stat);
client_->wb.write(req.c_str(), req.size());
client_->on_request(stream_req_counter_);
req_stats_[stream_req_counter_] = req_stat;
// increment for next request
stream_req_counter_ += 2;
......
......@@ -38,14 +38,13 @@ public:
Http1Session(Client *client);
virtual ~Http1Session();
virtual void on_connect();
virtual int submit_request(RequestStat *req_stat);
virtual int submit_request();
virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write();
virtual void terminate();
Client *get_client();
int32_t stream_req_counter_;
int32_t stream_resp_counter_;
std::unordered_map<int32_t, RequestStat *> req_stats_;
private:
Client *client_;
......
......@@ -89,12 +89,25 @@ namespace {
int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *user_data) {
auto client = static_cast<Client *>(user_data);
auto req_stat = static_cast<RequestStat *>(
nghttp2_session_get_stream_user_data(session, stream_id));
if (!req_stat) {
client->on_stream_close(stream_id, error_code == NGHTTP2_NO_ERROR);
return 0;
}
} // namespace
namespace {
int on_frame_not_send_callback(nghttp2_session *session,
const nghttp2_frame *frame, int lib_error_code,
void *user_data) {
if (frame->hd.type != NGHTTP2_HEADERS ||
frame->headers.cat != NGHTTP2_HCAT_REQUEST) {
return 0;
}
client->on_stream_close(stream_id, error_code == NGHTTP2_NO_ERROR, req_stat);
auto client = static_cast<Client *>(user_data);
// request was not sent. Mark it as error.
client->on_stream_close(frame->hd.stream_id, false);
return 0;
}
} // namespace
......@@ -108,9 +121,7 @@ int before_frame_send_callback(nghttp2_session *session,
}
auto client = static_cast<Client *>(user_data);
client->on_request(frame->hd.stream_id);
auto req_stat = static_cast<RequestStat *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
auto req_stat = client->get_req_stat(frame->hd.stream_id);
assert(req_stat);
client->record_request_time(req_stat);
......@@ -124,8 +135,7 @@ ssize_t file_read_callback(nghttp2_session *session, int32_t stream_id,
nghttp2_data_source *source, void *user_data) {
auto client = static_cast<Client *>(user_data);
auto config = client->worker->config;
auto req_stat = static_cast<RequestStat *>(
nghttp2_session_get_stream_user_data(session, stream_id));
auto req_stat = client->get_req_stat(stream_id);
assert(req_stat);
ssize_t nread;
while ((nread = pread(config->data_fd, buf, length, req_stat->data_offset)) ==
......@@ -183,6 +193,9 @@ void Http2Session::on_connect() {
nghttp2_session_callbacks_set_on_header_callback(callbacks,
on_header_callback);
nghttp2_session_callbacks_set_on_frame_not_send_callback(
callbacks, on_frame_not_send_callback);
nghttp2_session_callbacks_set_before_frame_send_callback(
callbacks, before_frame_send_callback);
......@@ -212,7 +225,7 @@ void Http2Session::on_connect() {
client_->signal_write();
}
int Http2Session::submit_request(RequestStat *req_stat) {
int Http2Session::submit_request() {
if (nghttp2_session_check_request_allowed(session_) == 0) {
return -1;
}
......@@ -228,11 +241,13 @@ int Http2Session::submit_request(RequestStat *req_stat) {
auto stream_id =
nghttp2_submit_request(session_, nullptr, nva.data(), nva.size(),
config->data_fd == -1 ? nullptr : &prd, req_stat);
config->data_fd == -1 ? nullptr : &prd, nullptr);
if (stream_id < 0) {
return -1;
}
client_->on_request(stream_id);
return 0;
}
......
......@@ -38,7 +38,7 @@ public:
Http2Session(Client *client);
virtual ~Http2Session();
virtual void on_connect();
virtual int submit_request(RequestStat *req_stat);
virtual int submit_request();
virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write();
virtual void terminate();
......
......@@ -41,7 +41,7 @@ public:
// Called when the connection was made.
virtual void on_connect() = 0;
// Called when one request must be issued.
virtual int submit_request(RequestStat *req_stat) = 0;
virtual int submit_request() = 0;
// Called when incoming bytes are available. The subclass has to
// return the number of bytes read.
virtual int on_read(const uint8_t *data, size_t len) = 0;
......
......@@ -48,9 +48,7 @@ void before_ctrl_send_callback(spdylay_session *session,
return;
}
client->on_request(frame->syn_stream.stream_id);
auto req_stat =
static_cast<RequestStat *>(spdylay_session_get_stream_user_data(
session, frame->syn_stream.stream_id));
auto req_stat = client->get_req_stat(frame->syn_stream.stream_id);
client->record_request_time(req_stat);
}
} // namespace
......@@ -104,9 +102,7 @@ void on_stream_close_callback(spdylay_session *session, int32_t stream_id,
spdylay_status_code status_code,
void *user_data) {
auto client = static_cast<Client *>(user_data);
auto req_stat = static_cast<RequestStat *>(
spdylay_session_get_stream_user_data(session, stream_id));
client->on_stream_close(stream_id, status_code == SPDYLAY_OK, req_stat);
client->on_stream_close(stream_id, status_code == SPDYLAY_OK);
}
} // namespace
......@@ -130,8 +126,7 @@ ssize_t file_read_callback(spdylay_session *session, int32_t stream_id,
spdylay_data_source *source, void *user_data) {
auto client = static_cast<Client *>(user_data);
auto config = client->worker->config;
auto req_stat = static_cast<RequestStat *>(
spdylay_session_get_stream_user_data(session, stream_id));
auto req_stat = client->get_req_stat(stream_id);
ssize_t nread;
while ((nread = pread(config->data_fd, buf, length, req_stat->data_offset)) ==
......@@ -185,7 +180,7 @@ void SpdySession::on_connect() {
client_->signal_write();
}
int SpdySession::submit_request(RequestStat *req_stat) {
int SpdySession::submit_request() {
int rv;
auto config = client_->worker->config;
auto &nv = config->nv[client_->reqidx++];
......@@ -197,7 +192,7 @@ int SpdySession::submit_request(RequestStat *req_stat) {
spdylay_data_provider prd{{0}, file_read_callback};
rv = spdylay_submit_request(session_, 0, nv.data(),
config->data_fd == -1 ? nullptr : &prd, req_stat);
config->data_fd == -1 ? nullptr : &prd, nullptr);
if (rv != 0) {
return -1;
......
......@@ -40,7 +40,7 @@ public:
SpdySession(Client *client, uint16_t spdy_version);
virtual ~SpdySession();
virtual void on_connect();
virtual int submit_request(RequestStat *req_stat);
virtual int submit_request();
virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write();
virtual void terminate();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment