/*
 * nghttp2 - HTTP/2 C Library
 *
 * Copyright (c) 2012 Tatsuhiro Tsujikawa
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
#include "shrpx_worker.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif // HAVE_UNISTD_H

#include <memory>

#include "shrpx_ssl.h"
#include "shrpx_log.h"
#include "shrpx_client_handler.h"
#include "shrpx_http2_session.h"
#include "shrpx_log_config.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_live_check.h"
#include "shrpx_memcached_dispatcher.h"
#ifdef HAVE_MRUBY
#include "shrpx_mruby.h"
#endif // HAVE_MRUBY
#include "util.h"
#include "template.h"

namespace shrpx {

namespace {
void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
  auto worker = static_cast<Worker *>(w->data);
  worker->process_events();
}
} // namespace

namespace {
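// Clears the memchunk pool once this worker has no connections left.
// The timer is armed by schedule_clear_mcpool(), so the pool is not
// freed while it may still be in use.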
void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
  auto worker = static_cast<Worker *>(w->data);
  if (worker->get_worker_stat()->num_connections != 0) {
    return;
  }
  worker->get_mcpool()->clear();
}
} // namespace

namespace {
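// Drains the next queued WorkerEvent.  process_events() arms
// proc_wev_timer_ with a zero timeout so that pending events are
// processed one per event loop iteration.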
void proc_wev_cb(struct ev_loop *loop, ev_timer *w, int revents) {
  auto worker = static_cast<Worker *>(w->data);
  worker->process_events();
}
} // namespace

namespace {
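// Returns true if |lhs| and |rhs| contain the same set of backend
// addresses.  Each address in |lhs| is paired with a distinct, not yet
// used address in |rhs|, so the comparison is order-insensitive but
// still respects duplicated entries.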
bool match_shared_downstream_addr(
    const std::shared_ptr<SharedDownstreamAddr> &lhs,
    const std::shared_ptr<SharedDownstreamAddr> &rhs) {
  if (lhs->addrs.size() != rhs->addrs.size()) {
    return false;
  }

  if (lhs->affinity != rhs->affinity) {
    return false;
  }

  auto used = std::vector<bool>(lhs->addrs.size());

  for (auto &a : lhs->addrs) {
    size_t i;
    for (i = 0; i < rhs->addrs.size(); ++i) {
      if (used[i]) {
        continue;
      }

      auto &b = rhs->addrs[i];
      if (a.host == b.host && a.port == b.port && a.host_unix == b.host_unix &&
          a.proto == b.proto && a.tls == b.tls && a.sni == b.sni &&
          a.fall == b.fall && a.rise == b.rise) {
        break;
      }
    }

    if (i == rhs->addrs.size()) {
      return false;
    }

    used[i] = true;
  }

  return true;
}
} // namespace

namespace {
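// Process-wide seed source; each Worker seeds its own std::mt19937
// (randgen_) from it.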
std::random_device rd;
} // namespace

Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
               SSL_CTX *tls_session_cache_memcached_ssl_ctx,
               ssl::CertLookupTree *cert_tree,
               const std::shared_ptr<TicketKeys> &ticket_keys,
               ConnectionHandler *conn_handler,
               std::shared_ptr<DownstreamConfig> downstreamconf)
    : randgen_(rd()),
      worker_stat_{},
      loop_(loop),
      sv_ssl_ctx_(sv_ssl_ctx),
      cl_ssl_ctx_(cl_ssl_ctx),
      cert_tree_(cert_tree),
      conn_handler_(conn_handler),
      ticket_keys_(ticket_keys),
      connect_blocker_(
          make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
      graceful_shutdown_(false) {
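  // w_ lets other threads wake up this worker's event loop after
  // queueing a WorkerEvent through send().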
  ev_async_init(&w_, eventcb);
  w_.data = this;
  ev_async_start(loop_, &w_);

  ev_timer_init(&mcpool_clear_timer_, mcpool_clear_cb, 0., 0.);
  mcpool_clear_timer_.data = this;

  ev_timer_init(&proc_wev_timer_, proc_wev_cb, 0., 0.);
  proc_wev_timer_.data = this;

  auto &session_cacheconf = get_config()->tls.session_cache;

  if (!session_cacheconf.memcached.host.empty()) {
    session_cache_memcached_dispatcher_ = make_unique<MemcachedDispatcher>(
        &session_cacheconf.memcached.addr, loop,
        tls_session_cache_memcached_ssl_ctx,
        StringRef{session_cacheconf.memcached.host}, &mcpool_);
  }

  replace_downstream_config(std::move(downstreamconf));
}

void Worker::replace_downstream_config(
    std::shared_ptr<DownstreamConfig> downstreamconf) {
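  // Retire the groups of the outgoing configuration and drop their
  // pooled backend connections; handlers still serving requests keep
  // their group alive through the shared_ptr.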
  for (auto &g : downstream_addr_groups_) {
    g->retired = true;

    auto &shared_addr = g->shared_addr;

    if (shared_addr->affinity == AFFINITY_NONE) {
      shared_addr->dconn_pool.remove_all();
      continue;
    }

    for (auto &addr : shared_addr->addrs) {
      addr.dconn_pool->remove_all();
    }
  }

  downstreamconf_ = downstreamconf;

  // Making a copy is much faster when multiple threads call the
  // backendconfig API.
  auto groups = downstreamconf->addr_groups;

  downstream_addr_groups_ =
      std::vector<std::shared_ptr<DownstreamAddrGroup>>(groups.size());

  for (size_t i = 0; i < groups.size(); ++i) {
    auto &src = groups[i];
    auto &dst = downstream_addr_groups_[i];

    dst = std::make_shared<DownstreamAddrGroup>();
    dst->pattern = src.pattern;

    // TODO For some reason, clang-3.6, which comes with Ubuntu 15.10,
    // does not value-initialize with std::make_shared.
    auto shared_addr = std::make_shared<SharedDownstreamAddr>();

    shared_addr->addrs.resize(src.addrs.size());
    shared_addr->affinity = src.affinity;

    size_t num_http1 = 0;
    size_t num_http2 = 0;

    for (size_t j = 0; j < src.addrs.size(); ++j) {
      auto &src_addr = src.addrs[j];
      auto &dst_addr = shared_addr->addrs[j];

      dst_addr.addr = src_addr.addr;
      dst_addr.host = src_addr.host;
      dst_addr.hostport = src_addr.hostport;
      dst_addr.port = src_addr.port;
      dst_addr.host_unix = src_addr.host_unix;
      dst_addr.proto = src_addr.proto;
      dst_addr.tls = src_addr.tls;
      dst_addr.sni = src_addr.sni;
      dst_addr.fall = src_addr.fall;
      dst_addr.rise = src_addr.rise;

      auto shared_addr_ptr = shared_addr.get();

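      // These ConnectBlocker callbacks keep the per-protocol weights in
      // sync with backend availability: when this address is blocked,
      // its protocol loses one unit of weight, and it is restored once
      // the address becomes usable again.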
      dst_addr.connect_blocker =
          make_unique<ConnectBlocker>(randgen_, loop_,
                                      [shared_addr_ptr, &dst_addr]() {
                                        switch (dst_addr.proto) {
                                        case PROTO_HTTP1:
                                          --shared_addr_ptr->http1_pri.weight;
                                          break;
                                        case PROTO_HTTP2:
                                          --shared_addr_ptr->http2_pri.weight;
                                          break;
                                        default:
                                          assert(0);
                                        }
                                      },
                                      [shared_addr_ptr, &dst_addr]() {
                                        switch (dst_addr.proto) {
                                        case PROTO_HTTP1:
                                          ++shared_addr_ptr->http1_pri.weight;
                                          break;
                                        case PROTO_HTTP2:
                                          ++shared_addr_ptr->http2_pri.weight;
                                          break;
                                        default:
                                          assert(0);
                                        }
                                      });

      dst_addr.live_check =
          make_unique<LiveCheck>(loop_, cl_ssl_ctx_, this, &dst_addr, randgen_);

      if (dst_addr.proto == PROTO_HTTP2) {
        ++num_http2;
      } else {
        assert(dst_addr.proto == PROTO_HTTP1);
        ++num_http1;
      }
    }

    // Reuse an existing group (and share its connections) if another
    // pattern has the same set of backend addresses.
    auto end = std::begin(downstream_addr_groups_) + i;
    auto it = std::find_if(
        std::begin(downstream_addr_groups_), end,
        [&shared_addr](const std::shared_ptr<DownstreamAddrGroup> &group) {
          return match_shared_downstream_addr(group->shared_addr, shared_addr);
        });

    if (it == end) {
      if (LOG_ENABLED(INFO)) {
        LOG(INFO) << "number of http/1.1 backend: " << num_http1
                  << ", number of h2 backend: " << num_http2;
      }

      shared_addr->http1_pri.weight = num_http1;
      shared_addr->http2_pri.weight = num_http2;

      if (shared_addr->affinity != AFFINITY_NONE) {
        for (auto &addr : shared_addr->addrs) {
          addr.dconn_pool = make_unique<DownstreamConnectionPool>();
        }
      }

      dst->shared_addr = shared_addr;
    } else {
      if (LOG_ENABLED(INFO)) {
        LOG(INFO) << dst->pattern << " shares the same backend group with "
                  << (*it)->pattern;
      }
      dst->shared_addr = (*it)->shared_addr;
    }
  }
}

Worker::~Worker() {
  ev_async_stop(loop_, &w_);
  ev_timer_stop(loop_, &mcpool_clear_timer_);
  ev_timer_stop(loop_, &proc_wev_timer_);
}

void Worker::schedule_clear_mcpool() {
  // libev manual says: "If the watcher is already active nothing will
  // happen."  Since we don't change any timeout here, we don't have
  // to worry about querying ev_is_active.
  ev_timer_start(loop_, &mcpool_clear_timer_);
}

void Worker::wait() {
#ifndef NOTHREADS
  fut_.get();
#endif // !NOTHREADS
}

void Worker::run_async() {
#ifndef NOTHREADS
  fut_ = std::async(std::launch::async, [this] {
    (void)reopen_log_files();
void Worker::run_async() {
#ifndef NOTHREADS
  fut_ = std::async(std::launch::async, [this] {
    (void)reopen_log_files();
    ev_run(loop_);
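    // log_config() is thread-local; destroy this thread's instance
    // before the thread running this event loop exits.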
    delete log_config();
  });
#endif // !NOTHREADS
}

void Worker::send(const WorkerEvent &event) {
  {
    std::lock_guard<std::mutex> g(m_);

    q_.push_back(event);
  }

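  // ev_async_send() is safe to call from other threads; it wakes up
  // this worker's event loop, which then invokes eventcb.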
  ev_async_send(loop_, &w_);
}

void Worker::process_events() {
  WorkerEvent wev;
  {
    std::lock_guard<std::mutex> g(m_);

    // Process events one at a time.  This is important for the
    // NEW_CONNECTION event, since accepting a large number of new
    // connections at once may delay the time to first byte for
    // existing connections.

    if (q_.empty()) {
      ev_timer_stop(loop_, &proc_wev_timer_);
      return;
    }

    wev = q_.front();
    q_.pop_front();
  }

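  // Re-arm the zero-timeout timer so that the next queued event, if
  // any, is processed on the following event loop iteration.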
  ev_timer_start(loop_, &proc_wev_timer_);

  auto worker_connections = get_config()->conn.upstream.worker_connections;

  switch (wev.type) {
  case NEW_CONNECTION: {
    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "WorkerEvent: client_fd=" << wev.client_fd
                       << ", addrlen=" << wev.client_addrlen;
    }

    if (worker_stat_.num_connections >= worker_connections) {

      if (LOG_ENABLED(INFO)) {
        WLOG(INFO, this) << "Too many connections >= " << worker_connections;
      }

      close(wev.client_fd);

      break;
    }

    auto client_handler =
        ssl::accept_connection(this, wev.client_fd, &wev.client_addr.sa,
                               wev.client_addrlen, wev.faddr);
    if (!client_handler) {
      if (LOG_ENABLED(INFO)) {
        WLOG(ERROR, this) << "ClientHandler creation failed";
      }
      close(wev.client_fd);
      break;
    }

    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created";
    }

    break;
  }
  case REOPEN_LOG:
    WLOG(NOTICE, this) << "Reopening log files: worker process (thread " << this
                       << ")";

    reopen_log_files();

    break;
  case GRACEFUL_SHUTDOWN:
    WLOG(NOTICE, this) << "Graceful shutdown commencing";

    graceful_shutdown_ = true;

    if (worker_stat_.num_connections == 0) {
      ev_break(loop_);

      return;
    }

    break;
  case REPLACE_DOWNSTREAM:
    WLOG(NOTICE, this) << "Replace downstream";

    replace_downstream_config(wev.downstreamconf);

    break;
  default:
    if (LOG_ENABLED(INFO)) {
      WLOG(INFO, this) << "unknown event type " << wev.type;
    }
  }
}

ssl::CertLookupTree *Worker::get_cert_lookup_tree() const { return cert_tree_; }

std::shared_ptr<TicketKeys> Worker::get_ticket_keys() {
  std::lock_guard<std::mutex> g(m_);
  return ticket_keys_;
}

void Worker::set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys) {
  std::lock_guard<std::mutex> g(m_);
  ticket_keys_ = std::move(ticket_keys);
}

WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }

struct ev_loop *Worker::get_loop() const {
  return loop_;
}

SSL_CTX *Worker::get_sv_ssl_ctx() const { return sv_ssl_ctx_; }

SSL_CTX *Worker::get_cl_ssl_ctx() const { return cl_ssl_ctx_; }

void Worker::set_graceful_shutdown(bool f) { graceful_shutdown_ = f; }

bool Worker::get_graceful_shutdown() const { return graceful_shutdown_; }

MemchunkPool *Worker::get_mcpool() { return &mcpool_; }

MemcachedDispatcher *Worker::get_session_cache_memcached_dispatcher() {
  return session_cache_memcached_dispatcher_.get();
}

std::mt19937 &Worker::get_randgen() { return randgen_; }

#ifdef HAVE_MRUBY
int Worker::create_mruby_context() {
  mruby_ctx_ = mruby::create_mruby_context(StringRef{get_config()->mruby_file});
  if (!mruby_ctx_) {
    return -1;
  }

  return 0;
}

mruby::MRubyContext *Worker::get_mruby_context() const {
  return mruby_ctx_.get();
}
#endif // HAVE_MRUBY

std::vector<std::shared_ptr<DownstreamAddrGroup>> &
Worker::get_downstream_addr_groups() {
  return downstream_addr_groups_;
}

ConnectBlocker *Worker::get_connect_blocker() const {
  return connect_blocker_.get();
}

const DownstreamConfig *Worker::get_downstream_config() const {
  return downstreamconf_.get();
}

ConnectionHandler *Worker::get_connection_handler() const {
  return conn_handler_;
}

namespace {
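// Selects the backend group for |host| and |path|.  Exact host
// patterns are tried first, then wildcard host patterns (the longest
// host match wins), then patterns without a host, and finally the
// |catch_all| group.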
size_t match_downstream_addr_group_host(
    const RouterConfig &routerconf, const StringRef &host,
    const StringRef &path,
    const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
    size_t catch_all, BlockAllocator &balloc) {

  const auto &router = routerconf.router;
  const auto &rev_wildcard_router = routerconf.rev_wildcard_router;
  const auto &wildcard_patterns = routerconf.wildcard_patterns;

  if (path.empty() || path[0] != '/') {
    auto group = router.match(host, StringRef::from_lit("/"));
    if (group != -1) {
      if (LOG_ENABLED(INFO)) {
        LOG(INFO) << "Found pattern with query " << host
                  << ", matched pattern=" << groups[group]->pattern;
      }
      return group;
    }
    return catch_all;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "Perform mapping selection, using host=" << host
              << ", path=" << path;
  }

  auto group = router.match(host, path);
  if (group != -1) {
    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Found pattern with query " << host << path
                << ", matched pattern=" << groups[group]->pattern;
    }
    return group;
  }

  if (!wildcard_patterns.empty() && !host.empty()) {
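    // Wildcard patterns are matched against the reversed host so that
    // a host suffix match becomes a prefix match in the router.  The
    // first character of |host| is skipped, which leaves at least one
    // character for the wildcard part to consume.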
    auto rev_host_src = make_byte_ref(balloc, host.size() - 1);
    auto ep =
        std::copy(std::begin(host) + 1, std::end(host), rev_host_src.base);
    std::reverse(rev_host_src.base, ep);
    auto rev_host = StringRef{rev_host_src.base, ep};

    ssize_t best_group = -1;
    const RNode *last_node = nullptr;

    for (;;) {
      size_t nread = 0;
      auto wcidx =
          rev_wildcard_router.match_prefix(&nread, &last_node, rev_host);
      if (wcidx == -1) {
        break;
      }

      rev_host = StringRef{std::begin(rev_host) + nread, std::end(rev_host)};

      auto &wc = wildcard_patterns[wcidx];
      auto group = wc.router.match(StringRef{}, path);
      if (group != -1) {
        // wildcard_patterns are sorted so that the first match is the
        // longest host pattern.
        if (LOG_ENABLED(INFO)) {
          LOG(INFO) << "Found wildcard pattern with query " << host << path
                    << ", matched pattern=" << groups[group]->pattern;
        }

        best_group = group;
      }
    }

    if (best_group != -1) {
      return best_group;
    }
  }

  group = router.match(StringRef::from_lit(""), path);
  if (group != -1) {
    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Found pattern with query " << path
                << ", matched pattern=" << groups[group]->pattern;
    }
    return group;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "None match.  Use catch-all pattern";
  }
  return catch_all;
}
} // namespace

size_t match_downstream_addr_group(
    const RouterConfig &routerconf, const StringRef &hostport,
    const StringRef &raw_path,
    const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
    size_t catch_all, BlockAllocator &balloc) {
  if (std::find(std::begin(hostport), std::end(hostport), '/') !=
      std::end(hostport)) {
    // We use '/' specially, and if '/' is included in host, it breaks
    // our code.  Select catch-all case.
    return catch_all;
  }

  auto fragment = std::find(std::begin(raw_path), std::end(raw_path), '#');
  auto query = std::find(std::begin(raw_path), fragment, '?');
  auto path = StringRef{std::begin(raw_path), query};

  if (hostport.empty()) {
    return match_downstream_addr_group_host(routerconf, hostport, path, groups,
                                            catch_all, balloc);
  }

  StringRef host;
  if (hostport[0] == '[') {
    // assume this is IPv6 numeric address
    auto p = std::find(std::begin(hostport), std::end(hostport), ']');
    if (p == std::end(hostport)) {
      return catch_all;
    }
    if (p + 1 < std::end(hostport) && *(p + 1) != ':') {
      return catch_all;
    }
    host = StringRef{std::begin(hostport), p + 1};
  } else {
    auto p = std::find(std::begin(hostport), std::end(hostport), ':');
    if (p == std::begin(hostport)) {
      return catch_all;
    }
    host = StringRef{std::begin(hostport), p};
  }

  if (std::find_if(std::begin(host), std::end(host), [](char c) {
        return 'A' <= c && c <= 'Z';
      }) != std::end(host)) {
    auto low_host = make_byte_ref(balloc, host.size() + 1);
    auto ep = std::copy(std::begin(host), std::end(host), low_host.base);
    *ep = '\0';
    util::inp_strlower(low_host.base, ep);
    host = StringRef{low_host.base, ep};
  }
  return match_downstream_addr_group_host(routerconf, host, path, groups,
                                          catch_all, balloc);
}

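// Records a connection failure for |addr|.  After |addr->fall|
// consecutive failures (0 disables this check), the address is marked
// offline and a live check is scheduled if |addr->rise| is configured.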
void downstream_failure(DownstreamAddr *addr) {
  const auto &connect_blocker = addr->connect_blocker;

  if (connect_blocker->in_offline()) {
    return;
  }

  connect_blocker->on_failure();

  if (addr->fall == 0) {
    return;
  }

  auto fail_count = connect_blocker->get_fail_count();

  if (fail_count >= addr->fall) {
    LOG(WARN) << "Could not connect to " << util::to_numeric_addr(&addr->addr)
              << " " << fail_count << " times in a row; considered as offline";

    connect_blocker->offline();

    if (addr->rise) {
      addr->live_check->schedule();
    }
  }
}

} // namespace shrpx