Commit 2714e5b6 authored by Aravind Anbudurai's avatar Aravind Anbudurai Committed by Facebook Github Bot 7

Support for EPOLLPRI

Summary:
Depends on D3718573 and the other diff to sync tp2 on fbcode after D3718573
lands.

This adds the PRI flag for using on EPOLLPRI events. The test makes a
server/client and client a MSG_OOB message to make it appear as a priority
event.

Masked under a preprocessor directive until libevent upstreams the patch that I
will send for 2.0

Reviewed By: djwatson

Differential Revision: D3722009

fbshipit-source-id: c15233d4739a38092d11c3026c483c7a9c8e5dac
parent 96e8ac5b
......@@ -44,7 +44,11 @@ class EventHandler : private boost::noncopyable {
READ = EV_READ,
WRITE = EV_WRITE,
READ_WRITE = (READ | WRITE),
PERSIST = EV_PERSIST
PERSIST = EV_PERSIST,
// Temporary flag until EPOLLPRI is upstream on libevent.
#ifdef EV_PRI
PRI = EV_PRI,
#endif
};
/**
......
......@@ -14,16 +14,18 @@
* limitations under the License.
*/
#include <folly/io/async/EventHandler.h>
#include <sys/eventfd.h>
#include <bitset>
#include <future>
#include <thread>
#include <folly/MPMCQueue.h>
#include <folly/ScopeGuard.h>
#include <folly/io/async/EventBase.h>
#include <gtest/gtest.h>
#include <folly/io/async/EventHandler.h>
#include <folly/portability/Sockets.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <sys/eventfd.h>
using namespace std;
using namespace folly;
......@@ -185,3 +187,82 @@ TEST_F(EventHandlerTest, many_concurrent_consumers) {
EXPECT_EQ(0, writesRemaining);
EXPECT_EQ(0, readsRemaining);
}
#ifdef EV_PRI
TEST(EventHandlerSocketTest, EPOLLPRI) {
std::promise<void> serverReady;
std::thread t([serverReadyFuture = serverReady.get_future()] {
// client
serverReadyFuture.wait();
LOG(INFO) << "Server is ready";
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
SCOPE_EXIT {
close(sockfd);
};
struct hostent* he;
struct sockaddr_in server;
const char hostname[] = "localhost";
he = gethostbyname(hostname);
PCHECK(he);
memcpy(&server.sin_addr, he->h_addr_list[0], he->h_length);
server.sin_family = AF_INET;
server.sin_port = htons(12345);
PCHECK(::connect(sockfd, (struct sockaddr*)&server, sizeof(server)) == 0);
LOG(INFO) << "Server connection available";
char buffer[] = "banana";
int n = send(sockfd, buffer, strlen(buffer) + 1, MSG_OOB);
PCHECK(n > 0);
});
SCOPE_EXIT {
t.join();
};
// make the server.
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
SCOPE_EXIT {
close(sockfd);
};
PCHECK(sockfd != -1) << "unable to open socket";
struct sockaddr_in sin;
sin.sin_port = htons(12345);
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_family = AF_INET;
PCHECK(::bind(sockfd, (struct sockaddr*)&sin, sizeof(sin)) >= 0)
<< "Can't bind to port";
listen(sockfd, 5);
serverReady.set_value();
socklen_t clilen;
struct sockaddr_in cli_addr;
clilen = sizeof(cli_addr);
int newsockfd = accept(sockfd, (struct sockaddr*)&cli_addr, &clilen);
PCHECK(newsockfd >= 0) << "can't accept";
SCOPE_EXIT {
close(newsockfd);
};
EventBase eb;
struct SockEvent : public EventHandler {
SockEvent(EventBase* eb, int fd) : EventHandler(eb, fd), fd_(fd) {}
void handlerReady(uint16_t events) noexcept override {
EXPECT_TRUE(EventHandler::EventFlags::PRI & events);
char buffer[256];
int n = read(fd_, buffer, 255);
EXPECT_EQ(6, n);
EXPECT_EQ("banana", std::string(buffer));
}
private:
int fd_;
} sockHandler(&eb, newsockfd);
sockHandler.registerHandler(EventHandler::EventFlags::PRI);
LOG(INFO) << "Registered Handler";
eb.loop();
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment