Commit 0bf69167 authored by Mohammad Husain's avatar Mohammad Husain Committed by facebook-github-bot-1

APIs to determine which end of the socket has closed it

Reviewed By: @afrind

Differential Revision: D2466921
parent f04e4550
...@@ -1347,11 +1347,13 @@ void AsyncSocket::handleRead() noexcept { ...@@ -1347,11 +1347,13 @@ void AsyncSocket::handleRead() noexcept {
// No more data to read right now. // No more data to read right now.
return; return;
} else if (bytesRead == READ_ERROR) { } else if (bytesRead == READ_ERROR) {
readErr_ = READ_ERROR;
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("recv() failed"), errno); withAddr("recv() failed"), errno);
return failRead(__func__, ex); return failRead(__func__, ex);
} else { } else {
assert(bytesRead == READ_EOF); assert(bytesRead == READ_EOF);
readErr_ = READ_EOF;
// EOF // EOF
shutdownFlags_ |= SHUT_READ; shutdownFlags_ |= SHUT_READ;
if (!updateEventRegistration(0, EventHandler::READ)) { if (!updateEventRegistration(0, EventHandler::READ)) {
......
...@@ -369,6 +369,16 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -369,6 +369,16 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
return (state_ == StateEnum::CONNECTING); return (state_ == StateEnum::CONNECTING);
} }
virtual bool isClosedByPeer() const {
return (state_ == StateEnum::CLOSED &&
(readErr_ == READ_EOF || readErr_ == READ_ERROR));
}
virtual bool isClosedBySelf() const {
return (state_ == StateEnum::CLOSED &&
(readErr_ != READ_EOF && readErr_ != READ_ERROR));
}
size_t getAppBytesWritten() const override { size_t getAppBytesWritten() const override {
return appBytesWritten_; return appBytesWritten_;
} }
...@@ -546,6 +556,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -546,6 +556,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
READ_EOF = 0, READ_EOF = 0,
READ_ERROR = -1, READ_ERROR = -1,
READ_BLOCKING = -2, READ_BLOCKING = -2,
READ_NO_ERROR = -3,
}; };
/** /**
...@@ -770,6 +781,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -770,6 +781,8 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
bool isBufferMovable_{false}; bool isBufferMovable_{false};
bool peek_{false}; // Peek bytes. bool peek_{false}; // Peek bytes.
int8_t readErr_{READ_NO_ERROR}; ///< The read error encountered, if any.
}; };
......
...@@ -226,6 +226,7 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase { ...@@ -226,6 +226,7 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
virtual bool isPending() const { virtual bool isPending() const {
return readable(); return readable();
} }
/** /**
* Determine if transport is connected to the endpoint * Determine if transport is connected to the endpoint
* *
......
...@@ -183,6 +183,9 @@ TEST(AsyncSocketTest, ConnectAndWrite) { ...@@ -183,6 +183,9 @@ TEST(AsyncSocketTest, ConnectAndWrite) {
// Make sure the server got a connection and received the data // Make sure the server got a connection and received the data
socket->close(); socket->close();
server.verifyConnection(buf, sizeof(buf)); server.verifyConnection(buf, sizeof(buf));
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -210,6 +213,9 @@ TEST(AsyncSocketTest, ConnectNullCallback) { ...@@ -210,6 +213,9 @@ TEST(AsyncSocketTest, ConnectNullCallback) {
// Make sure the server got a connection and received the data // Make sure the server got a connection and received the data
socket->close(); socket->close();
server.verifyConnection(buf, sizeof(buf)); server.verifyConnection(buf, sizeof(buf));
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -245,6 +251,9 @@ TEST(AsyncSocketTest, ConnectWriteAndClose) { ...@@ -245,6 +251,9 @@ TEST(AsyncSocketTest, ConnectWriteAndClose) {
// Make sure the server got a connection and received the data // Make sure the server got a connection and received the data
server.verifyConnection(buf, sizeof(buf)); server.verifyConnection(buf, sizeof(buf));
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -274,6 +283,9 @@ TEST(AsyncSocketTest, ConnectAndClose) { ...@@ -274,6 +283,9 @@ TEST(AsyncSocketTest, ConnectAndClose) {
// Make sure the connection was aborted // Make sure the connection was aborted
CHECK_EQ(ccb.state, STATE_FAILED); CHECK_EQ(ccb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -305,6 +317,9 @@ TEST(AsyncSocketTest, ConnectAndCloseNow) { ...@@ -305,6 +317,9 @@ TEST(AsyncSocketTest, ConnectAndCloseNow) {
// Make sure the connection was aborted // Make sure the connection was aborted
CHECK_EQ(ccb.state, STATE_FAILED); CHECK_EQ(ccb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -344,6 +359,9 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) { ...@@ -344,6 +359,9 @@ TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
CHECK_EQ(ccb.state, STATE_FAILED); CHECK_EQ(ccb.state, STATE_FAILED);
CHECK_EQ(wcb.state, STATE_FAILED); CHECK_EQ(wcb.state, STATE_FAILED);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -378,6 +396,9 @@ TEST(AsyncSocketTest, ConnectAndRead) { ...@@ -378,6 +396,9 @@ TEST(AsyncSocketTest, ConnectAndRead) {
CHECK_EQ(rcb.buffers.size(), 1); CHECK_EQ(rcb.buffers.size(), 1);
CHECK_EQ(rcb.buffers[0].length, sizeof(buf)); CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0); CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -413,6 +434,9 @@ TEST(AsyncSocketTest, ConnectReadAndClose) { ...@@ -413,6 +434,9 @@ TEST(AsyncSocketTest, ConnectReadAndClose) {
CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
CHECK_EQ(rcb.buffers.size(), 0); CHECK_EQ(rcb.buffers.size(), 0);
CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -471,6 +495,9 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) { ...@@ -471,6 +495,9 @@ TEST(AsyncSocketTest, ConnectWriteAndRead) {
CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0); CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf)); uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
CHECK_EQ(bytesRead, 0); CHECK_EQ(bytesRead, 0);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_TRUE(socket->isClosedByPeer());
} }
/** /**
...@@ -556,6 +583,9 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) { ...@@ -556,6 +583,9 @@ TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf)); CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
CHECK_EQ(memcmp(rcb.buffers[0].buffer, CHECK_EQ(memcmp(rcb.buffers[0].buffer,
acceptedWbuf, sizeof(acceptedWbuf)), 0); acceptedWbuf, sizeof(acceptedWbuf)), 0);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -641,6 +671,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) { ...@@ -641,6 +671,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
// Fully close both sockets // Fully close both sockets
acceptedSocket->close(); acceptedSocket->close();
socket->close(); socket->close();
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_TRUE(socket->isClosedByPeer());
} }
/** /**
...@@ -729,6 +762,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { ...@@ -729,6 +762,9 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
// Fully close both sockets // Fully close both sockets
acceptedSocket->close(); acceptedSocket->close();
socket->close(); socket->close();
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_TRUE(socket->isClosedByPeer());
} }
// Helper function for use in testConnectOptWrite() // Helper function for use in testConnectOptWrite()
...@@ -902,6 +938,9 @@ TEST(AsyncSocketTest, WriteNullCallback) { ...@@ -902,6 +938,9 @@ TEST(AsyncSocketTest, WriteNullCallback) {
// Make sure the server got a connection and received the data // Make sure the server got a connection and received the data
socket->close(); socket->close();
server.verifyConnection(buf, sizeof(buf)); server.verifyConnection(buf, sizeof(buf));
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -988,6 +1027,9 @@ TEST(AsyncSocketTest, WritePipeError) { ...@@ -988,6 +1027,9 @@ TEST(AsyncSocketTest, WritePipeError) {
CHECK_EQ(wcb.state, STATE_FAILED); CHECK_EQ(wcb.state, STATE_FAILED);
CHECK_EQ(wcb.exception.getType(), CHECK_EQ(wcb.exception.getType(),
AsyncSocketException::INTERNAL_ERROR); AsyncSocketException::INTERNAL_ERROR);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -1062,6 +1104,9 @@ TEST(AsyncSocketTest, WriteIOBuf) { ...@@ -1062,6 +1104,9 @@ TEST(AsyncSocketTest, WriteIOBuf) {
acceptedSocket->close(); acceptedSocket->close();
socket->close(); socket->close();
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
TEST(AsyncSocketTest, WriteIOBufCorked) { TEST(AsyncSocketTest, WriteIOBufCorked) {
...@@ -1120,6 +1165,9 @@ TEST(AsyncSocketTest, WriteIOBufCorked) { ...@@ -1120,6 +1165,9 @@ TEST(AsyncSocketTest, WriteIOBufCorked) {
acceptedSocket->close(); acceptedSocket->close();
socket->close(); socket->close();
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/** /**
...@@ -1161,6 +1209,9 @@ TEST(AsyncSocketTest, ZeroLengthWrite) { ...@@ -1161,6 +1209,9 @@ TEST(AsyncSocketTest, ZeroLengthWrite) {
CHECK_EQ(wcb3.state, STATE_SUCCEEDED); CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
CHECK_EQ(wcb4.state, STATE_SUCCEEDED); CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2); rcb.verifyData(buf.get(), len1 + len2);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
TEST(AsyncSocketTest, ZeroLengthWritev) { TEST(AsyncSocketTest, ZeroLengthWritev) {
...@@ -1200,6 +1251,9 @@ TEST(AsyncSocketTest, ZeroLengthWritev) { ...@@ -1200,6 +1251,9 @@ TEST(AsyncSocketTest, ZeroLengthWritev) {
CHECK_EQ(wcb.state, STATE_SUCCEEDED); CHECK_EQ(wcb.state, STATE_SUCCEEDED);
rcb.verifyData(buf.get(), len1 + len2); rcb.verifyData(buf.get(), len1 + len2);
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
...@@ -1258,6 +1312,9 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) { ...@@ -1258,6 +1312,9 @@ TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
++it) { ++it) {
CHECK_EQ((*it)->state, STATE_FAILED); CHECK_EQ((*it)->state, STATE_FAILED);
} }
ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
} }
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
...@@ -1317,6 +1374,9 @@ TEST(AsyncSocket, ConnectReadImmediateRead) { ...@@ -1317,6 +1374,9 @@ TEST(AsyncSocket, ConnectReadImmediateRead) {
CHECK_EQ(wcb1.state, STATE_SUCCEEDED); CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
rcb.verifyData(expectedData, expectedDataSz); rcb.verifyData(expectedData, expectedDataSz);
CHECK_EQ(socket.immediateReadCalled, true); CHECK_EQ(socket.immediateReadCalled, true);
ASSERT_FALSE(socket.isClosedBySelf());
ASSERT_FALSE(socket.isClosedByPeer());
} }
TEST(AsyncSocket, ConnectReadUninstallRead) { TEST(AsyncSocket, ConnectReadUninstallRead) {
...@@ -1368,6 +1428,9 @@ TEST(AsyncSocket, ConnectReadUninstallRead) { ...@@ -1368,6 +1428,9 @@ TEST(AsyncSocket, ConnectReadUninstallRead) {
* was reset in dataAvailableCallback */ * was reset in dataAvailableCallback */
CHECK_EQ(rcb.dataRead(), maxBufferSz); CHECK_EQ(rcb.dataRead(), maxBufferSz);
CHECK_EQ(socket.immediateReadCalled, false); CHECK_EQ(socket.immediateReadCalled, false);
ASSERT_FALSE(socket.isClosedBySelf());
ASSERT_FALSE(socket.isClosedByPeer());
} }
// TODO: // TODO:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment