Commit d65439dd authored by Adam Simpkins's avatar Adam Simpkins Committed by Facebook Github Bot

logging: add a LogHandler::flush() call

Summary:
Add a flush() call to the LogHandler interface.  This is needed to implement
`FB_LOG(FATAL)` so that we can flush all LogHandlers before aborting the
program.

Reviewed By: wez

Differential Revision: D5189499

fbshipit-source-id: 75fa4d7e75ea26de5b7383bf7e8d073fb37e9309
parent 84458670
...@@ -63,7 +63,7 @@ class AsyncFileWriter : public LogWriter { ...@@ -63,7 +63,7 @@ class AsyncFileWriter : public LogWriter {
* Block until the I/O thread has finished writing all messages that * Block until the I/O thread has finished writing all messages that
* were already enqueued when flush() was called. * were already enqueued when flush() was called.
*/ */
void flush(); void flush() override;
private: private:
/* /*
......
...@@ -47,4 +47,6 @@ void ImmediateFileWriter::writeMessage( ...@@ -47,4 +47,6 @@ void ImmediateFileWriter::writeMessage(
errnoStr(errnum)); errnoStr(errnum));
} }
} }
void ImmediateFileWriter::flush() {}
} }
...@@ -48,6 +48,7 @@ class ImmediateFileWriter : public LogWriter { ...@@ -48,6 +48,7 @@ class ImmediateFileWriter : public LogWriter {
using LogWriter::writeMessage; using LogWriter::writeMessage;
void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) override; void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) override;
void flush() override;
private: private:
ImmediateFileWriter(ImmediateFileWriter const&) = delete; ImmediateFileWriter(ImmediateFileWriter const&) = delete;
......
...@@ -65,5 +65,19 @@ class LogHandler { ...@@ -65,5 +65,19 @@ class LogHandler {
virtual void handleMessage( virtual void handleMessage(
const LogMessage& message, const LogMessage& message,
const LogCategory* handlerCategory) = 0; const LogCategory* handlerCategory) = 0;
/**
* Block until all messages that have already been sent to this LogHandler
* have been processed.
*
* For LogHandlers that perform asynchronous processing of log messages,
* this ensures that messages already sent to this handler have finished
* being processed.
*
* Other threads may still call handleMessage() while flush() is running.
* handleMessage() calls that did not complete before the flush() call
* started will not necessarily be processed by the flush call.
*/
virtual void flush() = 0;
}; };
} }
...@@ -62,5 +62,15 @@ class LogWriter { ...@@ -62,5 +62,15 @@ class LogWriter {
virtual void writeMessage(std::string&& buffer, uint32_t flags = 0) { virtual void writeMessage(std::string&& buffer, uint32_t flags = 0) {
writeMessage(folly::StringPiece{buffer}, flags); writeMessage(folly::StringPiece{buffer}, flags);
} }
/**
* Block until all messages that have already been sent to this LogWriter
* have been written.
*
* Other threads may still call writeMessage() while flush() is running.
* writeMessage() calls that did not complete before the flush() call started
* will not necessarily be processed by the flush call.
*/
virtual void flush() = 0;
}; };
} }
...@@ -36,4 +36,8 @@ void StandardLogHandler::handleMessage( ...@@ -36,4 +36,8 @@ void StandardLogHandler::handleMessage(
} }
writer_->writeMessage(formatter_->formatMessage(message, handlerCategory)); writer_->writeMessage(formatter_->formatMessage(message, handlerCategory));
} }
void StandardLogHandler::flush() {
writer_->flush();
}
} }
...@@ -67,6 +67,8 @@ class StandardLogHandler : public LogHandler { ...@@ -67,6 +67,8 @@ class StandardLogHandler : public LogHandler {
const LogMessage& message, const LogMessage& message,
const LogCategory* handlerCategory) override; const LogCategory* handlerCategory) override;
void flush() override;
private: private:
std::atomic<LogLevel> level_{LogLevel::NONE}; std::atomic<LogLevel> level_{LogLevel::NONE};
std::shared_ptr<LogFormatter> formatter_; std::shared_ptr<LogFormatter> formatter_;
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
#include <thread>
#include <folly/Conv.h> #include <folly/Conv.h>
#include <folly/Exception.h> #include <folly/Exception.h>
#include <folly/File.h> #include <folly/File.h>
...@@ -21,6 +23,8 @@ ...@@ -21,6 +23,8 @@
#include <folly/experimental/TestUtil.h> #include <folly/experimental/TestUtil.h>
#include <folly/experimental/logging/AsyncFileWriter.h> #include <folly/experimental/logging/AsyncFileWriter.h>
#include <folly/experimental/logging/LoggerDB.h> #include <folly/experimental/logging/LoggerDB.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <folly/portability/GFlags.h> #include <folly/portability/GFlags.h>
#include <folly/portability/GMock.h> #include <folly/portability/GMock.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
...@@ -133,6 +137,84 @@ TEST(AsyncFileWriter, ioError) { ...@@ -133,6 +137,84 @@ TEST(AsyncFileWriter, ioError) {
EXPECT_GT(logErrors.size(), 0); EXPECT_GT(logErrors.size(), 0);
EXPECT_LE(logErrors.size(), numMessages); EXPECT_LE(logErrors.size(), numMessages);
} }
namespace {
size_t fillUpPipe(int fd) {
int flags = fcntl(fd, F_GETFL);
folly::checkUnixError(flags, "failed get file descriptor flags");
auto rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
folly::checkUnixError(rc, "failed to put pipe in non-blocking mode");
std::vector<char> data;
data.resize(4000);
size_t totalBytes = 0;
size_t bytesToWrite = data.size();
while (true) {
auto bytesWritten = writeNoInt(fd, data.data(), bytesToWrite);
if (bytesWritten < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// We blocked. Keep trying smaller writes, until we get down to a
// single byte, just to make sure the logging code really won't be able
// to write anything to the pipe.
if (bytesToWrite <= 1) {
break;
} else {
bytesToWrite /= 2;
}
} else {
throwSystemError("error writing to pipe");
}
} else {
totalBytes += bytesWritten;
}
}
fprintf(stderr, "pipe filled up after %zu bytes\n", totalBytes);
rc = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
folly::checkUnixError(rc, "failed to put pipe back in blocking mode");
return totalBytes;
}
}
TEST(AsyncFileWriter, flush) {
// Set up a pipe(), then write data to the write endpoint until it fills up
// and starts blocking.
std::array<int, 2> fds;
auto rc = pipe(fds.data());
folly::checkUnixError(rc, "failed to create pipe");
File readPipe{fds[0], true};
File writePipe{fds[1], true};
auto paddingSize = fillUpPipe(writePipe.fd());
// Now set up an AsyncFileWriter pointing at the write end of the pipe
AsyncFileWriter writer{std::move(writePipe)};
// Write a message
writer.writeMessage(std::string{"test message"});
// Call flush(). Use a separate thread, since this should block until we
// consume data from the pipe.
Promise<Unit> promise;
auto future = promise.getFuture();
auto flushFunction = [&] { writer.flush(); };
std::thread flushThread{
[&]() { promise.setTry(makeTryWith(flushFunction)); }};
// Sleep briefly, and make sure flush() still hasn't completed.
/* sleep override */
std::this_thread::sleep_for(10ms);
EXPECT_FALSE(future.isReady());
// Now read from the pipe
std::vector<char> buf;
buf.resize(paddingSize);
readFull(readPipe.fd(), buf.data(), buf.size());
// Make sure flush completes successfully now
future.get(10ms);
flushThread.join();
}
#endif #endif
/** /**
...@@ -279,7 +361,7 @@ void readThread(folly::File&& file, ReadStats* stats) { ...@@ -279,7 +361,7 @@ void readThread(folly::File&& file, ReadStats* stats) {
size_t bufferIdx = 0; size_t bufferIdx = 0;
while (true) { while (true) {
/* sleep override */ /* sleep override */
usleep(stats->getSleepUS().count()); std::this_thread::sleep_for(stats->getSleepUS());
auto readResult = folly::readNoInt( auto readResult = folly::readNoInt(
file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx); file.fd(), buffer.data() + bufferIdx, buffer.size() - bufferIdx);
......
...@@ -53,6 +53,7 @@ class TestLogWriter : public LogWriter { ...@@ -53,6 +53,7 @@ class TestLogWriter : public LogWriter {
override { override {
messages_.emplace_back(buffer.str()); messages_.emplace_back(buffer.str());
} }
void flush() override {}
std::vector<std::string>& getMessages() { std::vector<std::string>& getMessages() {
return messages_; return messages_;
......
...@@ -38,6 +38,8 @@ class TestLogHandler : public LogHandler { ...@@ -38,6 +38,8 @@ class TestLogHandler : public LogHandler {
messages_.emplace_back(message, handlerCategory); messages_.emplace_back(message, handlerCategory);
} }
void flush() override {}
private: private:
std::vector<std::pair<LogMessage, const LogCategory*>> messages_; std::vector<std::pair<LogMessage, const LogCategory*>> messages_;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment