epoll: Add a sample program
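
Add samples/watch_queue/watch_epoll.c, a benchmark that measures how fast
epoll events are delivered through a watch_queue.  The program creates a
user-polled epoll instance with epoll_create2(EPOLL_USERPOLL, ...), adds
one eventfd per worker thread, and has the main thread consume the
resulting notifications from the mmap()ed watch_queue ring, printing the
number of events delivered per millisecond for 1 to 256 threads.

To let the sample include <linux/eventpoll.h> next to the glibc headers,
the uapi header now only pulls in <linux/fcntl.h> when O_CLOEXEC is not
already defined.
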
diff --git a/include/uapi/linux/eventpoll.h b/include/uapi/linux/eventpoll.h
index a77ae13..3152ae7 100644
--- a/include/uapi/linux/eventpoll.h
+++ b/include/uapi/linux/eventpoll.h
@@ -16,7 +16,9 @@
 #define _UAPI_LINUX_EVENTPOLL_H
 
 /* For O_CLOEXEC */
+#ifndef O_CLOEXEC
 #include <linux/fcntl.h>
+#endif
 #include <linux/types.h>
 #include <linux/watch_queue.h>
 
diff --git a/samples/watch_queue/Makefile b/samples/watch_queue/Makefile
index 6ee61e3..1df08ab 100644
--- a/samples/watch_queue/Makefile
+++ b/samples/watch_queue/Makefile
@@ -1,8 +1,11 @@
 # List of programs to build
-hostprogs-y := watch_test
+hostprogs-y := watch_test watch_epoll
 
 # Tell kbuild to always build the programs
 always := $(hostprogs-y)
 
 HOSTCFLAGS_watch_test.o += -I$(objtree)/usr/include
 HOSTLDLIBS_watch_test += -lkeyutils
+
+HOSTCFLAGS_watch_epoll.o += -I$(objtree)/usr/include
+HOSTLDLIBS_watch_epoll += -lpthread -lnuma
diff --git a/samples/watch_queue/watch_epoll.c b/samples/watch_queue/watch_epoll.c
new file mode 100644
index 0000000..dc24a6c
--- /dev/null
+++ b/samples/watch_queue/watch_epoll.c
@@ -0,0 +1,471 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ *  Copyright (C) 2018	Roman Penyaev
+ *  Copyright (C) 2019	David Howells <dhowells@redhat.com>
+ *
+ *  The purpose of this tool is to generate N events from different threads
+ *  and to measure how fast those events are delivered to the thread doing
+ *  epoll.
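+ *
+ *  Events are delivered through a watch_queue attached to a user-polled
+ *  epoll instance; the program prints a table of thread count, events per
+ *  millisecond and total run time in milliseconds.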
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <time.h>
+#include <assert.h>
+#include <sys/mman.h>
+#include <sys/ioctl.h>
+#include <sys/eventfd.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <errno.h>
+#include <err.h>
+#include <numa.h>
+#include <poll.h>
+#include <linux/unistd.h>
+#include <linux/watch_queue.h>
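+
+/*
+ * Pull in the kernel uapi epoll definitions used below (EPOLL_USERPOLL,
+ * struct epoll_uheader, ...) without clashing with the glibc definitions
+ * from <sys/epoll.h>: temporarily rename the kernel's struct epoll_event
+ * and drop its EPOLL* macros before including the userspace header.
+ */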
+#define epoll_event xxx_epoll_event
+#include <linux/eventpoll.h>
+#undef epoll_event
+#undef EPOLL_CLOEXEC
+#undef EPOLLIN
+#undef EPOLLPRI
+#undef EPOLLOUT
+#undef EPOLLERR
+#undef EPOLLHUP
+#undef EPOLLNVAL
+#undef EPOLLRDNORM
+#undef EPOLLRDBAND
+#undef EPOLLWRNORM
+#undef EPOLLWRBAND
+#undef EPOLLMSG
+#undef EPOLLRDHUP
+#undef EPOLLEXCLUSIVE
+#undef EPOLLWAKEUP
+#undef EPOLLONESHOT
+#undef EPOLLET
+
+#include <sys/epoll.h>
+
+#define BUILD_BUG_ON(condition) ((void)sizeof(char [1 - 2*!!(condition)]))
+#define READ_ONCE(v) __atomic_load_n(&(v), __ATOMIC_RELAXED)
+
+#define ITERS	  1000000ull
+
+#ifndef __NR_epoll_create2
+#define __NR_epoll_create2 -1
+#endif
+
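+/*
+ * epoll_create2() is assumed here to be the extended syscall introduced by
+ * this series: it creates a user-polled epoll instance holding up to 'size'
+ * items and posts readiness notifications to the watch_queue 'watch_fd'.
+ * On kernels whose headers lack __NR_epoll_create2 the fallback number -1
+ * makes the syscall fail, so the benchmark exits early with an error.
+ */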
+static inline long epoll_create2(int flags, size_t size, int watch_fd)
+{
+	return syscall(__NR_epoll_create2, flags, size, watch_fd);
+}
+
+struct thread_ctx {
+	pthread_t thread;
+	int efd;
+};
+
+struct cpu_map {
+	unsigned int nr;
+	unsigned int map[];
+};
+
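+/* Start-up synchronisation: workers bump thr_ready, then spin on start. */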
+static volatile unsigned int thr_ready;
+static volatile unsigned int start;
+
+static int is_cpu_online(int cpu)
+{
+	char buf[64];
+	char online;
+	FILE *f;
+	int rc;
+
+	snprintf(buf, sizeof(buf), "/sys/devices/system/cpu/cpu%d/online", cpu);
+	f = fopen(buf, "r");
+	if (!f)
+		return 1;
+
+	rc = fread(&online, 1, 1, f);
+	assert(rc == 1);
+	fclose(f);
+
+	return online == '1';
+}
+
+static struct cpu_map *cpu_map__new(void)
+{
+	struct cpu_map *cpu;
+	struct bitmask *bm;
+
+	int i, bit, cpus_nr;
+
+	cpus_nr = numa_num_possible_cpus();
+	cpu = calloc(1, sizeof(*cpu) + sizeof(cpu->map[0]) * cpus_nr);
+	if (!cpu)
+		return NULL;
+
+	bm = numa_all_cpus_ptr;
+	assert(bm);
+
+	for (bit = 0, i = 0; bit < bm->size; bit++) {
+		if (numa_bitmask_isbitset(bm, bit) && is_cpu_online(bit)) {
+			cpu->map[i++] = bit;
+		}
+	}
+	cpu->nr = i;
+
+	return cpu;
+}
+
+static void cpu_map__put(struct cpu_map *cpu)
+{
+	free(cpu);
+}
+
+static inline unsigned long long nsecs(void)
+{
+	struct timespec ts = {0, 0};
+
+	clock_gettime(CLOCK_MONOTONIC, &ts);
+	return ((unsigned long long)ts.tv_sec * 1000000000ull) + ts.tv_nsec;
+}
+
+static void *thread_work(void *arg)
+{
+	struct thread_ctx *ctx = arg;
+	__u64 ucnt = 1;
+	unsigned int i;
+	int rc;
+
+	__atomic_add_fetch(&thr_ready, 1, __ATOMIC_RELAXED);
+
+	while (!start)
+		;
+
+	for (i = 0; i < ITERS; i++) {
+		rc = write(ctx->efd, &ucnt, sizeof(ucnt));
+		assert(rc == sizeof(ucnt));
+	}
+
+	return NULL;
+}
+
+/*
+ * Process an event.
+ */
+static __attribute__((noinline))
+void read_event(struct epoll_uheader *header, unsigned int idx,
+		struct epoll_event *event)
+{
+	struct epoll_uitem *item = &header->items[idx];
+
+	assert(idx <= header->max_items_nr); /* Corrupted index? */
+
+	/*
+	 * Fetch the data first, then atomically clear the ready events so
+	 * that the kernel can post further events for this item.
+	 */
+	event->data.u64	= item->data;
+	event->events	= __atomic_exchange_n(&item->ready_events, 0,
+					      __ATOMIC_RELEASE);
+
+	assert(event->events & ~EPOLLREMOVED);
+}
+
+/*
+ * Consume watch notifications, looking for EPOLL events.
+ */
+static int watch_queue_consumer(int wfd, struct watch_queue_buffer *buf,
+				struct epoll_uheader *header,
+				struct epoll_event *events,
+				bool can_sleep)
+{
+	struct watch_notification *n;
+	struct epoll_notification *en;
+	unsigned int len, head, tail, mask = buf->meta.mask;
+	unsigned int epoll_slot;
+	int nfds = 0;
+
+	/* 'tail' belongs to us and is where events are consumed from */
+	tail = buf->meta.tail;
+
+	/* 'head' belongs to the kernel and is where events are inserted. */
+	if (can_sleep) {
+		head = __atomic_load_n(&buf->meta.head, __ATOMIC_ACQUIRE);
+		if (tail == head) {
+			struct pollfd p[1];
+			p[0].fd = wfd;
+			p[0].events = POLLIN | POLLERR;
+			p[0].revents = 0;
+
+			if (poll(p, 1, -1) == -1)
+				err(EXIT_FAILURE, "wq/poll");
+		}
+	}
+
+#if 0
+	printf("ptrs h=%x t=%x m=%x\n",
+	       buf->meta.head, buf->meta.tail, buf->meta.mask);
+#endif
+
+	head = __atomic_load_n(&buf->meta.head, __ATOMIC_ACQUIRE);
+	while (tail != head) {
+		n = &buf->slots[tail & mask];
+#if 0
+		printf("NOTIFY[%08x-%08x] ty=%04x sy=%04x i=%08x\n",
+		       head, tail, n->type, n->subtype, n->info);
+#endif
+		if (buf->meta.watch.info & WATCH_INFO_NOTIFICATIONS_LOST)
+			printf("[!] notifications lost\n");
+
+		len = (n->info & WATCH_INFO_LENGTH) >> WATCH_INFO_LENGTH__SHIFT;
+		assert(len > 0);
+
+		switch (n->type) {
+		case WATCH_TYPE_META:
+#if 0
+			if (n->subtype == WATCH_META_REMOVAL_NOTIFICATION)
+				printf("REMOVAL of watchpoint %08x\n",
+				       n->info & WATCH_INFO_ID);
+#endif
+			/* Fall through */
+		default:
+			tail += len;
+			__atomic_store_n(&buf->meta.tail, tail, __ATOMIC_RELEASE);
+			break;
+
+		case WATCH_TYPE_EPOLL_NOTIFY:
+			en = (struct epoll_notification *)n;
+			epoll_slot = en->watch.info & WATCH_INFO_TYPE_INFO;
+			epoll_slot >>= WATCH_INFO_TYPE_INFO__SHIFT;
+
+			/* Consume the slot before we clear the events */
+			tail += len;
+			__atomic_store_n(&buf->meta.tail, tail, __ATOMIC_RELEASE);
+
+			read_event(header, epoll_slot, &events[nfds]);
+			nfds++;
+			break;
+		}
+	}
+
+	return nfds;
+}
+
+/*
+ * Map the epoll user-item table from the kernel into userspace.
+ */
+static void uepoll_mmap(int epfd, struct epoll_uheader **_header, size_t *_mapping_size)
+{
+	struct epoll_uheader *header;
+	unsigned int len;
+
+	BUILD_BUG_ON(sizeof(*header) != EPOLL_USERPOLL_HEADER_SIZE);
+	BUILD_BUG_ON(sizeof(header->items[0]) != 16);
+
+	len = sysconf(_SC_PAGESIZE);
+again:
+	header = mmap(NULL, len, PROT_WRITE|PROT_READ, MAP_SHARED, epfd, 0);
+	if (header == MAP_FAILED)
+		err(EXIT_FAILURE, "mmap(header)");
+
+	if (header->header_length != len) {
+		unsigned int tmp_len = len;
+
+		len = header->header_length;
+		munmap(header, tmp_len);
+		goto again;
+	}
+
+	assert(header->magic == EPOLL_USERPOLL_HEADER_MAGIC);
+	*_header = header;
+	*_mapping_size = len;
+}
+
+/*
+ * Create a watch queue and map it.
+ */
+static int create_watch_queue(unsigned int buffer_size,
+			      struct watch_queue_buffer **_watch_queue)
+{
+	struct watch_queue_buffer *buf;
+	size_t page_size;
+	int wfd;
+
+	wfd = open("/dev/watch_queue", O_RDWR);
+	if (wfd == -1)
+		err(EXIT_FAILURE, "/dev/watch_queue");
+
+	if (ioctl(wfd, IOC_WATCH_QUEUE_SET_SIZE, buffer_size) == -1)
+		err(EXIT_FAILURE, "wq/size");
+
+	page_size = sysconf(_SC_PAGESIZE);
+	buf = mmap(NULL, buffer_size * page_size,
+		   PROT_READ | PROT_WRITE, MAP_SHARED, wfd, 0);
+	if (buf == MAP_FAILED)
+		err(EXIT_FAILURE, "wq/mmap");
+
+	*_watch_queue = buf;
+	return wfd;
+}
+
+static int do_bench(struct cpu_map *cpu, unsigned int nthreads,
+		    int wfd, struct watch_queue_buffer *watch_queue)
+{
+	struct epoll_event ev, events[nthreads];
+	struct thread_ctx threads[nthreads];
+	pthread_attr_t thrattr;
+	struct thread_ctx *ctx;
+	size_t mapping_size;
+	int rc, epfd, nfds;
+	cpu_set_t cpuset;
+	unsigned int i;
+
+	struct epoll_uheader *header;
+
+	unsigned long long epoll_calls = 0, epoll_nsecs;
+	unsigned long long ucnt, ucnt_sum = 0;
+
+	thr_ready = 0;
+	start = 0;
+
+	epfd = epoll_create2(EPOLL_USERPOLL, nthreads, wfd);
+	if (epfd < 0)
+		err(EXIT_FAILURE, "epoll_create2");
+
+	for (i = 0; i < nthreads; i++) {
+		ctx = &threads[i];
+
+		ctx->efd = eventfd(0, EFD_NONBLOCK);
+		if (ctx->efd < 0)
+			err(EXIT_FAILURE, "eventfd");
+
+		ev.events = EPOLLIN | EPOLLET;
+		ev.data.ptr = ctx;
+		rc = epoll_ctl(epfd, EPOLL_CTL_ADD, ctx->efd, &ev);
+		if (rc)
+			err(EXIT_FAILURE, "epoll_ctl");
+
+		CPU_ZERO(&cpuset);
+		CPU_SET(cpu->map[i % cpu->nr], &cpuset);
+
+		pthread_attr_init(&thrattr);
+		rc = pthread_attr_setaffinity_np(&thrattr, sizeof(cpu_set_t),
+						 &cpuset);
+		if (rc) {
+			errno = rc;
+			err(EXIT_FAILURE, "pthread_attr_setaffinity_np");
+		}
+
+		rc = pthread_create(&ctx->thread, &thrattr, thread_work, ctx);
+		if (rc) {
+			errno = rc;
+			err(EXIT_FAILURE, "pthread_create");
+		}
+	}
+
+	/* Map all pointers */
+	uepoll_mmap(epfd, &header, &mapping_size);
+
+	while (thr_ready != nthreads)
+		;
+
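+	/* Drain anything already queued on the watch_queue before timing. */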
+	watch_queue_consumer(wfd, watch_queue, header, events, false);
+
+	/* Signal start for all threads */
+	start = 1;
+
+	epoll_nsecs = nsecs();
+	while (1) {
+		nfds = watch_queue_consumer(wfd, watch_queue, header, events,
+					    true);
+		if (nfds < 0)
+			err(EXIT_FAILURE, "watch_queue_consumer");
+
+		epoll_calls++;
+
+		for (i = 0; i < (unsigned int)nfds; ++i) {
+			ctx = events[i].data.ptr;
+			rc = read(ctx->efd, &ucnt, sizeof(ucnt));
+			if (rc < 0) {
+				assert(errno == EAGAIN);
+				continue;
+			}
+			assert(rc == sizeof(ucnt));
+			ucnt_sum += ucnt;
+			if (ucnt_sum == nthreads * ITERS)
+				goto end;
+		}
+	}
+end:
+	epoll_nsecs = nsecs() - epoll_nsecs;
+
+	for (i = 0; i < nthreads; i++) {
+		ctx = &threads[i];
+		pthread_join(ctx->thread, NULL);
+	}
+
+	close(epfd);
+
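+	/*
+	 * Drain any notifications still pending (e.g. removals generated by
+	 * closing the epoll fd) before unmapping the buffers.
+	 */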
+	watch_queue_consumer(wfd, watch_queue, header, events, false);
+	munmap(header, mapping_size);
+
+	printf("%7u   %8llu	%8llu\n",
+	       nthreads,
+	       ITERS * nthreads / (epoll_nsecs / 1000 / 1000),
+	       epoll_nsecs / 1000 / 1000);
+
+	return 0;
+}
+
+int main(int argc, char *argv[])
+{
+	static const unsigned int nthreads_arr[] = { 1, 8, 16, 32, 64, 128, 256 };
+	struct watch_queue_buffer *watch_queue;
+	struct cpu_map *cpu;
+	unsigned int i;
+	int wfd;
+
+	wfd = create_watch_queue(1, &watch_queue);
+
+	cpu = cpu_map__new();
+	if (!cpu) {
+		errno = ENOMEM;
+		err(EXIT_FAILURE, "cpu_map__new");
+	}
+
+	printf("threads	 events/ms  run-time ms\n");
+	for (i = 0; i < sizeof(nthreads_arr) / sizeof(nthreads_arr[0]); i++)
+		do_bench(cpu, nthreads_arr[i], wfd, watch_queue);
+
+	cpu_map__put(cpu);
+
+	return 0;
+}