Commit 30c26b8f authored by Tudor Bosman's avatar Tudor Bosman Committed by Sara Golemon

Add ability to silence callbacks for Subprocess::communicate

Summary:
Subprocess::communicate callbacks are level-triggered, which makes writing
"chatty" communication protocols difficult -- you often want to silence the
write callback until you read the expected message. Add
Subprocess::enableNotifications() for this purpose.

Test Plan: test added

Reviewed By: lucian@fb.com

FB internal diff: D1251564

@override-unit-failures
parent 2d08baf6
......@@ -700,7 +700,15 @@ void Subprocess::communicate(FdCallback readCallback,
pfd.fd = p.parentFd;
// Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
// child's point of view.
pfd.events = (p.direction == PIPE_IN ? POLLOUT : POLLIN);
if (!p.enabled) {
// Still keeping fd in watched set so we get notified of POLLHUP /
// POLLERR
pfd.events = 0;
} else if (p.direction == PIPE_IN) {
pfd.events = POLLOUT;
} else {
pfd.events = POLLIN;
}
fds.push_back(pfd);
}
......@@ -747,6 +755,14 @@ void Subprocess::communicate(FdCallback readCallback,
}
}
void Subprocess::enableNotifications(int childFd, bool enabled) {
pipes_[findByChildFd(childFd)].enabled = enabled;
}
bool Subprocess::notificationsEnabled(int childFd) const {
return pipes_[findByChildFd(childFd)].enabled;
}
int Subprocess::findByChildFd(int childFd) const {
auto pos = std::lower_bound(
pipes_.begin(), pipes_.end(), childFd,
......
......@@ -383,6 +383,18 @@ class Subprocess : private boost::noncopyable {
typedef std::function<bool(int, int)> FdCallback;
void communicate(FdCallback readCallback, FdCallback writeCallback);
/**
* Enable notifications (callbacks) for one pipe to/from child. By default,
* all are enabled. Useful for "chatty" communication -- you want to disable
* write callbacks until you receive the expected message.
*/
void enableNotifications(int childFd, bool enabled);
/**
* Are notifications for one pipe to/from child enabled?
*/
bool notificationsEnabled(int childFd) const;
/**
* Return the child's pid, or -1 if the child wasn't successfully spawned
* or has already been wait()ed upon.
......@@ -504,9 +516,11 @@ class Subprocess : private boost::noncopyable {
// so we're happy with a vector here, even if it means linear erase.
// sorted by childFd
struct PipeInfo : private boost::totally_ordered<PipeInfo> {
int parentFd;
int childFd;
int direction; // one of PIPE_IN / PIPE_OUT
int parentFd = -1;
int childFd = -1;
int direction = PIPE_IN; // one of PIPE_IN / PIPE_OUT
bool enabled = true;
bool operator<(const PipeInfo& other) const {
return childFd < other.childFd;
}
......
......@@ -26,6 +26,7 @@
#include "folly/Exception.h"
#include "folly/Format.h"
#include "folly/FileUtil.h"
#include "folly/String.h"
#include "folly/gen/Base.h"
#include "folly/gen/File.h"
......@@ -292,3 +293,102 @@ TEST(CommunicateSubprocessTest, Duplex2) {
}
});
}
namespace {
bool readToString(int fd, std::string& buf, size_t maxSize) {
size_t bytesRead = 0;
buf.resize(maxSize);
char* dest = &buf.front();
size_t remaining = maxSize;
ssize_t n = -1;
while (remaining) {
n = ::read(fd, dest, remaining);
if (n == -1) {
if (errno == EINTR) {
continue;
}
if (errno == EAGAIN) {
break;
}
PCHECK("read failed");
} else if (n == 0) {
break;
}
dest += n;
remaining -= n;
}
buf.resize(dest - buf.data());
return (n == 0);
}
} // namespace
TEST(CommunicateSubprocessTest, Chatty) {
checkFdLeak([] {
const int lineCount = 1000;
int wcount = 0;
int rcount = 0;
auto options = Subprocess::pipeStdin().pipeStdout().pipeStderr().usePath();
std::vector<std::string> cmd {
"sed",
"-u",
"-e",
"s/a test/a successful test/",
};
Subprocess proc(cmd, options);
auto writeCallback = [&] (int pfd, int cfd) -> bool {
EXPECT_EQ(0, cfd); // child stdin
EXPECT_EQ(rcount, wcount); // chatty, one read for every write
auto msg = folly::to<std::string>("a test ", wcount, "\n");
// Not entirely kosher, we should handle partial writes, but this is
// fine for writes <= PIPE_BUF
EXPECT_EQ(msg.size(), writeFull(pfd, msg.data(), msg.size()));
++wcount;
proc.enableNotifications(0, false);
return (wcount == lineCount);
};
auto readCallback = [&] (int pfd, int cfd) -> bool {
EXPECT_EQ(1, cfd); // child stdout
EXPECT_EQ(wcount, rcount + 1);
auto expected =
folly::to<std::string>("a successful test ", rcount, "\n");
std::string lineBuf;
// Not entirely kosher, we should handle partial reads, but this is
// fine for reads <= PIPE_BUF
bool r = readToString(pfd, lineBuf, expected.size() + 1);
EXPECT_EQ((rcount == lineCount), r); // EOF iff at lineCount
EXPECT_EQ(expected, lineBuf);
++rcount;
if (rcount != lineCount) {
proc.enableNotifications(0, true);
}
return (rcount == lineCount);
};
proc.communicate(readCallback, writeCallback);
EXPECT_EQ(lineCount, wcount);
EXPECT_EQ(lineCount, rcount);
EXPECT_EQ(0, proc.wait().exitStatus());
});
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment