Introduction
I heard about epoll not so long ago and wanted to give it a try. Epoll is the new way to use non-blocking sockets (or any file descriptors). It replaces poll() and select().
epoll is easy enough to use. The good thing about it is that, unlike select(), you don't need to rebuild the fd_set each time you call epoll_wait. Here is a typical usage (a concrete sketch follows the list):
- efd = epoll_create1()
- epoll_ctl(efd, EPOLL_CTL_ADD, listeningSocket)
- loop:
- n = epoll_wait()
- for i=0 to n:
- if (events[i].data.fd == listeningSocket)
- newsock = accept()
- epoll_ctl(efd, EPOLL_CTL_ADD, newsock)
- else
- read/write on events[i].data.fd
- when shutting down: close(efd)
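Here is a minimal sketch of that loop in C++, using the raw epoll API. Error handling is omitted, and listeningSocket is assumed to be a non-blocking listening socket created elsewhere.

#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

void runLoop(int listeningSocket)
{
    int efd = epoll_create1(0);

    epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = listeningSocket;
    epoll_ctl(efd, EPOLL_CTL_ADD, listeningSocket, &ev);

    epoll_event events[64];
    bool running = true;      // set to false from a handler to shut down
    while (running)
    {
        int n = epoll_wait(efd, events, 64, -1);
        for (int i = 0; i < n; i++)
        {
            if (events[i].data.fd == listeningSocket)
            {
                // New connection: accept it and add it to the epoll set.
                int newsock = accept(listeningSocket, NULL, NULL);
                epoll_event cev;
                cev.events = EPOLLIN;
                cev.data.fd = newsock;
                epoll_ctl(efd, EPOLL_CTL_ADD, newsock, &cev);
            }
            else
            {
                // Existing connection: read/write on events[i].data.fd here.
            }
        }
    }
    close(efd);
}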
Another nice thing about epoll is that you don't need to worry about removing a socket from the list. You can call epoll_ctl(efd, EPOLL_CTL_DEL, sock) if you want but when the socket closes, it will be removed automatically.
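As a small sketch (efd and sock as in the loop above), explicit removal looks like this:

// Optional: explicit removal. Closing the socket removes it from the epoll
// set automatically, as long as no other file descriptor refers to it.
epoll_ctl(efd, EPOLL_CTL_DEL, sock, NULL);
close(sock);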
One thread
Using epoll, I can do all my sending and receiving in one thread. Some people may suggest sending data from other threads, but what happens if you get EAGAIN? Assume that Thread A is the epoll thread. Thread B attempts to send 100 bytes but only manages to send 50. Then Thread C writes 100 bytes on the same socket, which has become ready again by the time it runs. The remote end now receives the two messages interleaved, in other words corrupted data. For that reason, Threads B and C should instead add their data to a queue so that each message is guaranteed to be sent completely. The queue is emptied in Thread A.
Since epoll might be handling thousands of connections, Thread A must do minimal work. It should only do this:
- epoll_wait
- if a socket is read-ready, read all from that socket and dispatch to consumers. Consumers should work fast, otherwise we should "post" the data to the consumer in another thread.
- send all from the sendqueue
Edge-triggered vs. Level-triggered
epoll offers two ways of working: edge-triggered and level-triggered. The default is level-triggered, so you might want to change this to edge-triggered in your application.
In level-triggered mode, epoll will keep returning as long as a socket is read- or write-ready. So if data is ready to be received on the socket and you only read part of it, the next epoll call will return because there is still data available. Once the internal receive buffer is empty, epoll won't return for reads anymore. But since the socket will be write-ready almost all of the time, epoll will keep returning anyway, which is not something you would typically want. My guess is that if you want to use level-triggered mode, you should not register for EPOLLOUT events unless you have something to send. So while your application's TX buffer is empty, you don't register for EPOLLOUT. As soon as data is added to the TX buffer, you register for EPOLLOUT, epoll returns, and you write the data out on the socket. If EAGAIN is returned, you will block on the next epoll_wait() and can send the rest of the buffer when it unblocks. Once the application's TX buffer is empty, you unregister for EPOLLOUT.
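As a sketch, toggling EPOLLOUT in level-triggered mode could be done with EPOLL_CTL_MOD. The updateInterest helper and the txQueueEmpty flag are placeholders, not part of any real API:

#include <sys/epoll.h>

// Re-register the interest set for one socket: EPOLLIN is always wanted,
// EPOLLOUT only while there is data queued to send.
void updateInterest(int efd, int fd, bool txQueueEmpty)
{
    epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN | (txQueueEmpty ? 0 : EPOLLOUT);
    epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);
}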
With edge-triggered mode, things are different. You will only receive an event when the status changes from not-ready to ready. So if you have incoming data and you only read part of it, the socket will still be ready the next time you call epoll_wait, but epoll_wait will block because the state never changed from not-ready to ready. So if you are using that mode, you must make sure to read ALL the data on the socket until you get EAGAIN. If you want to send something, epoll_wait will only give you a write-ready event if you previously got EAGAIN while attempting to write out.
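Here is a sketch of that read-until-EAGAIN loop. The onData callback is a placeholder for whatever dispatches the received bytes:

#include <cerrno>
#include <sys/socket.h>

// Drain the socket completely; with EPOLLET we won't be notified again until
// new data arrives, so we must read until the kernel buffer is empty.
// Returns false if the peer closed the connection or a real error occurred.
bool readAll(int fd, void (*onData)(const char*, size_t))
{
    char buf[4096];
    for (;;)
    {
        ssize_t n = recv(fd, buf, sizeof(buf), 0);
        if (n > 0)
            onData(buf, (size_t)n);
        else if (n == 0)
            return false;                              // peer closed
        else if (errno == EAGAIN || errno == EWOULDBLOCK)
            return true;                               // buffer drained, done
        else
            return false;                              // real error
    }
}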
Let's say you have an internal transmit queue in which you add data when you want to send it out. The epoll thread would need to read from that queue, write the data to the socket and handle EAGAIN appropriately.
- epoll_wait()
- if a socket is read-ready, readAllFromThatSocket(socket);
- sendAllData() // sends all that is contained in the internal queue.
- goto step 1
But if epoll_wait() is blocked in edge-triggered mode and you add data to the queue (from another thread), sendAllData() will not be called until epoll_wait returns because data is ready to be received (it won't return because the socket is ready for writing, since you first need to write and get EAGAIN for that). To solve this problem, I created an eventfd (see sys/eventfd.h). I add the eventfd to the epoll_wait list and, whenever I add data to the application's TX queue, I write 1 to the eventfd so that epoll_wait will break out. I could have used a timeout in epoll_wait, but that would add unnecessary latency to my reactor. This way of doing things is similar to what is called the "self-pipe trick".
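A rough sketch of that eventfd wake-up, assuming the TX queue is drained in the epoll thread (the function names are placeholders):

#include <stdint.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

int wakeFd = -1;

// Done once at startup: create the eventfd and register it with epoll.
void setupWakeup(int efd)
{
    wakeFd = eventfd(0, EFD_NONBLOCK);
    epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = wakeFd;
    epoll_ctl(efd, EPOLL_CTL_ADD, wakeFd, &ev);
}

// Called by any producer thread right after pushing into the TX queue:
// writing to the eventfd makes the blocked epoll_wait() return.
void wakeEpollThread()
{
    uint64_t one = 1;
    write(wakeFd, &one, sizeof(one));
}

// Called by the epoll thread when events[i].data.fd == wakeFd: reading
// resets the counter so the next write wakes us up again.
void drainWakeup()
{
    uint64_t value;
    read(wakeFd, &value, sizeof(value));
}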
My framework
To play with epoll, I wrote a small reactor framework that exposes an easy-to-use interface. All the gory details are hidden inside the framework. To use it, you need to define two classes:
class OwnProtocolFramingStrategy: public IFramingStrategy
{
protected:
    virtual int processData(char* buffer, size_t size, FullMessageInfo& info)
    {
        // info.buffer will first be zero. You need to allocate memory for that buffer
        // and copy the data from "buffer" to "info.buffer". If you determine that the
        // buffer does not contain a full message, return the quantity of bytes you read
        // from the buffer. In this case, it should be "return size;", and the next time
        // some data is received, this function will be called with your buffer. You can
        // update other fields in "info" to help you resume next time.
        //
        // If you determine that a full message was received, set "info.complete = true"
        // and return the number of bytes that you read from the buffer. After returning
        // from this function, the buffer you created in "info.buffer" will be considered
        // to contain a full message and will be passed to the TcpEngine. The next time
        // this function is called, info.buffer will be back to zero.
        //
        // If you determine that a full message was received but there is still data left
        // in the buffer, it means that you probably have 2 messages in the buffer. As
        // mentioned above, you return the number of bytes that you read from the buffer.
        // This function will be called again with "buffer" pointing to the index after
        // the last byte you have read. So you only ever need to try to parse one message
        // when this function is called.
        //
        // If you find that the data is not well-formed, does not respect your protocol or
        // has any other error that prevents you from reliably building a full message,
        // then return -1 and the client's connection will be aborted.
    }
};
An IFramingStrategy is a class that processes the TCP stream into full messages. As you know, when reading data from a socket, you may receive more than one message in a single read, or only half of a message. So the IFramingStrategy is what parses the stream and builds messages. For example, if implementing an HTTP server, the strategy would build the message by handling things like chunked transfer encoding, so the logic to calculate the message size would be done here.
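To make this more concrete, here is a rough sketch of a strategy for a hypothetical protocol where every message is prefixed by a 4-byte big-endian length. The fields info.expectedSize and info.receivedSize are invented for the example to hold resume state; the real FullMessageInfo exposes its own fields for that.

#include <arpa/inet.h>
#include <algorithm>
#include <cstdint>
#include <cstring>

class LengthPrefixedFramingStrategy: public IFramingStrategy
{
protected:
    virtual int processData(char* buffer, size_t size, FullMessageInfo& info)
    {
        size_t consumed = 0;

        if (info.buffer == 0)
        {
            // Start of a new message: read the 4-byte length header.
            // (Simplification: assume the whole header arrives in one read.)
            if (size < 4)
                return -1;
            uint32_t netLen;
            memcpy(&netLen, buffer, 4);
            uint32_t len = ntohl(netLen);
            info.buffer = new char[len];
            info.expectedSize = len;       // hypothetical resume fields
            info.receivedSize = 0;
            consumed = 4;
        }

        // Copy as much of the payload as this chunk contains.
        size_t remaining = info.expectedSize - info.receivedSize;
        size_t toCopy = std::min(size - consumed, remaining);
        memcpy(info.buffer + info.receivedSize, buffer + consumed, toCopy);
        info.receivedSize += toCopy;
        consumed += toCopy;

        // Tell the engine whether info.buffer now holds a complete message.
        info.complete = (info.receivedSize == info.expectedSize);
        return (int)consumed;
    }
};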
class OwnProtocolServerEngine: public TcpEngine
{
public:
    OwnProtocolServerEngine(): TcpEngine(new ClientFactory(this))
    {
    }
protected:
    virtual void onClientConnected(TcpClient* client)
    {
        // Maybe add the client to some list so we can use it later?
    }
    virtual void onClientDisconnected(TcpClient* client)
    {
        // If the client was added to a list, we should remove it here because
        // its instance will be deleted after returning from this method.
    }
    virtual void onClientMessage(TcpClient* client, char* buffer, size_t size)
    {
        // The buffer you get here is the one that was created in the strategy. You
        // own this buffer in this method, so you must free it from here
        // (or remember to free it at a later time).
        //
        // In this method, you are guaranteed to get one and only one full message,
        // assuming that the strategy was coded correctly.
        //
        // Note that everything is happening in one thread. So in the onClientConnected()
        // method, you might have saved a TcpClient pointer in a list. It is perfectly
        // safe to use it in here (i.e. for forwarding the message to someone else).
    }
};
Then, to launch a server:
OwnProtocolServerEngine engine;
engine.init(5555,"127.0.0.1",10);
if (!engine.start())
{
printf("Could not start server\r\n");
return -1;
}
getchar();
engine.stop();
The source code for the framework is here
Thread-safe queue
I said earlier that everything was happening in one single thread, so everything *should* be safe. But what if I want to send data to a client from another thread? That should be possible. The transmit queue inside the TcpClient is thread-safe, so calling TcpClient::sendData() from another thread is safe. Accessing the client itself is another story though: the client instance could get deleted at any time. There are ways around that, but they are beyond the scope of this framework.
Since the TX queue is a Multi-Producer, Single-Consumer FIFO, creating a thread-safe implementation is very easy. Of course, I didn't want to use any of the STL containers.
The queue uses __sync_bool_compare_and_swap, which I strongly suspect compiles down to the x86 instruction CMPXCHG. I described that instruction here. I didn't fully test that code though; I adapted it from a version I had written in ASM, so there might be bugs in the C++ version.
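For illustration only, here is a minimal sketch of a Multi-Producer, Single-Consumer queue built on __sync_bool_compare_and_swap; it is not the framework's actual implementation. Producers CAS nodes onto a shared stack, and the single consumer detaches the whole stack at once and reverses it to restore FIFO order.

template <typename T>
class MpscQueue
{
public:
    struct Node
    {
        T     value;
        Node* next;
    };

    MpscQueue(): mHead(0) {}

    // Called from any producer thread.
    void push(const T& value)
    {
        Node* node = new Node;
        node->value = value;
        do
        {
            node->next = mHead;
        }
        while (!__sync_bool_compare_and_swap(&mHead, node->next, node));
    }

    // Called only from the consumer thread (the epoll thread). Returns the
    // pending items as a singly-linked list in FIFO order; the caller walks
    // the list and deletes the nodes.
    Node* popAll()
    {
        // Atomically detach everything that has been pushed so far.
        Node* list = mHead;
        while (list && !__sync_bool_compare_and_swap(&mHead, list, (Node*)0))
        {
            list = mHead;
        }
        // Reverse the detached stack so the oldest item comes first.
        Node* fifo = 0;
        while (list)
        {
            Node* next = list->next;
            list->next = fifo;
            fifo = list;
            list = next;
        }
        return fifo;
    }

private:
    Node* volatile mHead;   // shared stack of pending items (newest first)
};

The epoll thread would typically call popAll() once per loop iteration, right after epoll_wait() returns, and send out everything it gets back.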