epoll: Add a sample program
diff --git a/include/uapi/linux/eventpoll.h b/include/uapi/linux/eventpoll.h
index a77ae13..3152ae7 100644
--- a/include/uapi/linux/eventpoll.h
+++ b/include/uapi/linux/eventpoll.h
@@ -16,7 +16,9 @@
#define _UAPI_LINUX_EVENTPOLL_H
/* For O_CLOEXEC */
+#ifndef O_CLOEXEC
#include <linux/fcntl.h>
+#endif
#include <linux/types.h>
#include <linux/watch_queue.h>
diff --git a/samples/watch_queue/Makefile b/samples/watch_queue/Makefile
index 6ee61e3..1df08ab 100644
--- a/samples/watch_queue/Makefile
+++ b/samples/watch_queue/Makefile
@@ -1,8 +1,11 @@
# List of programs to build
-hostprogs-y := watch_test
+hostprogs-y := watch_test watch_epoll
# Tell kbuild to always build the programs
always := $(hostprogs-y)
HOSTCFLAGS_watch_test.o += -I$(objtree)/usr/include
HOSTLDLIBS_watch_test += -lkeyutils
+
+HOSTCFLAGS_watch_epoll.o += -I$(objtree)/usr/include
+HOSTLDLIBS_watch_epoll += -lpthread -lnuma
diff --git a/samples/watch_queue/watch_epoll.c b/samples/watch_queue/watch_epoll.c
new file mode 100644
index 0000000..dc24a6c
--- /dev/null
+++ b/samples/watch_queue/watch_epoll.c
@@ -0,0 +1,446 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (C) 2018 Roman Penyaev
+ * Copyright (C) 2019 David Howells <dhowells@redhat.com>
+ *
+ * Purpose of the tool is to generate N events from different threads and to
+ * measure how fast those events will be delivered to thread which does epoll.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <time.h>
+#include <assert.h>
+#include <sys/mman.h>
+#include <sys/ioctl.h>
+#include <sys/eventfd.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+#include <err.h>
+#include <numa.h>
+#include <poll.h>
+#include <linux/unistd.h>
+#include <linux/watch_queue.h>
+#define epoll_event xxx_epoll_event
+#include <linux/eventpoll.h>
+#undef epoll_event
+#undef EPOLL_CLOEXEC
+#undef EPOLLIN
+#undef EPOLLPRI
+#undef EPOLLOUT
+#undef EPOLLERR
+#undef EPOLLHUP
+#undef EPOLLNVAL
+#undef EPOLLRDNORM
+#undef EPOLLRDBAND
+#undef EPOLLWRNORM
+#undef EPOLLWRBAND
+#undef EPOLLMSG
+#undef EPOLLRDHUP
+#undef EPOLLEXCLUSIVE
+#undef EPOLLWAKEUP
+#undef EPOLLONESHOT
+#undef EPOLLET
+
+#include <sys/epoll.h>
+
/* Compile-time assertion usable in expression context. */
#define BUILD_BUG_ON(condition) ((void)sizeof(char [1 - 2*!!(condition)]))
/*
 * Relaxed atomic read of a scalar.  __atomic_load() is the generic
 * three-argument builtin (ptr, ret, order); the value-returning form
 * needed here is __atomic_load_n().
 */
#define READ_ONCE(v) __atomic_load_n(&(v), __ATOMIC_RELAXED)

/* Number of events each producer thread generates per benchmark round. */
#define ITERS 1000000ull

#ifndef __NR_epoll_create2
#define __NR_epoll_create2 -1	/* not wired up: syscall() fails with ENOSYS */
#endif

/*
 * Wrapper for the (experimental) epoll_create2() syscall: create a
 * user-polled epoll instance with @size item slots whose readiness is
 * reported through watch queue @watch_fd.  Returns the new epoll fd,
 * or -1 with errno set on failure.
 */
static inline long epoll_create2(int flags, size_t size, int watch_fd)
{
	return syscall(__NR_epoll_create2, flags, size, watch_fd);
}
+
/* Per-producer-thread state: the thread handle and the eventfd it writes. */
struct thread_ctx {
	pthread_t thread;
	int efd;		/* non-blocking eventfd registered with epoll */
};

/* List of online CPU ids that producer threads are pinned across. */
struct cpu_map {
	unsigned int nr;	/* number of valid entries in map[] */
	unsigned int map[];	/* online CPU ids (flexible array member) */
};

/* Count of producer threads that have reached their start barrier. */
static volatile unsigned int thr_ready;
/* Set to 1 by the benchmark thread to release all producers at once. */
static volatile unsigned int start;
+
/*
 * Query sysfs to determine whether @cpu is online.  A CPU with no
 * "online" attribute (e.g. the boot CPU, or a kernel without hotplug)
 * is reported as online, as is any CPU whose attribute cannot be read —
 * previously an unreadable attribute tripped an assert().
 */
static int is_cpu_online(int cpu)
{
	char path[64];
	char online;
	FILE *f;
	int rc;

	snprintf(path, sizeof(path), "/sys/devices/system/cpu/cpu%d/online", cpu);
	f = fopen(path, "r");
	if (!f)
		return 1;	/* no attribute => cannot be offlined */

	rc = fread(&online, 1, 1, f);
	fclose(f);
	if (rc != 1)
		return 1;	/* unreadable: assume online rather than abort */

	return online == '1';
}
+
+static struct cpu_map *cpu_map__new(void)
+{
+ struct cpu_map *cpu;
+ struct bitmask *bm;
+
+ int i, bit, cpus_nr;
+
+ cpus_nr = numa_num_possible_cpus();
+ cpu = calloc(1, sizeof(*cpu) + sizeof(cpu->map[0]) * cpus_nr);
+ if (!cpu)
+ return NULL;
+
+ bm = numa_all_cpus_ptr;
+ assert(bm);
+
+ for (bit = 0, i = 0; bit < bm->size; bit++) {
+ if (numa_bitmask_isbitset(bm, bit) && is_cpu_online(bit)) {
+ cpu->map[i++] = bit;
+ }
+ }
+ cpu->nr = i;
+
+ return cpu;
+}
+
/* Release a map allocated by cpu_map__new(); free(NULL) is a no-op. */
static void cpu_map__put(struct cpu_map *cpu)
{
	free(cpu);
}
+
/* Return the current CLOCK_MONOTONIC time in nanoseconds. */
static inline unsigned long long nsecs(void)
{
	struct timespec now = {0, 0};

	clock_gettime(CLOCK_MONOTONIC, &now);
	return (unsigned long long)now.tv_sec * 1000000000ull + now.tv_nsec;
}
+
+static void *thread_work(void *arg)
+{
+ struct thread_ctx *ctx = arg;
+ __u64 ucnt = 1;
+ unsigned int i;
+ int rc;
+
+ __atomic_add_fetch(&thr_ready, 1, __ATOMIC_RELAXED);
+
+ while (!start)
+ ;
+
+ for (i = 0; i < ITERS; i++) {
+ rc = write(ctx->efd, &ucnt, sizeof(ucnt));
+ assert(rc == sizeof(ucnt));
+ }
+
+ return NULL;
+}
+
+/*
+ * Process an event.
+ */
+static __attribute__((noinline))
+void read_event(struct epoll_uheader *header, unsigned int idx,
+ struct epoll_event *event)
+{
+ struct epoll_uitem *item = &header->items[idx];
+
+ assert(idx <= header->max_items_nr); /* Corrupted index? */
+
+ /*
+ * Fetch data first, if event is cleared by the kernel we drop the data
+ * returning false.
+ */
+ event->data.u64 = item->data;
+ event->events = __atomic_exchange_n(&item->ready_events, 0,
+ __ATOMIC_RELEASE);
+
+ assert(event->events & ~EPOLLREMOVED);
+}
+
/*
 * Consume watch notifications, looking for EPOLL events.
 *
 * The ring's 'tail' belongs to us (the consumer) and 'head' to the
 * kernel.  If @can_sleep and the ring is empty, poll() on @wfd until the
 * kernel posts something.  Each WATCH_TYPE_EPOLL_NOTIFY slot carries an
 * epoll item index in its type-info bits; the corresponding event is
 * copied into @events via read_event().
 *
 * Returns the number of epoll events stored in @events.
 */
static int watch_queue_consumer(int wfd, struct watch_queue_buffer *buf,
				struct epoll_uheader *header,
				struct epoll_event *events,
				bool can_sleep)
{
	struct watch_notification *n;
	struct epoll_notification *en;
	unsigned int len, head, tail, mask = buf->meta.mask;
	unsigned int epoll_slot;
	int nfds = 0;

	/* 'tail' belongs to us and is where events are consumed from */
	tail = buf->meta.tail;

	/* 'head' belongs to the kernel and is where events are inserted. */
	if (can_sleep) {
		head = __atomic_load_n(&buf->meta.head, __ATOMIC_ACQUIRE);
		if (tail == head) {
			/* Ring empty: block until the kernel posts. */
			struct pollfd p[1];
			p[0].fd = wfd;
			p[0].events = POLLIN | POLLERR;
			p[0].revents = 0;

			if (poll(p, 1, -1) == -1)
				err(EXIT_FAILURE, "wq/poll");
		}
	}

#if 0
	printf("ptrs h=%x t=%x m=%x\n",
	       buf->meta.head, buf->meta.tail, buf->meta.mask);
#endif

	/* Re-read head after (possibly) sleeping; the acquire load pairs
	 * with the kernel's store when it advances head. */
	head = __atomic_load_n(&buf->meta.head, __ATOMIC_ACQUIRE);
	while (tail != head) {
		n = &buf->slots[tail & mask];
#if 0
		printf("NOTIFY[%08x-%08x] ty=%04x sy=%04x i=%08x\n",
		       head, tail, n->type, n->subtype, n->info);
#endif
		if (buf->meta.watch.info & WATCH_INFO_NOTIFICATIONS_LOST)
			printf("[!] notifications lost\n");

		/* The slot count occupied by this notification is encoded
		 * in its info word. */
		len = (n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT;
		assert(len > 0);

		switch (n->type) {
		case WATCH_TYPE_META:
#if 0
			if (n->subtype == WATCH_META_REMOVAL_NOTIFICATION)
				printf("REMOVAL of watchpoint %08x\n",
				       n->info & WATCH_INFO_ID);
#endif
			/* Fall through */
		default:
			/* Not an epoll notification: just advance past it. */
			tail += len;
			__atomic_store_n(&buf->meta.tail, tail, __ATOMIC_RELEASE);
			break;

		case WATCH_TYPE_EPOLL_NOTIFY:
			en = (struct epoll_notification *)n;
			/* The epoll item index rides in the type-info bits. */
			epoll_slot = en->watch.info & WATCH_INFO_TYPE_INFO;
			epoll_slot >>= WATCH_INFO_TYPE_INFO__SHIFT;

			/* Consume the slot before we clear the events */
			tail += len;
			__atomic_store_n(&buf->meta.tail, tail, __ATOMIC_RELEASE);

			read_event(header, epoll_slot, &events[nfds]);
			nfds++;
			break;
		}
	}

	return nfds;
}
+
+/*
+ * Map the epoll descriptor table into the kernel.
+ */
+static void uepoll_mmap(int epfd, struct epoll_uheader **_header, size_t *_mapping_size)
+{
+ struct epoll_uheader *header;
+ unsigned int len;
+
+ BUILD_BUG_ON(sizeof(*header) != EPOLL_USERPOLL_HEADER_SIZE);
+ BUILD_BUG_ON(sizeof(header->items[0]) != 16);
+
+ len = sysconf(_SC_PAGESIZE);
+again:
+ header = mmap(NULL, len, PROT_WRITE|PROT_READ, MAP_SHARED, epfd, 0);
+ if (header == MAP_FAILED)
+ err(EXIT_FAILURE, "mmap(header)");
+
+ if (header->header_length != len) {
+ unsigned int tmp_len = len;
+
+ len = header->header_length;
+ munmap(header, tmp_len);
+ goto again;
+ }
+
+ assert(header->magic == EPOLL_USERPOLL_HEADER_MAGIC);
+ *_header = header;
+ *_mapping_size = len;
+}
+
+/*
+ * Create a watch queue and map it.
+ */
+static int create_watch_queue(unsigned int buffer_size,
+ struct watch_queue_buffer **_watch_queue)
+{
+ struct watch_queue_buffer *buf;
+ size_t page_size;
+ int wfd;
+
+ wfd = open("/dev/watch_queue", O_RDWR);
+ if (wfd == -1)
+ err(EXIT_FAILURE, "/dev/watch_queue");
+
+ if (ioctl(wfd, IOC_WATCH_QUEUE_SET_SIZE, buffer_size) == -1)
+ err(EXIT_FAILURE, "wq/size");
+
+ page_size = sysconf(_SC_PAGESIZE);
+ buf = mmap(NULL, buffer_size * page_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED, wfd, 0);
+ if (buf == MAP_FAILED)
+ err(EXIT_FAILURE, "wq/mmap");
+
+ *_watch_queue = buf;
+ return wfd;
+}
+
+static int do_bench(struct cpu_map *cpu, unsigned int nthreads,
+ int wfd, struct watch_queue_buffer *watch_queue)
+{
+ struct epoll_event ev, events[nthreads];
+ struct thread_ctx threads[nthreads];
+ pthread_attr_t thrattr;
+ struct thread_ctx *ctx;
+ size_t mapping_size;
+ int rc, epfd, nfds;
+ cpu_set_t cpuset;
+ unsigned int i;
+
+ struct epoll_uheader *header;
+
+ unsigned long long epoll_calls = 0, epoll_nsecs;
+ unsigned long long ucnt, ucnt_sum = 0;
+
+ thr_ready = 0;
+ start = 0;
+
+ epfd = epoll_create2(EPOLL_USERPOLL, nthreads, wfd);
+ if (epfd < 0)
+ err(EXIT_FAILURE, "epoll_create2");
+
+ for (i = 0; i < nthreads; i++) {
+ ctx = &threads[i];
+
+ ctx->efd = eventfd(0, EFD_NONBLOCK);
+ if (ctx->efd < 0)
+ err(EXIT_FAILURE, "eventfd");
+
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.ptr = ctx;
+ rc = epoll_ctl(epfd, EPOLL_CTL_ADD, ctx->efd, &ev);
+ if (rc)
+ err(EXIT_FAILURE, "epoll_ctl");
+
+ CPU_ZERO(&cpuset);
+ CPU_SET(cpu->map[i % cpu->nr], &cpuset);
+
+ pthread_attr_init(&thrattr);
+ rc = pthread_attr_setaffinity_np(&thrattr, sizeof(cpu_set_t),
+ &cpuset);
+ if (rc) {
+ errno = rc;
+ err(EXIT_FAILURE, "pthread_attr_setaffinity_np");
+ }
+
+ rc = pthread_create(&ctx->thread, NULL, thread_work, ctx);
+ if (rc) {
+ errno = rc;
+ err(EXIT_FAILURE, "pthread_create");
+ }
+ }
+
+ /* Map all pointers */
+ uepoll_mmap(epfd, &header, &mapping_size);
+
+ while (thr_ready != nthreads)
+ ;
+
+ watch_queue_consumer(wfd, watch_queue, header, events, false);
+
+ /* Signal start for all threads */
+ start = 1;
+
+ epoll_nsecs = nsecs();
+ while (1) {
+ nfds = watch_queue_consumer(wfd, watch_queue, header, events,
+ true);
+ if (nfds < 0)
+ err(EXIT_FAILURE, "epoll_wait");
+
+ epoll_calls++;
+
+ for (i = 0; i < (unsigned int)nfds; ++i) {
+ ctx = events[i].data.ptr;
+ rc = read(ctx->efd, &ucnt, sizeof(ucnt));
+ if (rc < 0) {
+ assert(errno == EAGAIN);
+ continue;
+ }
+ assert(rc == sizeof(ucnt));
+ ucnt_sum += ucnt;
+ if (ucnt_sum == nthreads * ITERS)
+ goto end;
+ }
+ }
+end:
+ epoll_nsecs = nsecs() - epoll_nsecs;
+
+ for (i = 0; i < nthreads; i++) {
+ ctx = &threads[i];
+ pthread_join(ctx->thread, NULL);
+ }
+
+ close(epfd);
+
+ watch_queue_consumer(wfd, watch_queue, header, events, false);
+ munmap(header, mapping_size);
+
+ printf("%7d %8lld %8lld\n",
+ nthreads,
+ ITERS * nthreads / (epoll_nsecs / 1000 / 1000),
+ epoll_nsecs / 1000 / 1000);
+
+ return 0;
+}
+
/*
 * Entry point: create one shared watch queue, then run the benchmark
 * for an increasing number of producer threads.
 */
int main(int argc, char *argv[])
{
	static const unsigned int nthreads_arr[] = { 1, 8, 16, 32, 64, 128, 256 };
	struct watch_queue_buffer *watch_queue;
	struct cpu_map *cpu;
	unsigned int i;
	int wfd;

	/*
	 * libnuma requires numa_available() to be called before any other
	 * libnuma function; cpu_map__new() uses numa_all_cpus_ptr.
	 */
	if (numa_available() < 0)
		errx(EXIT_FAILURE, "NUMA is not available");

	wfd = create_watch_queue(1, &watch_queue);

	cpu = cpu_map__new();
	if (!cpu) {
		errno = ENOMEM;
		err(EXIT_FAILURE, "cpu_map__new");
	}

	printf("threads  events/ms  run-time ms\n");
	for (i = 0; i < sizeof(nthreads_arr) / sizeof(nthreads_arr[0]); i++)
		do_bench(cpu, nthreads_arr[i], wfd, watch_queue);

	cpu_map__put(cpu);
	close(wfd);

	return 0;
}