通过eventfd实现的事件通知机制

Comment From: xiaozhitaba

include

include

include

include

include

include

include

include

static int s_efd = 0;

int createEventfd() { int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

std::cout << "createEventfd() fd : " << evtfd << std::endl;

if (evtfd < 0) { std::cout << "Failed in eventfd\n"; abort(); }

return evtfd; }

void testThread() { int timeout = 0; while(timeout < 3) { sleep(1); timeout++; }

uint64_t one = 1; ssize_t n = write(s_efd, &one, sizeof one); if(n != sizeof one) { std::cout << " writes " << n << " bytes instead of 8\n"; } }

int main() { s_efd = createEventfd();

fd_set rdset; FD_ZERO(&rdset); FD_SET(s_efd, &rdset);

struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0;

std::thread t(testThread);

while(1) { if(select(s_efd + 1, &rdset, NULL, NULL, &timeout) == 0) { std::cout << "timeout\n"; timeout.tv_sec = 1; timeout.tv_usec = 0; FD_SET(s_efd, &rdset); continue; }

uint64_t one = 0;

ssize_t n = read(s_efd, &one, sizeof one);
if(n != sizeof one)
{
  std::cout << " read " << n << " bytes instead of 8\n";
}

std::cout << " wakeup !\n";

break;

}

t.join(); close(s_efd);

return 0; } ./test.out createEventfd() fd : 3 timeout timeout timeout wakeup !

Comment From: savagecm

I think the event fd should not work, but maybe you forget the cost of using eventfd....

Comment From: xiaozhitaba

i know the cost of using eventfd . it make system call,work through kernel space users space。

but IOThreadMain takes a lot of CPU / Wait for start / for (int j = 0; j < 1000000; j++) { if (io_threads_pending[id] != 0) break; } / Give the main thread a chance to stop this thread. / if (io_threads_pending[id] == 0) { pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue; }

in keydb use pipe notifyother threads。it just fine use in little control msg

fastlock_init(&eventLoop->flock, "event loop");
int rgfd[2];
if (pipe(rgfd) < 0)
    goto err;
eventLoop->fdCmdRead = rgfd[0];
eventLoop->fdCmdWrite = rgfd[1];
fcntl(eventLoop->fdCmdWrite, F_SETFL, O_NONBLOCK);
fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK);
eventLoop->cevents = 0;
aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL);

int aePostFunction(aeEventLoop eventLoop, aePostFunctionProc proc, void *arg) { if (eventLoop == g_eventLoopThisThread) { proc(arg); return AE_OK; } aeCommand cmd = {}; cmd.op = AE_ASYNC_OP::PostFunction; cmd.proc = proc; cmd.clientData = arg; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) return AE_ERR; return AE_OK; }

Comment From: xiaozhitaba

I think the event fd should not work, but maybe you forget the cost of using eventfd....

the quotes above

Comment From: savagecm

yes I also have the same question. I think we can open another issue to address it with approprate title

Comment From: antirez

@savagecm @xiaozhitaba initially it used to be like that, I used a signal condition or something like that, but the cost of going via the syscall was high. Now we have the tight loop, however actually now there is no longer this problem because recently the main thread is also used in order to serve data, so it will stop waiting for a very little time. Also when the traffic is not so high, we return single threaded. If we still have this problem after profiling, what we can do is to just always give a few more clients to the main thread, so that it is very likely to finish as the last one. CC @soloestoy

Comment From: xiaozhitaba

i have some idea. https://github.com/xiaozhitaba/disruptor/tree/master/src/main/java/com/lmax/disruptor Disruptor support these WaitStrategy               BlockingWaitStrategy               BusySpinWaitStrategy               LiteBlockingWaitStrategy               SleepingWaitStrategy               TimeoutBlockingWaitStrategy               YieldingWaitStrategy               PhasedBackoffWaitStrategy how about SleepingWaitStrategy YieldingWaitStrategy use CAS

Comment From: xiaozhitaba

CC @antirez

Comment From: savagecm

if the bottleneck is memory copy from kernel space to user space, do we ever think about using zero-copy tech? such as using the following socket option? https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html

Maybe this is only efficient for large TCP package... But maybe worth to have a try @antirez @xiaozhitaba

Comment From: xiaozhitaba

recently I study zeromq work model:http://www.aosabook.org/en/zeromq.html maybe can bring enlightening

Comment From: xiaozhitaba

in zeromq multi threads use eventfd to notify 。use Lock-free queue transfer msg。 signaler.wait (timeout_) use poll indeed

// Lock-free queue implementation. // Only a single thread can read from the pipe at any specific moment. // Only a single thread can write to the pipe at any specific moment. // T is the type of the object in the queue. // N is granularity of the pipe, i.e. how many items are needed to // perform next memory allocation.

template class ypipe_t : public ypipe_base_t

int zmq::make_fdpair (fd_t r_, fd_t w_) {

if defined ZMQ_HAVE_EVENTFD

int flags = 0;

if defined ZMQ_HAVE_EVENTFD_CLOEXEC

//  Setting this option result in sane behaviour when exec() functions
//  are used. Old sockets are closed and don't block TCP ports, avoid
//  leaks, etc.
flags |= EFD_CLOEXEC;

endif

fd_t fd = eventfd (0, flags);
if (fd == -1) {
    errno_assert (errno == ENFILE || errno == EMFILE);
    *w_ = *r_ = -1;
    return -1;
} else {
    *w_ = *r_ = fd;
    return 0;
}

int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) { // Try to get the command straight away. if (active) {//开始的时候,信箱是未激活状态 bool ok = cpipe.read (cmd_); if (ok) return 0;

    //  If there are no more commands available, switch into passive state.
    //  没有命令可读时,先把信箱设置为未激活状态,表示没命令可读,然后把对方发过来的激活信箱的信号处理一下(没什么特殊的处理,就是接受一下)
    active = false;
    signaler.recv ();
}

//  Wait for signal from the command sender.
int rc = signaler.wait (timeout_);//signaler.wait的返回值有三种①wait函数出错,返回-1,并且设置errno=EINTR②返回-1并且errno=EAGAIN,表示信号没等到③等到信号。
if (rc != 0 && (errno == EAGAIN || errno == EINTR))//这里对应wait的前两种情况
    return -1;

//  We've got the signal. Now we can switch into active state.
active = true;//等到激活信箱的信号了,激活信箱

//  Get a command.
errno_assert (rc == 0);
bool ok = cpipe.read (cmd_);
zmq_assert (ok);
return 0;

}

Comment From: antirez

Here the steps needed would be to implement a "proof of concept", check if there is a big performances regression, and evaluate. No other way around...