Commit 7469139d authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

h2load: Implement HTTP/1 upload

h2load has supported uploading a file quite a while, but it turns out
that it worked with HTTP/2 and SPDY only.  HTTP/1 with upload did not
work.  This commit fixes this bug, and implement HTTP/1 upload.  Due
to architectural limitation of h2load, when -d option is used, the
number of in-flight pipe-lined requests is set to 1.
parent 51c7a13c
......@@ -269,7 +269,7 @@ bool check_stop_client_request_timeout(Client *client, ev_timer *w) {
auto nreq = client->req_todo - client->req_started;
if (nreq == 0 ||
client->streams.size() >= (size_t)config.max_concurrent_streams) {
client->streams.size() >= client->session->max_concurrent_streams()) {
// no more requests to make, stop timer
ev_timer_stop(client->worker->loop, w);
return true;
......@@ -330,7 +330,8 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
req_done(0),
id(id),
fd(-1),
new_connection_requested(false) {
new_connection_requested(false),
final(false) {
ev_io_init(&wev, writecb, 0, EV_WRITE);
ev_io_init(&rev, readcb, 0, EV_READ);
......@@ -518,6 +519,8 @@ void Client::disconnect() {
close(fd);
fd = -1;
}
final = false;
}
int Client::submit_request() {
......@@ -860,7 +863,7 @@ int Client::connection_made() {
if (!config.timing_script) {
auto nreq =
std::min(req_todo - req_started, (size_t)config.max_concurrent_streams);
std::min(req_todo - req_started, session->max_concurrent_streams());
for (; nreq > 0; --nreq) {
if (submit_request() != 0) {
process_request_failure();
......@@ -2244,6 +2247,12 @@ int main(int argc, char **argv) {
h1req += nv.value;
h1req += "\r\n";
}
// TODO do this for h2 and spdy too.
if (config.data_fd != -1) {
h1req += "Content-Length: ";
h1req += util::utos(config.data_length);
h1req += "\r\n";
}
h1req += "\r\n";
config.h1reqs.push_back(std::move(h1req));
......
......@@ -298,6 +298,9 @@ struct Client {
ev_timer conn_inactivity_watcher;
std::string selected_proto;
bool new_connection_requested;
// true if the current connection will be closed, and no more new
// request cannot be processed.
bool final;
enum { ERR_CONNECT_FAIL = -100 };
......
......@@ -57,7 +57,7 @@ namespace {
int htp_msg_begincb(http_parser *htp) {
auto session = static_cast<Http1Session *>(htp->data);
if (session->stream_resp_counter_ >= session->stream_req_counter_) {
if (session->stream_resp_counter_ > session->stream_req_counter_) {
return -1;
}
......@@ -82,16 +82,21 @@ int htp_msg_completecb(http_parser *htp) {
auto session = static_cast<Http1Session *>(htp->data);
auto client = session->get_client();
auto final = http_should_keep_alive(htp) == 0;
client->final = http_should_keep_alive(htp) == 0;
auto req_stat = client->get_req_stat(session->stream_resp_counter_);
assert(req_stat);
client->on_stream_close(session->stream_resp_counter_, true, final);
auto config = client->worker->config;
if (req_stat->data_offset >= config->data_length) {
client->on_stream_close(session->stream_resp_counter_, true, client->final);
}
session->stream_resp_counter_ += 2;
if (final) {
if (client->final) {
session->stream_req_counter_ = session->stream_resp_counter_;
http_parser_pause(htp, 1);
// Connection is going down. If we have still request to do,
// create new connection and keep on doing the job.
......@@ -171,8 +176,12 @@ int Http1Session::submit_request() {
client_->record_request_time(req_stat);
client_->wb.write(req.c_str(), req.size());
// increment for next request
stream_req_counter_ += 2;
// TODO try read some data here
if (config->data_fd == -1 || config->data_length == 0) {
// increment for next request
stream_req_counter_ += 2;
}
return 0;
}
......@@ -206,6 +215,47 @@ int Http1Session::on_write() {
if (complete_) {
return -1;
}
auto config = client_->worker->config;
auto req_stat = client_->get_req_stat(stream_req_counter_);
if (!req_stat) {
return 0;
}
if (req_stat->data_offset < config->data_length) {
auto req_stat = client_->get_req_stat(stream_req_counter_);
auto &wb = client_->wb;
ssize_t nread;
while ((nread = pread(config->data_fd, wb.last, wb.wleft(),
req_stat->data_offset)) == -1 &&
errno == EINTR)
;
if (nread == -1) {
return -1;
}
req_stat->data_offset += nread;
wb.write(nread);
if (client_->worker->config->verbose) {
std::cout << "[send " << nread << " byte(s)]" << std::endl;
}
if (req_stat->data_offset == config->data_length) {
// increment for next request
stream_req_counter_ += 2;
if (stream_resp_counter_ == stream_req_counter_) {
// Response has already been received
client_->on_stream_close(stream_resp_counter_ - 2, true,
client_->final);
}
}
}
return 0;
}
......@@ -213,4 +263,10 @@ void Http1Session::terminate() { complete_ = true; }
Client *Http1Session::get_client() { return client_; }
size_t Http1Session::max_concurrent_streams() {
auto config = client_->worker->config;
return config->data_fd == -1 ? config->max_concurrent_streams : 1;
}
} // namespace h2load
......@@ -42,6 +42,7 @@ public:
virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write();
virtual void terminate();
virtual size_t max_concurrent_streams();
Client *get_client();
int32_t stream_req_counter_;
int32_t stream_resp_counter_;
......
......@@ -289,4 +289,8 @@ void Http2Session::terminate() {
nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR);
}
size_t Http2Session::max_concurrent_streams() {
return (size_t)client_->worker->config->max_concurrent_streams;
}
} // namespace h2load
......@@ -42,6 +42,7 @@ public:
virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write();
virtual void terminate();
virtual size_t max_concurrent_streams();
private:
Client *client_;
......
......@@ -50,6 +50,8 @@ public:
virtual int on_write() = 0;
// Called when the underlying session must be terminated.
virtual void terminate() = 0;
// Return the maximum concurrency per connection
virtual size_t max_concurrent_streams() = 0;
};
} // namespace h2load
......
......@@ -282,4 +282,8 @@ void SpdySession::handle_window_update(int32_t stream_id, size_t recvlen) {
}
}
size_t SpdySession::max_concurrent_streams() {
return (size_t)client_->worker->config->max_concurrent_streams;
}
} // namespace h2load
......@@ -44,6 +44,7 @@ public:
virtual int on_read(const uint8_t *data, size_t len);
virtual int on_write();
virtual void terminate();
virtual size_t max_concurrent_streams();
void handle_window_update(int32_t stream_id, size_t recvlen);
private:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment