Commit 677bed09 authored by Tudor Bosman's avatar Tudor Bosman Committed by Jordan DeLong

Subprocess library, modeled after python's subprocess module

Summary:
Surprised we don't have one.  The API is modeled after Python's
subprocess module, http://docs.python.org/2/library/subprocess.html

Inspired by
https://www.facebook.com/groups/fbcode/permalink/445399858830192/, plus
I needed this functionality now.

Test Plan: test added

Reviewed By: chip@fb.com

FB internal diff: D614056
parent f919a052
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "folly/Subprocess.h"
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <wait.h>
#include <array>
#include <algorithm>
#include <system_error>
#include <boost/container/flat_set.hpp>
#include <boost/range/adaptors.hpp>
#include <glog/logging.h>
#include "folly/Conv.h"
#include "folly/ScopeGuard.h"
#include "folly/String.h"
#include "folly/experimental/io/Cursor.h"
extern char** environ;
namespace folly {
ProcessReturnCode::State ProcessReturnCode::state() const {
if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED;
if (rawStatus_ == RV_RUNNING) return RUNNING;
if (WIFEXITED(rawStatus_)) return EXITED;
if (WIFSIGNALED(rawStatus_)) return KILLED;
throw std::runtime_error(to<std::string>(
"Invalid ProcessReturnCode: ", rawStatus_));
}
void ProcessReturnCode::enforce(State s) const {
if (state() != s) {
throw std::logic_error(to<std::string>("Invalid state ", s));
}
}
int ProcessReturnCode::exitStatus() const {
enforce(EXITED);
return WEXITSTATUS(rawStatus_);
}
int ProcessReturnCode::killSignal() const {
enforce(KILLED);
return WTERMSIG(rawStatus_);
}
bool ProcessReturnCode::coreDumped() const {
enforce(KILLED);
return WCOREDUMP(rawStatus_);
}
std::string ProcessReturnCode::str() const {
switch (state()) {
case NOT_STARTED:
return "not started";
case RUNNING:
return "running";
case EXITED:
return to<std::string>("exited with status ", exitStatus());
case KILLED:
return to<std::string>("killed by signal ", killSignal(),
(coreDumped() ? " (core dumped)" : ""));
}
CHECK(false); // unreached
}
CalledProcessError::CalledProcessError(ProcessReturnCode rc)
: returnCode_(rc),
what_(returnCode_.str()) {
}
namespace {
// Copy pointers to the given strings in a format suitable for posix_spawn
std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
std::unique_ptr<const char*[]> d(new const char*[s.size() + 1]);
for (int i = 0; i < s.size(); i++) {
d[i] = s[i].c_str();
}
d[s.size()] = nullptr;
return d;
}
// Helper to throw std::system_error
void throwSystemError(int err, const char* msg) __attribute__((noreturn));
void throwSystemError(int err, const char* msg) {
throw std::system_error(err, std::system_category(), msg);
}
// Helper to throw std::system_error from errno
void throwSystemError(const char* msg) __attribute__((noreturn));
void throwSystemError(const char* msg) {
throwSystemError(errno, msg);
}
// Check a Posix return code (0 on success, error number on error), throw
// on error.
void checkPosixError(int err, const char* msg) {
if (err != 0) {
throwSystemError(err, msg);
}
}
// Check a traditional Uinx return code (-1 and sets errno on error), throw
// on error.
void checkUnixError(ssize_t ret, const char* msg) {
if (ret == -1) {
throwSystemError(msg);
}
}
// Check a wait() status, throw on non-successful
void checkStatus(ProcessReturnCode returnCode) {
if (returnCode.state() != ProcessReturnCode::EXITED ||
returnCode.exitStatus() != 0) {
throw CalledProcessError(returnCode);
}
}
} // namespace
Subprocess::Options& Subprocess::Options::fd(int fd, int action) {
if (action == Subprocess::PIPE) {
if (fd == 0) {
action = Subprocess::PIPE_IN;
} else if (fd == 1 || fd == 2) {
action = Subprocess::PIPE_OUT;
} else {
throw std::invalid_argument(
to<std::string>("Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
}
}
fdActions_[fd] = action;
return *this;
}
Subprocess::Subprocess(
const std::vector<std::string>& argv,
const Options& options,
const char* executable,
const std::vector<std::string>* env)
: pid_(-1),
returnCode_(RV_NOT_STARTED) {
if (argv.empty()) {
throw std::invalid_argument("argv must not be empty");
}
if (!executable) executable = argv[0].c_str();
spawn(cloneStrings(argv), executable, options, env);
}
Subprocess::Subprocess(
const std::string& cmd,
const Options& options,
const std::vector<std::string>* env)
: pid_(-1),
returnCode_(RV_NOT_STARTED) {
if (options.usePath_) {
throw std::invalid_argument("usePath() not allowed when running in shell");
}
const char* shell = getenv("SHELL");
if (!shell) {
shell = "/bin/sh";
}
std::unique_ptr<const char*[]> argv(new const char*[4]);
argv[0] = shell;
argv[1] = "-c";
argv[2] = cmd.c_str();
argv[3] = nullptr;
spawn(std::move(argv), shell, options, env);
}
Subprocess::~Subprocess() {
if (returnCode_.state() == ProcessReturnCode::RUNNING) {
LOG(ERROR) << "Subprocess destroyed without reaping; killing child.";
try {
kill();
wait();
} catch (...) {
LOG(FATAL) << "Killing child failed, terminating: "
<< exceptionStr(std::current_exception());
}
}
try {
closeAll();
} catch (...) {
LOG(FATAL) << "close failed, terminating: "
<< exceptionStr(std::current_exception());
}
}
namespace {
void closeChecked(int fd) {
checkUnixError(::close(fd), "close");
}
} // namespace
void Subprocess::closeAll() {
for (auto& p : pipes_) {
closeChecked(p.parentFd);
}
pipes_.clear();
}
void Subprocess::setAllNonBlocking() {
for (auto& p : pipes_) {
int fd = p.parentFd;
int flags = ::fcntl(fd, F_GETFL);
checkUnixError(flags, "fcntl");
int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
checkUnixError(r, "fcntl");
}
}
void Subprocess::spawn(
std::unique_ptr<const char*[]> argv,
const char* executable,
const Options& optionsIn,
const std::vector<std::string>* env) {
if (optionsIn.usePath_ && env) {
throw std::invalid_argument(
"usePath() not allowed when overriding environment");
}
// Make a copy, we'll mutate options
Options options(optionsIn);
// Parent work, pre-fork: create pipes
std::vector<int> childFds;
for (auto& p : options.fdActions_) {
if (p.second == PIPE_IN || p.second == PIPE_OUT) {
int fds[2];
int r = ::pipe(fds);
checkUnixError(r, "pipe");
PipeInfo pinfo;
pinfo.direction = p.second;
int cfd;
if (p.second == PIPE_IN) {
// Child gets reading end
pinfo.parentFd = fds[1];
cfd = fds[0];
} else {
pinfo.parentFd = fds[0];
cfd = fds[1];
}
p.second = cfd; // ensure it gets dup2()ed
pinfo.childFd = p.first;
childFds.push_back(cfd);
pipes_.push_back(pinfo);
}
}
// This should already be sorted, as options.fdActions_ is
DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
// Note that the const casts below are legit, per
// http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html
char** argVec = const_cast<char**>(argv.get());
// Set up environment
std::unique_ptr<const char*[]> envHolder;
char** envVec;
if (env) {
envHolder = cloneStrings(*env);
envVec = const_cast<char**>(envHolder.get());
} else {
envVec = environ;
}
pid_t pid = vfork();
if (pid == 0) {
runChild(executable, argVec, envVec, options);
// This should never return, but there's nothing else we can do here.
abort();
}
// In parent
checkUnixError(pid, "vfork");
pid_ = pid;
returnCode_ = ProcessReturnCode(RV_RUNNING);
// Parent work, post-fork: close child's ends of pipes
for (int f : childFds) {
closeChecked(f);
}
}
namespace {
// Checked version of close() to use in the child: abort() on error
void childClose(int fd) {
int r = ::close(fd);
if (r == -1) abort();
}
// Checked version of dup2() to use in the child: abort() on error
void childDup2(int oldfd, int newfd) {
int r = ::dup2(oldfd, newfd);
if (r == -1) abort();
}
} // namespace
void Subprocess::runChild(const char* executable,
char** argv, char** env,
const Options& options) const {
// Close parent's ends of all pipes
for (auto& p : pipes_) {
childClose(p.parentFd);
}
// Close all fds that we're supposed to close.
// Note that we're ignoring errors here, in case some of these
// fds were set to close on exec.
for (auto& p : options.fdActions_) {
if (p.second == CLOSE) {
::close(p.first);
} else {
childDup2(p.second, p.first);
}
}
// If requested, close all other file descriptors. Don't close
// any fds in options.fdActions_, and don't touch stdin, stdout, stderr.
// Ignore errors.
if (options.closeOtherFds_) {
for (int fd = getdtablesize() - 1; fd >= 3; --fd) {
if (options.fdActions_.count(fd) == 0) {
::close(fd);
}
}
}
// Now, finally, exec.
int r;
if (options.usePath_) {
::execvp(executable, argv);
} else {
::execve(executable, argv, env);
}
// If we're here, something's wrong.
abort();
}
ProcessReturnCode Subprocess::poll() {
returnCode_.enforce(ProcessReturnCode::RUNNING);
DCHECK_GT(pid_, 0);
int status;
pid_t found = ::waitpid(pid_, &status, WNOHANG);
checkUnixError(found, "waitpid");
if (found != 0) {
returnCode_ = ProcessReturnCode(status);
pid_ = -1;
}
return returnCode_;
}
bool Subprocess::pollChecked() {
if (poll().state() == ProcessReturnCode::RUNNING) {
return false;
}
checkStatus(returnCode_);
return true;
}
ProcessReturnCode Subprocess::wait() {
returnCode_.enforce(ProcessReturnCode::RUNNING);
DCHECK_GT(pid_, 0);
int status;
pid_t found = ::waitpid(pid_, &status, 0);
checkUnixError(found, "waitpid");
returnCode_ = ProcessReturnCode(status);
return returnCode_;
}
void Subprocess::waitChecked() {
wait();
checkStatus(returnCode_);
}
void Subprocess::sendSignal(int signal) {
returnCode_.enforce(ProcessReturnCode::RUNNING);
int r = ::kill(pid_, signal);
checkUnixError(r, "kill");
}
namespace {
void setNonBlocking(int fd) {
int flags = ::fcntl(fd, F_GETFL);
checkUnixError(flags, "fcntl");
int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
checkUnixError(r, "fcntl");
}
std::pair<const uint8_t*, size_t> queueFront(const IOBufQueue& queue) {
auto* p = queue.front();
if (!p) return std::make_pair(nullptr, 0);
return io::Cursor(p).peek();
}
// fd write
bool handleWrite(int fd, IOBufQueue& queue) {
for (;;) {
auto p = queueFront(queue);
if (p.second == 0) {
return true; // EOF
}
ssize_t n;
do {
n = ::write(fd, p.first, p.second);
} while (n == -1 && errno == EINTR);
if (n == -1 && errno == EAGAIN) {
return false;
}
checkUnixError(n, "write");
queue.trimStart(n);
}
}
// fd read
bool handleRead(int fd, IOBufQueue& queue) {
for (;;) {
auto p = queue.preallocate(100, 65000);
ssize_t n;
do {
n = ::read(fd, p.first, p.second);
} while (n == -1 && errno == EINTR);
if (n == -1 && errno == EAGAIN) {
return false;
}
checkUnixError(n, "read");
if (n == 0) {
return true;
}
queue.postallocate(n);
}
}
bool discardRead(int fd) {
static const size_t bufSize = 65000;
// Thread unsafe, but it doesn't matter.
static std::unique_ptr<char[]> buf(new char[bufSize]);
for (;;) {
ssize_t n;
do {
n = ::read(fd, buf.get(), bufSize);
} while (n == -1 && errno == EINTR);
if (n == -1 && errno == EAGAIN) {
return false;
}
checkUnixError(n, "read");
if (n == 0) {
return true;
}
}
}
} // namespace
std::pair<std::string, std::string> Subprocess::communicate(
int flags,
StringPiece data) {
IOBufQueue dataQueue;
dataQueue.wrapBuffer(data.data(), data.size());
auto outQueues = communicateIOBuf(flags, std::move(dataQueue));
auto outBufs = std::make_pair(outQueues.first.move(),
outQueues.second.move());
std::pair<std::string, std::string> out;
if (outBufs.first) {
outBufs.first->coalesce();
out.first.assign(reinterpret_cast<const char*>(outBufs.first->data()),
outBufs.first->length());
}
if (outBufs.second) {
outBufs.second->coalesce();
out.second.assign(reinterpret_cast<const char*>(outBufs.second->data()),
outBufs.second->length());
}
return out;
}
std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
int flags,
IOBufQueue data) {
std::pair<IOBufQueue, IOBufQueue> out;
auto readCallback = [&, flags] (int pfd, int cfd) {
if (cfd == 1 && (flags & READ_STDOUT)) {
return handleRead(pfd, out.first);
} else if (cfd == 2 && (flags & READ_STDERR)) {
return handleRead(pfd, out.second);
} else {
// Don't close the file descriptor, the child might not like SIGPIPE,
// just read and throw the data away.
return discardRead(pfd);
}
};
auto writeCallback = [&, flags] (int pfd, int cfd) {
if (cfd == 0 && (flags & WRITE_STDIN)) {
return handleWrite(pfd, data);
} else {
// If we don't want to write to this fd, just close it.
return false;
}
};
communicate(std::move(readCallback), std::move(writeCallback));
return out;
}
void Subprocess::communicate(FdCallback readCallback,
FdCallback writeCallback) {
returnCode_.enforce(ProcessReturnCode::RUNNING);
setAllNonBlocking();
std::vector<pollfd> fds;
fds.reserve(pipes_.size());
std::vector<int> toClose;
toClose.reserve(pipes_.size());
while (!pipes_.empty()) {
fds.clear();
toClose.clear();
for (auto& p : pipes_) {
pollfd pfd;
pfd.fd = p.parentFd;
// Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
// child's point of view.
pfd.events = (p.direction == PIPE_IN ? POLLOUT : POLLIN);
fds.push_back(pfd);
}
int r;
do {
r = ::poll(fds.data(), fds.size(), -1);
} while (r == -1 && errno == EINTR);
checkUnixError(r, "poll");
for (int i = 0; i < pipes_.size(); ++i) {
auto& p = pipes_[i];
DCHECK_EQ(fds[i].fd, p.parentFd);
short events = fds[i].revents;
bool closed = false;
if (events & POLLOUT) {
DCHECK(!(events & POLLIN));
if (writeCallback(p.parentFd, p.childFd)) {
toClose.push_back(i);
closed = true;
}
}
if (events & POLLIN) {
DCHECK(!(events & POLLOUT));
if (readCallback(p.parentFd, p.childFd)) {
toClose.push_back(i);
closed = true;
}
}
if ((events & (POLLHUP | POLLERR)) && !closed) {
toClose.push_back(i);
closed = true;
}
}
// Close the fds in reverse order so the indexes hold after erase()
for (int idx : boost::adaptors::reverse(toClose)) {
auto pos = pipes_.begin() + idx;
closeChecked(pos->parentFd);
pipes_.erase(pos);
}
}
}
int Subprocess::findByChildFd(int childFd) const {
auto pos = std::lower_bound(
pipes_.begin(), pipes_.end(), childFd,
[] (const PipeInfo& info, int fd) { return info.childFd < fd; });
if (pos == pipes_.end() || pos->childFd != childFd) {
throw std::invalid_argument(folly::to<std::string>(
"child fd not found ", childFd));
}
return pos - pipes_.begin();
}
void Subprocess::closeParentFd(int childFd) {
int idx = findByChildFd(childFd);
closeChecked(pipes_[idx].parentFd);
pipes_.erase(pipes_.begin() + idx);
}
namespace {
class Initializer {
public:
Initializer() {
// We like EPIPE, thanks.
::signal(SIGPIPE, SIG_IGN);
}
};
Initializer initializer;
} // namespace
} // namespace folly
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Subprocess library, modeled after Python's subprocess module
* (http://docs.python.org/2/library/subprocess.html)
*
* This library defines one class (Subprocess) which represents a child
* process. Subprocess has two constructors: one that takes a vector<string>
* and executes the given executable without using the shell, and one
* that takes a string and executes the given command using the shell.
* Subprocess allows you to redirect the child's standard input, standard
* output, and standard error to/from child descriptors in the parent,
* or to create communication pipes between the child and the parent.
*
* The simplest example is a thread-safe version of the system() library
* function:
* Subprocess(cmd).wait();
* which executes the command using the default shell and waits for it
* to complete, returning the exit status.
*
* A thread-safe version of popen() (type="r", to read from the child):
* Subprocess proc(cmd, Subprocess::Options().stdout(Subprocess::PIPE));
* // read from proc.stdout()
* proc.wait();
*
* A thread-safe version of popen() (type="w", to write from the child):
* Subprocess proc(cmd, Subprocess::Options().stdin(Subprocess::PIPE));
* // write to proc.stdin()
* proc.wait();
*
* If you want to redirect both stdin and stdout to pipes, you can, but
* note that you're subject to a variety of deadlocks. You'll want to use
* nonblocking I/O; look at the implementation of communicate() for an example.
*
* communicate() is a way to communicate to a child via its standard input,
* standard output, and standard error. It buffers everything in memory,
* so it's not great for large amounts of data (or long-running processes),
* but it insulates you from the deadlocks mentioned above.
*/
#ifndef FOLLY_SUBPROCESS_H_
#define FOLLY_SUBPROCESS_H_
#include <sys/types.h>
#include <signal.h>
#include <wait.h>
#include <exception>
#include <vector>
#include <string>
#include <boost/container/flat_map.hpp>
#include <boost/operators.hpp>
#include <boost/noncopyable.hpp>
#include "folly/experimental/io/IOBufQueue.h"
#include "folly/MapUtil.h"
#include "folly/Portability.h"
#include "folly/Range.h"
namespace folly {
/**
* Class to wrap a process return code.
*/
class Subprocess;
class ProcessReturnCode {
friend class Subprocess;
public:
enum State {
NOT_STARTED,
RUNNING,
EXITED,
KILLED
};
/**
* Process state. One of:
* NOT_STARTED: process hasn't been started successfully
* RUNNING: process is currently running
* EXITED: process exited (successfully or not)
* KILLED: process was killed by a signal.
*/
State state() const;
/**
* Helper wrappers around state().
*/
bool notStarted() const { return state() == NOT_STARTED; }
bool running() const { return state() == RUNNING; }
bool exited() const { return state() == EXITED; }
bool killed() const { return state() == KILLED; }
/**
* Exit status. Only valid if state() == EXITED; throws otherwise.
*/
int exitStatus() const;
/**
* Signal that caused the process's termination. Only valid if
* state() == KILLED; throws otherwise.
*/
int killSignal() const;
/**
* Was a core file generated? Only valid if state() == KILLED; throws
* otherwise.
*/
bool coreDumped() const;
/**
* String representation; one of
* "not started"
* "running"
* "exited with status <status>"
* "killed by signal <signal>"
* "killed by signal <signal> (core dumped)"
*/
std::string str() const;
/**
* Helper function to enforce a precondition based on this.
* Throws std::logic_error if in an unexpected state.
*/
void enforce(State state) const;
private:
explicit ProcessReturnCode(int rv) : rawStatus_(rv) { }
static constexpr int RV_NOT_STARTED = -2;
static constexpr int RV_RUNNING = -1;
int rawStatus_;
};
/**
* Exception thrown by *Checked methods of Subprocess.
*/
class CalledProcessError : public std::exception {
public:
explicit CalledProcessError(ProcessReturnCode rc);
~CalledProcessError() throw() { }
const char* what() const throw() FOLLY_OVERRIDE { return what_.c_str(); }
ProcessReturnCode returnCode() const { return returnCode_; }
private:
ProcessReturnCode returnCode_;
std::string what_;
};
/**
* Subprocess.
*/
class Subprocess : private boost::noncopyable {
public:
static const int CLOSE = -1;
static const int PIPE = -2;
static const int PIPE_IN = -3;
static const int PIPE_OUT = -4;
/**
* Class representing various options: file descriptor behavior, and
* whether to use $PATH for searching for the executable,
*
* By default, we don't use $PATH, file descriptors are closed if
* the close-on-exec flag is set (fcntl FD_CLOEXEC) and inherited
* otherwise.
*/
class Options {
friend class Subprocess;
public:
Options() : closeOtherFds_(false), usePath_(false) { }
/**
* Change action for file descriptor fd.
*
* "action" may be another file descriptor number (dup2()ed before the
* child execs), or one of CLOSE, PIPE_IN, and PIPE_OUT.
*
* CLOSE: close the file descriptor in the child
* PIPE_IN: open a pipe *from* the child
* PIPE_OUT: open a pipe *to* the child
*
* PIPE is a shortcut; same as PIPE_IN for stdin (fd 0), same as
* PIPE_OUT for stdout (fd 1) or stderr (fd 2), and an error for
* other file descriptors.
*/
Options& fd(int fd, int action);
/**
* Shortcut to change the action for standard input.
*/
Options& stdin(int action) { return fd(0, action); }
/**
* Shortcut to change the action for standard output.
*/
Options& stdout(int action) { return fd(1, action); }
/**
* Shortcut to change the action for standard error.
* Note that stderr(1) will redirect the standard error to the same
* file descriptor as standard output; the equivalent of bash's "2>&1"
*/
Options& stderr(int action) { return fd(2, action); }
/**
* Close all other fds (other than standard input, output, error,
* and file descriptors explicitly specified with fd()).
*
* This is potentially slow; it's generally a better idea to
* set the close-on-exec flag on all file descriptors that shouldn't
* be inherited by the child.
*
* Even with this option set, standard input, output, and error are
* not closed; use stdin(CLOSE), stdout(CLOSE), stderr(CLOSE) if you
* desire this.
*/
Options& closeOtherFds() { closeOtherFds_ = true; return *this; }
/**
* Use the search path ($PATH) when searching for the executable.
*/
Options& usePath() { usePath_ = true; return *this; }
private:
typedef boost::container::flat_map<int, int> FdMap;
FdMap fdActions_;
bool closeOtherFds_;
bool usePath_;
};
/**
* Create a subprocess from the given arguments. argv[0] must be listed.
* If not-null, executable must be the actual executable
* being used (otherwise it's the same as argv[0]).
*
* If env is not-null, it must contain name=value strings to be used
* as the child's environment; otherwise, we inherit the environment
* from the parent. env must be null if options.usePath is set.
*/
explicit Subprocess(
const std::vector<std::string>& argv,
const Options& options = Options(),
const char* executable = nullptr,
const std::vector<std::string>* env = nullptr);
~Subprocess();
/**
* Create a subprocess run as a shell command (as shell -c 'command')
*
* The shell to use is taken from the environment variable $SHELL,
* or /bin/sh if $SHELL is unset.
*/
explicit Subprocess(
const std::string& cmd,
const Options& options = Options(),
const std::vector<std::string>* env = nullptr);
/**
* Append all data, close the stdin (to-child) fd, and read all data,
* except that this is done in a safe manner to prevent deadlocking.
*
* If WRITE_STDIN is given in flags, the process must have been opened with
* stdinFd=PIPE.
*
* If READ_STDOUT is given in flags, the first returned value will be the
* value read from the child's stdout; the child must have been opened with
* stdoutFd=PIPE.
*
* If READ_STDERR is given in flags, the second returned value will be the
* value read from the child's stderr; the child must have been opened with
* stderrFd=PIPE.
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
*
* communicateIOBuf uses IOBufQueue for buffering (which has the advantage
* that it won't try to allocate all data at once). communicate
* uses strings for simplicity.
*/
enum {
WRITE_STDIN = 1 << 0,
READ_STDOUT = 1 << 1,
READ_STDERR = 1 << 2,
};
std::pair<IOBufQueue, IOBufQueue> communicateIOBuf(
int flags = READ_STDOUT,
IOBufQueue data = IOBufQueue());
std::pair<std::string, std::string> communicate(
int flags = READ_STDOUT,
StringPiece data = StringPiece());
/**
* Communicate with the child until all pipes to/from the child are closed.
*
* readCallback(pfd, cfd) will be called whenever there's data available
* on any pipe *from* the child (PIPE_OUT). pfd is the file descriptor
* in the parent (that you use to read from); cfd is the file descriptor
* in the child (used for identifying the stream; 1 = child's standard
* output, 2 = child's standard error, etc)
*
* writeCallback(pfd, cfd) will be called whenever a pipe *to* the child is
* writable (PIPE_IN). pfd is the file descriptor in the parent (that you
* use to write to); cfd is the file descriptor in the child (used for
* identifying the stream; 0 = child's standard input, etc)
*
* The read and write callbacks must read from / write to pfd and return
* false during normal operation or true at end-of-file;
* communicate() will then close the pipe. Note that pfd is
* nonblocking, so be prepared for read() / write() to return -1 and
* set errno to EAGAIN (in which case you should return false).
*
* NOTE that you MUST consume all data passed to readCallback (or return
* false, which will close the pipe, possibly sending SIGPIPE to the child or
* making its writes fail with EPIPE), and you MUST write to a writable pipe
* (or return false, which will close the pipe). To do otherwise is an
* error. You must do this even for pipes you are not interested in.
*
* Note that communicate() returns when all pipes to/from the child are
* closed; the child might stay alive after that, so you must still wait().
*
* Most users won't need to use this; the simpler version of communicate
* (which buffers data in memory) will probably work fine.
*/
typedef std::function<bool(int, int)> FdCallback;
void communicate(FdCallback readCallback, FdCallback writeCallback);
/**
* Return the child's pid, or -1 if the child wasn't successfully spawned
* or has already been wait()ed upon.
*/
pid_t pid() const;
static const int RV_RUNNING = ProcessReturnCode::RV_RUNNING;
static const int RV_NOT_STARTED = ProcessReturnCode::RV_NOT_STARTED;
/**
* Return the child's status (as per wait()) if the process has already
* been waited on, -1 if the process is still running, or -2 if the process
* hasn't been successfully started. NOTE that this does not poll, but
* returns the status stored in the Subprocess object.
*/
ProcessReturnCode returnCode() const { return returnCode_; }
/**
* Poll the child's status and return it, return -1 if the process
* is still running. NOTE that it is illegal to call poll again after
* poll indicated that the process has terminated, or to call poll on a
* process that hasn't been successfully started (the constructor threw an
* exception).
*/
ProcessReturnCode poll();
/**
* Poll the child's status. If the process is still running, return false.
* Otherwise, return true if the process exited with status 0 (success),
* or throw CalledProcessError if the process exited with a non-zero status.
*/
bool pollChecked();
/**
* Wait for the process to terminate and return its status.
* Similarly to poll, it is illegal to call poll after the process
* has already been reaped or if the process has not successfully started.
*/
ProcessReturnCode wait();
/**
* Wait for the process to terminate, throw if unsuccessful.
*/
void waitChecked();
/**
* Set all pipes from / to child non-blocking. communicate() does
* this for you.
*/
void setAllNonBlocking();
/**
* Get parent file descriptor corresponding to the given file descriptor
* in the child. Throws if childFd isn't a pipe (PIPE_IN / PIPE_OUT).
* Do not close() the return file descriptor; use closeParentFd, below.
*/
int parentFd(int childFd) const {
return pipes_[findByChildFd(childFd)].parentFd;
}
int stdin() const { return parentFd(0); }
int stdout() const { return parentFd(1); }
int stderr() const { return parentFd(2); }
/**
* Close the parent file descriptor given a file descriptor in the child.
*/
void closeParentFd(int childFd);
/**
* Send a signal to the child. Shortcuts for the commonly used Unix
* signals are below.
*/
void sendSignal(int signal);
void terminate() { sendSignal(SIGTERM); }
void kill() { sendSignal(SIGKILL); }
private:
void spawn(
std::unique_ptr<const char*[]> argv,
const char* executable,
const Options& options,
const std::vector<std::string>* env);
// Action to run in child.
// Note that this runs after vfork(), so tread lightly.
void runChild(const char* executable, char** argv, char** env,
const Options& options) const;
/**
* Close all file descriptors.
*/
void closeAll();
// return index in pipes_
int findByChildFd(int childFd) const;
pid_t pid_;
ProcessReturnCode returnCode_;
// The number of pipes between parent and child is assumed to be small,
// so we're happy with a vector here, even if it means linear erase.
// sorted by childFd
struct PipeInfo : private boost::totally_ordered<PipeInfo> {
int parentFd;
int childFd;
int direction; // one of PIPE_IN / PIPE_OUT
bool operator<(const PipeInfo& other) const {
return childFd < other.childFd;
}
bool operator==(const PipeInfo& other) const {
return childFd == other.childFd;
}
};
std::vector<PipeInfo> pipes_;
};
} // namespace folly
#endif /* FOLLY_SUBPROCESS_H_ */
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "folly/Subprocess.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "folly/Format.h"
#include "folly/experimental/io/Stream.h"
using namespace folly;
TEST(SimpleSubprocessTest, ExitsSuccessfully) {
Subprocess proc(std::vector<std::string>{ "/bin/true" });
EXPECT_EQ(0, proc.wait().exitStatus());
}
TEST(SimpleSubprocessTest, ExitsSuccessfullyChecked) {
Subprocess proc(std::vector<std::string>{ "/bin/true" });
proc.waitChecked();
}
TEST(SimpleSubprocessTest, ExitsWithError) {
Subprocess proc(std::vector<std::string>{ "/bin/false" });
EXPECT_EQ(1, proc.wait().exitStatus());
}
TEST(SimpleSubprocessTest, ExitsWithErrorChecked) {
Subprocess proc(std::vector<std::string>{ "/bin/false" });
EXPECT_THROW(proc.waitChecked(), CalledProcessError);
}
TEST(SimpleSubprocessTest, ShellExitsSuccesssfully) {
Subprocess proc("true");
EXPECT_EQ(0, proc.wait().exitStatus());
}
TEST(SimpleSubprocessTest, ShellExitsWithError) {
Subprocess proc("false");
EXPECT_EQ(1, proc.wait().exitStatus());
}
TEST(PopenSubprocessTest, PopenRead) {
Subprocess proc("ls /", Subprocess::Options().stdout(Subprocess::PIPE));
int found = 0;
for (auto bline : byLine(proc.stdout())) {
StringPiece line(bline);
if (line == "etc" || line == "bin" || line == "usr") {
++found;
}
}
EXPECT_EQ(3, found);
proc.waitChecked();
}
TEST(CommunicateSubprocessTest, SimpleRead) {
Subprocess proc(std::vector<std::string>{ "/bin/echo", "-n", "foo", "bar"},
Subprocess::Options().stdout(Subprocess::PIPE));
auto p = proc.communicate();
EXPECT_EQ("foo bar", p.first);
proc.waitChecked();
}
TEST(CommunicateSubprocessTest, BigWrite) {
const int numLines = 1 << 20;
std::string line("hello\n");
std::string data;
data.reserve(numLines * line.size());
for (int i = 0; i < numLines; ++i) {
data.append(line);
}
Subprocess::Options options;
options.stdin(Subprocess::PIPE).stdout(Subprocess::PIPE);
Subprocess proc("wc -l", options);
auto p = proc.communicate(Subprocess::WRITE_STDIN | Subprocess::READ_STDOUT,
data);
EXPECT_EQ(folly::format("{}\n", numLines).str(), p.first);
proc.waitChecked();
}
TEST(CommunicateSubprocessTest, Duplex) {
// Take 10MB of data and pass them through a filter.
// One line, as tr is line-buffered
const int bytes = 10 << 20;
std::string line(bytes, 'x');
Subprocess::Options options;
options.stdin(Subprocess::PIPE).stdout(Subprocess::PIPE);
Subprocess proc("tr a-z A-Z", options);
auto p = proc.communicate(Subprocess::WRITE_STDIN | Subprocess::READ_STDOUT,
line);
EXPECT_EQ(bytes, p.first.size());
EXPECT_EQ(std::string::npos, p.first.find_first_not_of('X'));
proc.waitChecked();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment