Commit 1f4f3030 authored by Alexey Spiridonov's avatar Alexey Spiridonov Committed by Chip Turner

A generic line-reading callback for communicate()

Summary: There are a couple of places where this behavior is useful, and it's not 100% trivial to implement it from scratch. Adding it to Folly to save people code & bugs.

Test Plan: unit tests

Reviewed By: tudorb@fb.com

Subscribers: tjackson, folly@lists, tudorb

FB internal diff: D1297506
parent dbcb08a7
......@@ -39,7 +39,6 @@
#include <folly/Conv.h>
#include <folly/Exception.h>
#include <folly/FileUtil.h>
#include <folly/ScopeGuard.h>
#include <folly/String.h>
#include <folly/io/Cursor.h>
......
......@@ -70,6 +70,8 @@
#include <boost/operators.hpp>
#include <boost/noncopyable.hpp>
#include <folly/FileUtil.h>
#include <folly/gen/String.h>
#include <folly/io/IOBufQueue.h>
#include <folly/MapUtil.h>
#include <folly/Portability.h>
......@@ -372,26 +374,133 @@ class Subprocess : private boost::noncopyable {
* identifying the stream; 0 = child's standard input, etc)
*
* The read and write callbacks must read from / write to pfd and return
* false during normal operation or true at end-of-file;
* communicate() will then close the pipe. Note that pfd is
* nonblocking, so be prepared for read() / write() to return -1 and
* set errno to EAGAIN (in which case you should return false).
* false during normal operation. Return true to tell communicate() to
* close the pipe. For readCallback, this might send SIGPIPE to the
* child, or make its writes fail with EPIPE, so you should generally
* avoid returning true unless you've reached end-of-file.
*
* NOTE that you MUST consume all data passed to readCallback (or return
* true, which will close the pipe, possibly sending SIGPIPE to the child or
* making its writes fail with EPIPE), and you MUST write to a writable pipe
* (or return true, which will close the pipe). To do otherwise is an
* error. You must do this even for pipes you are not interested in.
* true to close the pipe). Similarly, you MUST write to a writable pipe
* (or return true to close the pipe). To do otherwise is an error that
* can result in a deadlock. You must do this even for pipes you are not
* interested in.
*
* Note that pfd is nonblocking, so be prepared for read() / write() to
* return -1 and set errno to EAGAIN (in which case you should return
* false). Use readNoInt() from FileUtil.h to handle interrupted reads
* for you
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
*
* Most users won't need to use this; the simpler version of communicate
* (which buffers data in memory) will probably work fine.
*
* See ReadLinesCallback for an easy way to consume the child's output
* streams line-by-line (or tokenized by another delimiter).
*/
typedef std::function<bool(int, int)> FdCallback;
void communicate(FdCallback readCallback, FdCallback writeCallback);
/**
* A readCallback for Subprocess::communicate() that helps you consume
* lines (or other delimited pieces) from your subprocess's file
* descriptors. Use the readLinesCallback() helper to get template
* deduction. For example:
*
* auto read_cb = Subprocess::readLinesCallback(
* [](int fd, folly::StringPiece s) {
* std::cout << fd << " said: " << s;
* return false; // Keep reading from the child
* }
* );
* subprocess.communicate(
* // ReadLinesCallback contains StreamSplitter contains IOBuf, making
* // it noncopyable, whereas std::function must be copyable. So, we
* // keep the callback in a local, and instead pass a reference.
* std::ref(read_cb),
* [](int pdf, int cfd){ return true; } // Don't write to the child
* );
*
* If a file line exceeds maxLineLength, your callback will get some
* initial chunks of maxLineLength with no trailing delimiters. The final
* chunk of a line is delimiter-terminated iff the delimiter was present
* in the input. In particular, the last line in a file always lacks a
* delimiter -- so if a file ends on a delimiter, the final line is empty.
*
* Like a regular communicate() callback, your fdLineCb() normally returns
* false. It may return true to tell Subprocess to close the underlying
* file descriptor. The child process may then receive SIGPIPE or get
* EPIPE errors on writes.
*/
template <class Callback>
class ReadLinesCallback {
private:
// Binds an FD to the client-provided FD+line callback
struct StreamSplitterCallback {
StreamSplitterCallback(Callback& cb, int fd) : cb_(cb), fd_(fd) { }
// The return value semantics are inverted vs StreamSplitter
bool operator()(StringPiece s) { return !cb_(fd_, s); }
Callback& cb_;
int fd_;
};
typedef gen::StreamSplitter<StreamSplitterCallback> LineSplitter;
public:
explicit ReadLinesCallback(
Callback&& fdLineCb,
uint64_t maxLineLength = 0, // No line length limit by default
char delimiter = '\n',
uint64_t bufSize = 1024
) : fdLineCb_(std::move(fdLineCb)),
maxLineLength_(maxLineLength),
delimiter_(delimiter),
bufSize_(bufSize) {}
bool operator()(int pfd, int cfd) {
// Make a splitter for this cfd if it doesn't already exist
auto it = fdToSplitter_.find(cfd);
auto& splitter = (it != fdToSplitter_.end()) ? it->second
: fdToSplitter_.emplace(cfd, LineSplitter(
delimiter_, StreamSplitterCallback(fdLineCb_, cfd), maxLineLength_
)).first->second;
// Read as much as we can from this FD
char buf[bufSize_];
while (true) {
ssize_t ret = readNoInt(pfd, buf, bufSize_);
if (ret == -1 && errno == EAGAIN) { // No more data for now
return false;
}
if (ret == 0) { // Reached end-of-file
splitter.flush(); // Ignore return since the file is over anyway
return true;
}
if (!splitter(StringPiece(buf, ret))) {
return true; // The callback told us to stop
}
}
}
private:
Callback fdLineCb_;
const uint64_t maxLineLength_;
const char delimiter_;
const uint64_t bufSize_;
// We lazily make splitters for all cfds that get used.
std::unordered_map<int, LineSplitter> fdToSplitter_;
};
// Helper to enable template deduction
template <class Callback>
static ReadLinesCallback<Callback> readLinesCallback(
Callback&& fdLineCb,
uint64_t maxLineLength = 0, // No line length limit by default
char delimiter = '\n',
uint64_t bufSize = 1024) {
return ReadLinesCallback<Callback>(
std::move(fdLineCb), maxLineLength, delimiter, bufSize
);
}
/**
* Enable notifications (callbacks) for one pipe to/from child. By default,
* all are enabled. Useful for "chatty" communication -- you want to disable
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment