rxrpc: Implement a single I/O thread per local endpoint
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index ddf12f8..f87bc8f 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -16,6 +16,13 @@
 /*
  * Declare tracing information enums and their string mappings for display.
  */
+#define rxrpc_call_poke_traces \
+	EM(rxrpc_call_poke_error,		"Error")	\
+	EM(rxrpc_call_poke_idle,		"Idle")		\
+	EM(rxrpc_call_poke_start,		"Start")	\
+	EM(rxrpc_call_poke_timer,		"Timer")	\
+	E_(rxrpc_call_poke_timer_now,		"Timer-now")
+
 #define rxrpc_skb_traces \
 	EM(rxrpc_skb_ack,			"ACK") \
 	EM(rxrpc_skb_cleaned,			"CLE") \
@@ -75,6 +82,7 @@
 	EM(rxrpc_call_error,			"*E*") \
 	EM(rxrpc_call_got,			"GOT") \
 	EM(rxrpc_call_got_kernel,		"Gke") \
+	EM(rxrpc_call_got_poke,			"Gpo") \
 	EM(rxrpc_call_got_timer,		"GTM") \
 	EM(rxrpc_call_got_tx,			"Gtx") \
 	EM(rxrpc_call_got_userid,		"Gus") \
@@ -84,6 +92,7 @@
 	EM(rxrpc_call_put_kernel,		"Pke") \
 	EM(rxrpc_call_put_noqueue,		"PnQ") \
 	EM(rxrpc_call_put_notimer,		"PnT") \
+	EM(rxrpc_call_put_poke,			"Ppo") \
 	EM(rxrpc_call_put_timer,		"PTM") \
 	EM(rxrpc_call_put_tx,			"Ptx") \
 	EM(rxrpc_call_put_userid,		"Pus") \
@@ -289,6 +298,7 @@
 #define EM(a, b) a,
 #define E_(a, b) a
 
+enum rxrpc_call_poke_trace	{ rxrpc_call_poke_traces } __mode(byte);
 enum rxrpc_call_trace		{ rxrpc_call_traces } __mode(byte);
 enum rxrpc_client_trace		{ rxrpc_client_traces } __mode(byte);
 enum rxrpc_congest_change	{ rxrpc_congest_changes } __mode(byte);
@@ -318,6 +328,7 @@ enum rxrpc_txqueue_trace	{ rxrpc_txqueue_traces } __mode(byte);
 #define EM(a, b) TRACE_DEFINE_ENUM(a);
 #define E_(a, b) TRACE_DEFINE_ENUM(a);
 
+rxrpc_call_poke_traces;
 rxrpc_call_traces;
 rxrpc_client_traces;
 rxrpc_congest_changes;
@@ -1598,6 +1609,47 @@ TRACE_EVENT(rxrpc_txbuf,
 		      __entry->ref)
 	    );
 
+TRACE_EVENT(rxrpc_poke_call,
+	    TP_PROTO(struct rxrpc_call *call, bool busy,
+		     enum rxrpc_call_poke_trace what),
+
+	    TP_ARGS(call, busy, what),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int,		call_debug_id	)
+		    __field(bool,			busy		)
+		    __field(enum rxrpc_call_poke_trace,	what		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call_debug_id = call->debug_id;
+		    __entry->busy = busy;
+		    __entry->what = what;
+			   ),
+
+	    TP_printk("c=%08x %s%s",
+		      __entry->call_debug_id,
+		      __print_symbolic(__entry->what, rxrpc_call_poke_traces),
+		      __entry->busy ? "!" : "")
+	    );
+
+TRACE_EVENT(rxrpc_call_poked,
+	    TP_PROTO(struct rxrpc_call *call),
+
+	    TP_ARGS(call),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int,		call_debug_id	)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call_debug_id = call->debug_id;
+			   ),
+
+	    TP_printk("c=%08x",
+		      __entry->call_debug_id)
+	    );
+
 #undef EM
 #undef E_
 #endif /* _TRACE_RXRPC_H */
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index fbab970..f3385fa 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -195,6 +195,7 @@ struct rxrpc_host_header {
  * - max 48 bytes (struct sk_buff::cb)
  */
 struct rxrpc_skb_priv {
+	struct rxrpc_call *call;	/* Call referred to (poke packet) */
 	u16		offset;		/* Offset of data */
 	u16		len;		/* Length of data */
 	u8		flags;
@@ -279,11 +280,10 @@ struct rxrpc_local {
 	struct rxrpc_net	*rxnet;		/* The network ns in which this resides */
 	struct hlist_node	link;
 	struct socket		*socket;	/* my UDP socket */
-	struct work_struct	processor;
+	struct task_struct	*io_thread;
 	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */
 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
-	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
-	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
+	struct sk_buff_head	rx_queue;	/* Received packets */
 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 	spinlock_t		lock;		/* access lock */
@@ -504,15 +504,13 @@ enum rxrpc_call_flag {
 	RXRPC_CALL_UPGRADE,		/* Service upgrade was requested for the call */
 	RXRPC_CALL_RX_IS_IDLE,		/* Reception is idle - send an ACK */
 	RXRPC_CALL_IS_DEAD,		/* The call is not interested in any further packets */
+	RXRPC_CALL_START_TIMERS,	/* The timers need starting */
 };
 
 /*
  * Events that can be raised on a call.
  */
 enum rxrpc_call_event {
-	RXRPC_CALL_EV_ABORT,		/* need to generate abort */
-	RXRPC_CALL_EV_RESEND,		/* Tx resend required */
-	RXRPC_CALL_EV_EXPIRED,		/* Expiry occurred */
 	RXRPC_CALL_EV_ACK_LOST,		/* ACK may be lost, send ping */
 	RXRPC_CALL_EV_INITIAL_PING,	/* Send initial ping for a new service call */
 };
@@ -583,7 +581,7 @@ struct rxrpc_call {
 	u32			next_req_timo;	/* Timeout for next Rx request packet (jif) */
 	struct skcipher_request	*cipher_req;	/* Packet cipher request buffer */
 	struct timer_list	timer;		/* Combined event timer */
-	struct work_struct	processor;	/* Event processor */
+	struct work_struct	destructor;	/* Call destructor */
 	rxrpc_notify_rx_t	notify_rx;	/* kernel service Rx notification function */
 	struct list_head	link;		/* link in master call list */
 	struct list_head	chan_wait_link;	/* Link in conn->bundle->waiting_calls */
@@ -592,6 +590,7 @@ struct rxrpc_call {
 	struct list_head	recvmsg_link;	/* Link in rx->recvmsg_q */
 	struct list_head	sock_link;	/* Link in rx->sock_calls */
 	struct rb_node		sock_node;	/* Node in rx->calls */
+	struct sk_buff		*poke;		/* Socket buffer used to poke the call */
 	struct rxrpc_txbuf	*tx_pending;	/* Tx buffer being filled */
 	wait_queue_head_t	waitq;		/* Wait queue for channel or Tx */
 	s64			tx_total_len;	/* Total length left to be transmitted (or -1) */
@@ -631,7 +630,6 @@ struct rxrpc_call {
 	ktime_t			tx_last_sent;	/* Last time a transmission occurred */
 
 	/* Received data tracking */
-	struct sk_buff_head	input_queue;	/* Received call-level packets */
 	struct sk_buff_head	rx_queue;	/* Queue of packets ready for recvmsg() */
 	struct sk_buff_head	rx_oos_queue;	/* Queue of out of sequence packets */
 
@@ -816,6 +814,7 @@ void rxrpc_reduce_call_timer(struct rxrpc_call *call,
 			     enum rxrpc_timer_trace why);
 
 void rxrpc_delete_call_timer(struct rxrpc_call *call);
+void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb);
 
 /*
  * call_object.c
@@ -824,6 +823,7 @@ extern const char *const rxrpc_call_states[];
 extern const char *const rxrpc_call_completions[];
 extern struct kmem_cache *rxrpc_call_jar;
 
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what);
 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
 struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
@@ -835,8 +835,6 @@ void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
 			 struct sk_buff *);
 void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
-bool __rxrpc_queue_call(struct rxrpc_call *);
-bool rxrpc_queue_call(struct rxrpc_call *);
 void rxrpc_see_call(struct rxrpc_call *);
 bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op);
 void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
@@ -880,6 +878,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *);
  */
 void rxrpc_process_connection(struct work_struct *);
 void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
+int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
 
 /*
  * conn_object.c
@@ -943,6 +942,11 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
  * input.c
  */
 int rxrpc_input_packet(struct sock *, struct sk_buff *);
+int rxrpc_io_thread(void *data);
+static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
+{
+	wake_up_process(local->io_thread);
+}
 
 /*
  * insecure.c
@@ -961,7 +965,9 @@ int rxrpc_get_server_data_key(struct rxrpc_connection *, const void *, time64_t,
 /*
  * local_event.c
  */
-extern void rxrpc_process_local_events(struct rxrpc_local *);
+void rxrpc_send_version_request(struct rxrpc_local *local,
+				struct rxrpc_host_header *hdr,
+				struct sk_buff *skb);
 
 /*
  * local_object.c
@@ -972,7 +978,7 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *);
 void rxrpc_put_local(struct rxrpc_local *);
 struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *);
 void rxrpc_unuse_local(struct rxrpc_local *);
-void rxrpc_queue_local(struct rxrpc_local *);
+void rxrpc_destroy_local(struct rxrpc_local *local);
 void rxrpc_destroy_all_locals(struct rxrpc_net *);
 
 static inline bool __rxrpc_unuse_local(struct rxrpc_local *local)
@@ -1012,7 +1018,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
 int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
 int rxrpc_send_abort_packet(struct rxrpc_call *);
 int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
-void rxrpc_reject_packets(struct rxrpc_local *);
+void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
 void rxrpc_send_keepalive(struct rxrpc_peer *);
 void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
 
@@ -1050,6 +1056,7 @@ extern const struct seq_operations rxrpc_local_seq_ops;
 /*
  * receive.c
  */
+void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb);
 void rxrpc_receive(struct rxrpc_call *call, struct sk_buff *skb);
 
 /*
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 459855a..f4b88d5 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -290,30 +290,6 @@ static void rxrpc_send_initial_ping(struct rxrpc_call *call)
 			       rxrpc_propose_ack_ping_for_params);
 }
 
-/*
- * Decant some if the sendmsg prepared queue into the transmission buffer.
- */
-static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
-{
-	struct rxrpc_txbuf *txb;
-
-	if (rxrpc_is_client_call(call) &&
-	    !test_bit(RXRPC_CALL_EXPOSED, &call->flags))
-		rxrpc_expose_client_call(call);
-
-	if ((txb = list_first_entry_or_null(&call->tx_sendmsg,
-					    struct rxrpc_txbuf, call_link))) {
-		spin_lock(&call->tx_lock);
-		list_del(&txb->call_link);
-		spin_unlock(&call->tx_lock);
-
-		call->tx_top = txb->seq;
-		list_add_tail(&txb->call_link, &call->tx_buffer);
-
-		rxrpc_transmit_one(call, txb);
-	}
-}
-
 static bool rxrpc_can_decant(struct rxrpc_call *call)
 {
 	unsigned int winsize;
@@ -329,16 +305,39 @@ static bool rxrpc_can_decant(struct rxrpc_call *call)
 }
 
 /*
+ * Decant some if the sendmsg prepared queue into the transmission buffer.
+ */
+static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+{
+	struct rxrpc_txbuf *txb;
+
+	if (rxrpc_is_client_call(call) &&
+	    !test_bit(RXRPC_CALL_EXPOSED, &call->flags))
+		rxrpc_expose_client_call(call);
+
+	while (rxrpc_can_decant(call) &&
+	       (txb = list_first_entry_or_null(&call->tx_sendmsg,
+					       struct rxrpc_txbuf, call_link))) {
+		spin_lock(&call->tx_lock);
+		list_del(&txb->call_link);
+		spin_unlock(&call->tx_lock);
+
+		call->tx_top = txb->seq;
+		list_add_tail(&txb->call_link, &call->tx_buffer);
+
+		rxrpc_transmit_one(call, txb);
+	}
+}
+
+/*
  * Handle retransmission and deferred ACK/abort generation.
  */
-void rxrpc_process_call(struct work_struct *work)
+void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
 {
-	struct rxrpc_call *call =
-		container_of(work, struct rxrpc_call, processor);
-	struct sk_buff *skb;
 	unsigned long now, next, t;
-	unsigned int iterations = 0, error_report;
+	unsigned int error_report;
 	rxrpc_serial_t ackr_serial;
+	bool resend = false, expired = false;
 
 	rxrpc_see_call(call);
 
@@ -346,26 +345,12 @@ void rxrpc_process_call(struct work_struct *work)
 	_enter("{%d,%s,%lx}",
 	       call->debug_id, rxrpc_call_states[call->state], call->events);
 
-recheck_state:
 	if (call->acks_hard_ack != call->tx_bottom)
 		rxrpc_shrink_call_tx_buffer(call);
 
-	/* Limit the number of times we do this before returning to the manager */
-	if (!rxrpc_can_decant(call) &&
-	    skb_queue_empty(&call->input_queue)) {
-		iterations++;
-		if (iterations > 5)
-			goto requeue;
-	}
-
-	if (test_and_clear_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
-		rxrpc_send_abort_packet(call);
-		goto recheck_state;
-	}
-
 	if (call->state == RXRPC_CALL_COMPLETE) {
 		rxrpc_delete_call_timer(call);
-		goto out_put;
+		return;
 	}
 
 	error_report = READ_ONCE(call->error_report);
@@ -378,11 +363,75 @@ void rxrpc_process_call(struct work_struct *work)
 			rxrpc_set_call_completion(
 				call, RXRPC_CALL_NETWORK_ERROR, 0, -error_report);
 		}
-		goto out_put;
+		goto out;
 	}
 
-	if ((skb = skb_dequeue(&call->input_queue)))
+	//if (test_and_clear_bit(RXRPC_CALL_START_TIMERS, &call->flags))
+	//	rxrpc_start_call_timer(call);
+
+	/* If we see our async-event poke, check for timeout trippage. */
+	if (skb == call->poke) {
+		now = jiffies;
+		t = READ_ONCE(call->expect_rx_by);
+		if (time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_normal, now);
+			expired = true;
+		}
+
+		t = READ_ONCE(call->expect_req_by);
+		if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST &&
+		    time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
+			expired = true;
+		}
+
+		t = READ_ONCE(call->expect_term_by);
+		if (time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_hard, now);
+			expired = true;
+		}
+
+		t = READ_ONCE(call->delay_ack_at);
+		if (time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
+			cmpxchg(&call->delay_ack_at, t, now + MAX_JIFFY_OFFSET);
+			ackr_serial = xchg(&call->ackr_serial, 0);
+			rxrpc_send_ACK(call, RXRPC_ACK_DELAY, ackr_serial,
+				       rxrpc_propose_ack_ping_for_lost_ack);
+		}
+
+		t = READ_ONCE(call->ack_lost_at);
+		if (time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_lost_ack, now);
+			cmpxchg(&call->ack_lost_at, t, now + MAX_JIFFY_OFFSET);
+			set_bit(RXRPC_CALL_EV_ACK_LOST, &call->events);
+		}
+
+		t = READ_ONCE(call->keepalive_at);
+		if (time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
+			cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
+			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+				       rxrpc_propose_ack_ping_for_keepalive);
+		}
+
+		t = READ_ONCE(call->ping_at);
+		if (time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
+			cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
+			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+				       rxrpc_propose_ack_ping_for_keepalive);
+		}
+
+		t = READ_ONCE(call->resend_at);
+		if (time_after_eq(now, t)) {
+			trace_rxrpc_timer(call, rxrpc_timer_exp_resend, now);
+			cmpxchg(&call->resend_at, t, now + MAX_JIFFY_OFFSET);
+			resend = true;
+		}
+	} else {
 		rxrpc_receive(call, skb);
+	}
 
 	if (rxrpc_can_decant(call))
 		rxrpc_decant_prepared_tx(call);
@@ -390,68 +439,8 @@ void rxrpc_process_call(struct work_struct *work)
 	if (!test_and_set_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
 		rxrpc_send_initial_ping(call);
 
-	/* Work out if any timeouts tripped */
-	now = jiffies;
-	t = READ_ONCE(call->expect_rx_by);
-	if (time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_normal, now);
-		set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
-	}
-
-	t = READ_ONCE(call->expect_req_by);
-	if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST &&
-	    time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
-		set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
-	}
-
-	t = READ_ONCE(call->expect_term_by);
-	if (time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_hard, now);
-		set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
-	}
-
-	t = READ_ONCE(call->delay_ack_at);
-	if (time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
-		cmpxchg(&call->delay_ack_at, t, now + MAX_JIFFY_OFFSET);
-		ackr_serial = xchg(&call->ackr_serial, 0);
-		rxrpc_send_ACK(call, RXRPC_ACK_DELAY, ackr_serial,
-			       rxrpc_propose_ack_ping_for_lost_ack);
-	}
-
-	t = READ_ONCE(call->ack_lost_at);
-	if (time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_lost_ack, now);
-		cmpxchg(&call->ack_lost_at, t, now + MAX_JIFFY_OFFSET);
-		set_bit(RXRPC_CALL_EV_ACK_LOST, &call->events);
-	}
-
-	t = READ_ONCE(call->keepalive_at);
-	if (time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
-		cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
-		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
-			       rxrpc_propose_ack_ping_for_keepalive);
-	}
-
-	t = READ_ONCE(call->ping_at);
-	if (time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
-		cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
-		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
-			       rxrpc_propose_ack_ping_for_keepalive);
-	}
-
-	t = READ_ONCE(call->resend_at);
-	if (time_after_eq(now, t)) {
-		trace_rxrpc_timer(call, rxrpc_timer_exp_resend, now);
-		cmpxchg(&call->resend_at, t, now + MAX_JIFFY_OFFSET);
-		set_bit(RXRPC_CALL_EV_RESEND, &call->events);
-	}
-
 	/* Process events */
-	if (test_and_clear_bit(RXRPC_CALL_EV_EXPIRED, &call->events)) {
+	if (expired) {
 		if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
 		    (int)call->conn->hi_serial - (int)call->rx_serial > 0) {
 			trace_rxrpc_call_reset(call);
@@ -459,19 +448,16 @@ void rxrpc_process_call(struct work_struct *work)
 		} else {
 			rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME);
 		}
-		set_bit(RXRPC_CALL_EV_ABORT, &call->events);
-		goto recheck_state;
+		rxrpc_send_abort_packet(call);
+		goto out;
 	}
 
 	if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
 			       rxrpc_propose_ack_ping_for_lost_ack);
 
-	if (test_and_clear_bit(RXRPC_CALL_EV_RESEND, &call->events) &&
-	    call->state != RXRPC_CALL_CLIENT_RECV_REPLY) {
-		rxrpc_resend(call, skb);
-		goto recheck_state;
-	}
+	if (resend && call->state != RXRPC_CALL_CLIENT_RECV_REPLY)
+		rxrpc_resend(call, NULL);
 
 	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
 		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
@@ -482,35 +468,28 @@ void rxrpc_process_call(struct work_struct *work)
 			       rxrpc_propose_ack_input_data);
 
 	/* Make sure the timer is restarted */
-	next = call->expect_rx_by;
+	if (call->state != RXRPC_CALL_COMPLETE) {
+		next = call->expect_rx_by;
 
 #define set(T) { t = READ_ONCE(T); if (time_before(t, next)) next = t; }
 
-	set(call->expect_req_by);
-	set(call->expect_term_by);
-	set(call->delay_ack_at);
-	set(call->ack_lost_at);
-	set(call->resend_at);
-	set(call->keepalive_at);
-	set(call->ping_at);
+		set(call->expect_req_by);
+		set(call->expect_term_by);
+		set(call->delay_ack_at);
+		set(call->ack_lost_at);
+		set(call->resend_at);
+		set(call->keepalive_at);
+		set(call->ping_at);
 
-	now = jiffies;
-	if (time_after_eq(now, next))
-		goto recheck_state;
+		now = jiffies;
+		if (time_after_eq(now, next))
+			rxrpc_poke_call(call, rxrpc_call_poke_timer_now);
 
-	rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
+		rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
+	}
 
-	/* other events may have been raised since we started checking */
-	if (call->events && call->state < RXRPC_CALL_COMPLETE)
-		goto requeue;
-
-out_put:
-	rxrpc_put_call(call, rxrpc_call_put);
 out:
+	if (call->state == RXRPC_CALL_COMPLETE)
+		rxrpc_delete_call_timer(call);
 	_leave("");
-	return;
-
-requeue:
-	__rxrpc_queue_call(call);
-	goto out;
 }
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 8b8937f..ca7601b 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -45,6 +45,33 @@ static struct semaphore rxrpc_call_limiter =
 static struct semaphore rxrpc_kernel_call_limiter =
 	__SEMAPHORE_INITIALIZER(rxrpc_kernel_call_limiter, 1000);
 
+static void rxrpc_destroy_call(struct work_struct *work);
+
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
+{
+	struct rxrpc_local *local;
+	struct rxrpc_peer *peer = call->peer;
+	unsigned long flags;
+	bool busy;
+
+	if (WARN_ON_ONCE(!peer))
+		return;
+	local = peer->local;
+
+	if (call->state < RXRPC_CALL_COMPLETE) {
+		spin_lock_irqsave(&local->rx_queue.lock, flags);
+		busy = (call->poke->next != NULL);
+		trace_rxrpc_poke_call(call, busy, what);
+		if (!busy) {
+			rxrpc_get_call(call, rxrpc_call_got_poke);
+			rxrpc_get_skb(call->poke, rxrpc_skb_got);
+			__skb_queue_tail(&local->rx_queue, call->poke);
+		}
+		spin_unlock_irqrestore(&local->rx_queue.lock, flags);
+		rxrpc_wake_up_io_thread(local);
+	}
+}
+
 static void rxrpc_call_timer_expired(struct timer_list *t)
 {
 	struct rxrpc_call *call = from_timer(call, t, timer);
@@ -53,9 +80,7 @@ static void rxrpc_call_timer_expired(struct timer_list *t)
 
 	if (call->state < RXRPC_CALL_COMPLETE) {
 		trace_rxrpc_timer_expired(call, jiffies);
-		__rxrpc_queue_call(call);
-	} else {
-		rxrpc_put_call(call, rxrpc_call_put);
+		rxrpc_poke_call(call, rxrpc_call_poke_timer);
 	}
 }
 
@@ -64,17 +89,13 @@ void rxrpc_reduce_call_timer(struct rxrpc_call *call,
 			     unsigned long now,
 			     enum rxrpc_timer_trace why)
 {
-	if (rxrpc_try_get_call(call, rxrpc_call_got_timer)) {
-		trace_rxrpc_timer(call, why, now);
-		if (timer_reduce(&call->timer, expire_at))
-			rxrpc_put_call(call, rxrpc_call_put_notimer);
-	}
+	trace_rxrpc_timer(call, why, now);
+	timer_reduce(&call->timer, expire_at);
 }
 
 void rxrpc_delete_call_timer(struct rxrpc_call *call)
 {
-	if (del_timer_sync(&call->timer))
-		rxrpc_put_call(call, rxrpc_call_put_timer);
+	del_timer_sync(&call->timer);
 }
 
 static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
@@ -122,6 +143,7 @@ struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
 struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 				    unsigned int debug_id)
 {
+	struct rxrpc_skb_priv *sp;
 	struct rxrpc_call *call;
 	struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));
 
@@ -129,6 +151,16 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	if (!call)
 		return NULL;
 
+	call->poke = alloc_skb(0, gfp);
+	if (!call->poke) {
+		kmem_cache_free(rxrpc_call_jar, call);
+		return NULL;
+	}
+
+	rxrpc_new_skb(call->poke, rxrpc_skb_new);
+	sp = rxrpc_skb(call->poke);
+	sp->call = call;
+
 	mutex_init(&call->user_mutex);
 
 	/* Prevent lockdep reporting a deadlock false positive between the afs
@@ -139,7 +171,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 				  &rxrpc_call_user_mutex_lock_class_key);
 
 	timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
-	INIT_WORK(&call->processor, &rxrpc_process_call);
+	INIT_WORK(&call->destructor, rxrpc_destroy_call);
 	INIT_LIST_HEAD(&call->link);
 	INIT_LIST_HEAD(&call->chan_wait_link);
 	INIT_LIST_HEAD(&call->accept_link);
@@ -147,7 +179,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	INIT_LIST_HEAD(&call->sock_link);
 	INIT_LIST_HEAD(&call->tx_sendmsg);
 	INIT_LIST_HEAD(&call->tx_buffer);
-	skb_queue_head_init(&call->input_queue);
 	skb_queue_head_init(&call->rx_queue);
 	skb_queue_head_init(&call->rx_oos_queue);
 	init_waitqueue_head(&call->waitq);
@@ -160,6 +191,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	call->next_rx_timo = 20 * HZ;
 	call->next_req_timo = 1 * HZ;
 	atomic64_set(&call->ackr_window, 0x100000001ULL);
+	__set_bit(RXRPC_CALL_START_TIMERS, &call->flags);
 
 	memset(&call->sock_node, 0xed, sizeof(call->sock_node));
 
@@ -219,6 +251,7 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
 	call->ack_lost_at = j;
 	call->resend_at = j;
 	call->ping_at = j;
+	call->keepalive_at = j;
 	call->expect_rx_by = j;
 	call->expect_req_by = j;
 	call->expect_term_by = j;
@@ -347,7 +380,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 
 	trace_rxrpc_call(call->debug_id, rxrpc_call_connected,
 			 refcount_read(&call->ref), here, NULL);
-
 	rxrpc_start_call_timer(call);
 
 	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
@@ -455,40 +487,6 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
 }
 
 /*
- * Queue a call's work processor, getting a ref to pass to the work queue.
- */
-bool rxrpc_queue_call(struct rxrpc_call *call)
-{
-	const void *here = __builtin_return_address(0);
-	int n;
-
-	if (!__refcount_inc_not_zero(&call->ref, &n))
-		return false;
-	if (rxrpc_queue_work(&call->processor))
-		trace_rxrpc_call(call->debug_id, rxrpc_call_queued, n + 1,
-				 here, NULL);
-	else
-		rxrpc_put_call(call, rxrpc_call_put_noqueue);
-	return true;
-}
-
-/*
- * Queue a call's work processor, passing the callers ref to the work queue.
- */
-bool __rxrpc_queue_call(struct rxrpc_call *call)
-{
-	const void *here = __builtin_return_address(0);
-	int n = refcount_read(&call->ref);
-	ASSERTCMP(n, >=, 1);
-	if (rxrpc_queue_work(&call->processor))
-		trace_rxrpc_call(call->debug_id, rxrpc_call_queued_ref, n,
-				 here, NULL);
-	else
-		rxrpc_put_call(call, rxrpc_call_put_noqueue);
-	return true;
-}
-
-/*
  * Note the re-emergence of a call.
  */
 void rxrpc_see_call(struct rxrpc_call *call)
@@ -659,11 +657,12 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
  */
 static void rxrpc_destroy_call(struct work_struct *work)
 {
-	struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor);
+	struct rxrpc_call *call = container_of(work, struct rxrpc_call, destructor);
 	struct rxrpc_net *rxnet = call->rxnet;
 
 	rxrpc_delete_call_timer(call);
 
+	rxrpc_free_skb(call->poke, rxrpc_skb_cleaned);
 	rxrpc_put_connection(call->conn);
 	rxrpc_put_peer(call->peer);
 	kmem_cache_free(rxrpc_call_jar, call);
@@ -679,11 +678,10 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
 	struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
 
 	if (in_softirq()) {
-		INIT_WORK(&call->processor, rxrpc_destroy_call);
-		if (!rxrpc_queue_work(&call->processor))
+		if (!rxrpc_queue_work(&call->destructor))
 			BUG();
 	} else {
-		rxrpc_destroy_call(&call->processor);
+		rxrpc_destroy_call(&call->destructor);
 	}
 }
 
@@ -713,8 +711,6 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
 		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
 	}
 	rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
-	rxrpc_purge_queue(&call->input_queue);
-
 	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
 }
 
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index f606ccd..111108a 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -497,3 +497,77 @@ void rxrpc_process_connection(struct work_struct *work)
 	_leave("");
 	return;
 }
+
+/*
+ * post connection-level events to the connection
+ * - this includes challenges, responses, some aborts and call terminal packet
+ *   retransmission.
+ */
+static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
+				      struct sk_buff *skb)
+{
+	_enter("%p,%p", conn, skb);
+
+	skb_queue_tail(&conn->rx_queue, skb);
+	rxrpc_queue_conn(conn);
+}
+
+/*
+ * Input a connection-level packet.
+ */
+int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
+{
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	__be32 wtmp;
+	u32 abort_code;
+
+	if (conn->state >= RXRPC_CONN_REMOTELY_ABORTED) {
+		_leave(" = -ECONNABORTED [%u]", conn->state);
+		return -ECONNABORTED;
+	}
+
+	_enter("{%d},{%u,%%%u},", conn->debug_id, sp->hdr.type, sp->hdr.serial);
+
+	switch (sp->hdr.type) {
+	case RXRPC_PACKET_TYPE_DATA:
+	case RXRPC_PACKET_TYPE_ACK:
+		rxrpc_conn_retransmit_call(conn, skb,
+					   sp->hdr.cid & RXRPC_CHANNELMASK);
+		rxrpc_free_skb(skb, rxrpc_skb_freed);
+		return 0;
+
+	case RXRPC_PACKET_TYPE_BUSY:
+		/* Just ignore BUSY packets for now. */
+		rxrpc_free_skb(skb, rxrpc_skb_freed);
+		return 0;
+
+	case RXRPC_PACKET_TYPE_ABORT:
+		if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
+				  &wtmp, sizeof(wtmp)) < 0) {
+			trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
+					      tracepoint_string("bad_abort"));
+			return -EPROTO;
+		}
+		abort_code = ntohl(wtmp);
+		_proto("Rx ABORT %%%u { ac=%d }", sp->hdr.serial, abort_code);
+
+		conn->error = -ECONNABORTED;
+		conn->abort_code = abort_code;
+		conn->state = RXRPC_CONN_REMOTELY_ABORTED;
+		set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
+		rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED, sp->hdr.serial);
+		rxrpc_free_skb(skb, rxrpc_skb_freed);
+		return -ECONNABORTED;
+
+	case RXRPC_PACKET_TYPE_CHALLENGE:
+	case RXRPC_PACKET_TYPE_RESPONSE:
+		rxrpc_post_packet_to_conn(conn, skb);
+		return 0;
+
+	default:
+		trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
+				      tracepoint_string("bad_conn_pkt"));
+		rxrpc_free_skb(skb, rxrpc_skb_freed);
+		return -EPROTO;
+	}
+}
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 5a0a7b2..05894a9 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: GPL-2.0-or-later
 /* RxRPC packet reception
  *
- * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  */
 
@@ -10,103 +10,58 @@
 #include "ar-internal.h"
 
 /*
- * Post a call-level packet to a call.
- */
-static void rxrpc_post_packet_to_call(struct rxrpc_call *call, struct sk_buff *skb)
-{
-	if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
-		set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
-	if (!test_bit(RXRPC_CALL_IS_DEAD, &call->flags)) {
-		skb_queue_tail(&call->input_queue, skb);
-		rxrpc_queue_call(call);
-	} else {
-		rxrpc_free_skb(skb, rxrpc_skb_freed);
-	}
-}
-
-/*
- * Handle a new service call on a channel implicitly completing the preceding
- * call on that channel.  This does not apply to client conns.
+ * handle data received on the local endpoint
+ * - may be called in interrupt context
  *
- * TODO: If callNumber > call_id + 1, renegotiate security.
+ * [!] Note that as this is called from the encap_rcv hook, the socket is not
+ * held locked by the caller and nothing prevents sk_user_data on the UDP from
+ * being cleared in the middle of processing this function.
+ *
+ * Called with the RCU read lock held from the IP layer via UDP.
  */
-static struct sk_buff *rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
-						     struct rxrpc_channel *chan,
-						     struct rxrpc_call *call,
-						     struct sk_buff *skb)
+int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 {
-	struct sk_buff *copy;
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
 
-	/* We post a copy of the message to the call we're terminating to make
-	 * sure it gets terminated.  If we fail to copy the message, we steal
-	 * the original and hope the server sends us a new copy.
-	 */
-	if (call->state < RXRPC_CALL_COMPLETE &&
-	    cmpxchg(&chan->call, call, NULL) == call) {
-		set_bit(RXRPC_CALL_IS_DEAD, &call->flags);
-		copy = skb_clone(skb, GFP_ATOMIC | __GFP_NOWARN);
-		if (copy) {
-			rxrpc_new_skb(skb, rxrpc_skb_new);
-		} else {
-			copy = skb;
-			skb = NULL;
-		}
-		skb_queue_tail(&call->input_queue, copy);
-		rxrpc_queue_call(call);
+	if (unlikely(!local)) {
+		kfree_skb(skb);
+		return 0;
 	}
+	if (skb->tstamp == 0)
+		skb->tstamp = ktime_get_real();
 
-	return skb;
+	rxrpc_new_skb(skb, rxrpc_skb_received);
+	memset(sp, 0, sizeof(*sp));
+	skb_queue_tail(&local->rx_queue, skb);
+	rxrpc_wake_up_io_thread(local);
+	return 0;
 }
 
 /*
- * post connection-level events to the connection
- * - this includes challenges, responses, some aborts and call terminal packet
- *   retransmission.
+ * Process event packets targeted at a local endpoint.
  */
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
-				      struct sk_buff *skb)
+static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
 {
-	_enter("%p,%p", conn, skb);
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	char v;
 
-	skb_queue_tail(&conn->rx_queue, skb);
-	rxrpc_queue_conn(conn);
-}
+	_enter("");
 
-/*
- * post endpoint-level events to the local endpoint
- * - this includes debug and version messages
- */
-static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
-				       struct sk_buff *skb)
-{
-	_enter("%p,%p", local, skb);
-
-	if (rxrpc_get_local_maybe(local)) {
-		skb_queue_tail(&local->event_queue, skb);
-		rxrpc_queue_local(local);
-	} else {
-		rxrpc_free_skb(skb, rxrpc_skb_freed);
+	rxrpc_see_skb(skb, rxrpc_skb_seen);
+	if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
+		_proto("Rx VERSION { %02x }", v);
+		if (v == 0)
+			rxrpc_send_version_request(local, &sp->hdr, skb);
 	}
-}
 
-/*
- * put a packet up for transport-level abort
- */
-static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
-{
-	if (rxrpc_get_local_maybe(local)) {
-		skb_queue_tail(&local->reject_queue, skb);
-		rxrpc_queue_local(local);
-	} else {
-		rxrpc_free_skb(skb, rxrpc_skb_freed);
-	}
+	rxrpc_free_skb(skb, rxrpc_skb_freed);
 }
 
 /*
  * Extract the wire header from a packet and translate the byte order.
  */
-static noinline
-int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
+static int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
 {
 	struct rxrpc_wire_header whdr;
 
@@ -117,7 +72,6 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
 		return -EBADMSG;
 	}
 
-	memset(sp, 0, sizeof(*sp));
 	sp->hdr.epoch		= ntohl(whdr.epoch);
 	sp->hdr.cid		= ntohl(whdr.cid);
 	sp->hdr.callNumber	= ntohl(whdr.callNumber);
@@ -133,18 +87,11 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
 }
 
 /*
- * handle data received on the local endpoint
- * - may be called in interrupt context
- *
- * [!] Note that as this is called from the encap_rcv hook, the socket is not
- * held locked by the caller and nothing prevents sk_user_data on the UDP from
- * being cleared in the middle of processing this function.
- *
- * Called with the RCU read lock held from the IP layer via UDP.
+ * Process a socket buffer, distributing it to the appropriate connection or
+ * call.
  */
-int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
+static void rxrpc_input_one_packet(struct rxrpc_local *local, struct sk_buff *skb)
 {
-	struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
 	struct rxrpc_connection *conn;
 	struct rxrpc_channel *chan;
 	struct rxrpc_call *call = NULL;
@@ -153,23 +100,17 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	struct rxrpc_sock *rx = NULL;
 	unsigned int channel;
 
-	_enter("%p", udp_sk);
-
-	if (unlikely(!local)) {
-		kfree_skb(skb);
-		return 0;
-	}
-	if (skb->tstamp == 0)
-		skb->tstamp = ktime_get_real();
-
-	rxrpc_new_skb(skb, rxrpc_skb_received);
+	_enter("");
 
 	skb_pull(skb, sizeof(struct udphdr));
 
-	/* The UDP protocol already released all skb resources;
-	 * we are free to add our own data there.
-	 */
 	sp = rxrpc_skb(skb);
+	if (sp->call) {
+		trace_rxrpc_call_poked(sp->call);
+		rxrpc_input_call_packet(sp->call, skb);
+		rxrpc_put_call(sp->call, rxrpc_call_put_poke);
+		goto discard;
+	}
 
 	/* dig out the RxRPC connection details */
 	if (rxrpc_extract_header(sp, skb) < 0)
@@ -180,19 +121,17 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 		if ((lose++ & 7) == 7) {
 			trace_rxrpc_rx_lose(sp);
 			rxrpc_free_skb(skb, rxrpc_skb_lost);
-			return 0;
+			return;
 		}
 	}
 
-	if (skb->tstamp == 0)
-		skb->tstamp = ktime_get_real();
 	trace_rxrpc_rx_packet(sp);
 
 	switch (sp->hdr.type) {
 	case RXRPC_PACKET_TYPE_VERSION:
 		if (rxrpc_to_client(sp))
 			goto discard;
-		rxrpc_post_packet_to_local(local, skb);
+		rxrpc_input_version(local, skb);
 		goto out;
 
 	case RXRPC_PACKET_TYPE_BUSY:
@@ -272,7 +211,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 		if (sp->hdr.callNumber == 0) {
 			/* Connection-level packet */
 			_debug("CONN %p {%d}", conn, conn->debug_id);
-			rxrpc_post_packet_to_conn(conn, skb);
+			rxrpc_input_conn_packet(conn, skb);
 			goto out;
 		}
 
@@ -307,7 +246,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 						    sp->hdr.seq,
 						    sp->hdr.serial,
 						    sp->hdr.flags);
-			rxrpc_post_packet_to_conn(conn, skb);
+			rxrpc_input_conn_packet(conn, skb);
 			goto out;
 		}
 
@@ -317,11 +256,10 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 			if (rxrpc_to_client(sp))
 				goto reject_packet;
 			if (call) {
-				skb = rxrpc_input_implicit_end_call(rx, chan, call, skb);
-				if (!skb)
-					goto out;
+				rxrpc_implicit_end_call(call, skb);
+				chan->call = NULL;
+				call = NULL;
 			}
-			call = NULL;
 		}
 	}
 
@@ -339,14 +277,16 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	/* Process a call packet; this either discards or passes on the ref
 	 * elsewhere.
 	 */
-	rxrpc_post_packet_to_call(call, skb);
+	if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
+		set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
+	rxrpc_input_call_packet(call, skb);
 	goto out;
 
 discard:
 	rxrpc_free_skb(skb, rxrpc_skb_freed);
 out:
 	trace_rxrpc_rx_done(0, 0);
-	return 0;
+	return;
 
 wrong_security:
 	trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
@@ -376,5 +316,44 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	trace_rxrpc_rx_done(skb->mark, skb->priority);
 	rxrpc_reject_packet(local, skb);
 	_leave(" [badmsg]");
+	return;
+}
+
+/*
+ * I/O and event handling thread.
+ */
+int rxrpc_io_thread(void *data)
+{
+	struct sk_buff_head rx_queue;
+	struct rxrpc_local *local = data;
+	struct sk_buff *skb;
+
+	skb_queue_head_init(&rx_queue);
+
+	set_user_nice(current, MIN_NICE);
+
+	for (;;) {
+		if (!skb_queue_empty(&local->rx_queue)) {
+			spin_lock_irq(&local->rx_queue.lock);
+			skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
+			spin_unlock_irq(&local->rx_queue.lock);
+		}
+
+		while ((skb = __skb_dequeue(&rx_queue)))
+			rxrpc_input_one_packet(local, skb);
+
+		set_current_state(TASK_INTERRUPTIBLE);
+		if (kthread_should_stop())
+			break;
+		if (!skb_queue_empty(&local->rx_queue)) {
+			__set_current_state(TASK_RUNNING);
+			continue;
+		}
+		schedule();
+	}
+
+	__set_current_state(TASK_RUNNING);
+	rxrpc_destroy_local(local);
+	local->io_thread = NULL;
 	return 0;
 }
diff --git a/net/rxrpc/local_event.c b/net/rxrpc/local_event.c
index 19e929c..47474a1 100644
--- a/net/rxrpc/local_event.c
+++ b/net/rxrpc/local_event.c
@@ -21,9 +21,9 @@ static const char rxrpc_version_string[65] = "linux-" UTS_RELEASE " AF_RXRPC";
 /*
  * Reply to a version request
  */
-static void rxrpc_send_version_request(struct rxrpc_local *local,
-				       struct rxrpc_host_header *hdr,
-				       struct sk_buff *skb)
+void rxrpc_send_version_request(struct rxrpc_local *local,
+				struct rxrpc_host_header *hdr,
+				struct sk_buff *skb)
 {
 	struct rxrpc_wire_header whdr;
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
@@ -75,41 +75,3 @@ static void rxrpc_send_version_request(struct rxrpc_local *local,
 
 	_leave("");
 }
-
-/*
- * Process event packets targeted at a local endpoint.
- */
-void rxrpc_process_local_events(struct rxrpc_local *local)
-{
-	struct sk_buff *skb;
-	char v;
-
-	_enter("");
-
-	skb = skb_dequeue(&local->event_queue);
-	if (skb) {
-		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-
-		rxrpc_see_skb(skb, rxrpc_skb_seen);
-		_debug("{%d},{%u}", local->debug_id, sp->hdr.type);
-
-		switch (sp->hdr.type) {
-		case RXRPC_PACKET_TYPE_VERSION:
-			if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
-					  &v, 1) < 0)
-				return;
-			_proto("Rx VERSION { %02x }", v);
-			if (v == 0)
-				rxrpc_send_version_request(local, &sp->hdr, skb);
-			break;
-
-		default:
-			/* Just ignore anything we don't understand */
-			break;
-		}
-
-		rxrpc_free_skb(skb, rxrpc_skb_freed);
-	}
-
-	_leave("");
-}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 38ea98f..3098fe9 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -20,7 +20,6 @@
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
-static void rxrpc_local_processor(struct work_struct *);
 static void rxrpc_local_rcu(struct rcu_head *);
 
 /*
@@ -83,10 +82,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
 		atomic_set(&local->active_users, 1);
 		local->rxnet = rxnet;
 		INIT_HLIST_NODE(&local->link);
-		INIT_WORK(&local->processor, rxrpc_local_processor);
 		init_rwsem(&local->defrag_sem);
-		skb_queue_head_init(&local->reject_queue);
-		skb_queue_head_init(&local->event_queue);
+		skb_queue_head_init(&local->rx_queue);
 		local->client_bundles = RB_ROOT;
 		spin_lock_init(&local->client_bundles_lock);
 		spin_lock_init(&local->lock);
@@ -110,6 +107,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 	struct udp_tunnel_sock_cfg tuncfg = {NULL};
 	struct sockaddr_rxrpc *srx = &local->srx;
 	struct udp_port_cfg udp_conf = {0};
+	struct task_struct *io_thread;
 	struct sock *usk;
 	int ret;
 
@@ -169,8 +167,23 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 		BUG();
 	}
 
+	io_thread = kthread_run(rxrpc_io_thread, local,
+				"krxrpcio/%u", ntohs(udp_conf.local_udp_port));
+	if (IS_ERR(io_thread)) {
+		ret = PTR_ERR(io_thread);
+		goto error_sock;
+	}
+
+	local->io_thread = io_thread;
 	_leave(" = 0");
 	return 0;
+
+error_sock:
+	kernel_sock_shutdown(local->socket, SHUT_RDWR);
+	local->socket->sk->sk_user_data = NULL;
+	sock_release(local->socket);
+	local->socket = NULL;
+	return ret;
 }
 
 /*
@@ -292,21 +305,6 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
 }
 
 /*
- * Queue a local endpoint and pass the caller's reference to the work item.
- */
-void rxrpc_queue_local(struct rxrpc_local *local)
-{
-	const void *here = __builtin_return_address(0);
-	unsigned int debug_id = local->debug_id;
-	int r = refcount_read(&local->ref);
-
-	if (rxrpc_queue_work(&local->processor))
-		trace_rxrpc_local(debug_id, rxrpc_local_queued, r + 1, here);
-	else
-		rxrpc_put_local(local);
-}
-
-/*
  * Drop a ref on a local endpoint.
  */
 void rxrpc_put_local(struct rxrpc_local *local)
@@ -346,16 +344,12 @@ struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local)
 
 /*
  * Cease using a local endpoint.  Once the number of active users reaches 0, we
- * start the closure of the transport in the work processor.
+ * start the closure of the transport in the I/O thread..
  */
 void rxrpc_unuse_local(struct rxrpc_local *local)
 {
-	if (local) {
-		if (__rxrpc_unuse_local(local)) {
-			rxrpc_get_local(local);
-			rxrpc_queue_local(local);
-		}
-	}
+	if (local && __rxrpc_unuse_local(local))
+		kthread_stop(local->io_thread);
 }
 
 /*
@@ -365,7 +359,7 @@ void rxrpc_unuse_local(struct rxrpc_local *local)
  * Closing the socket cannot be done from bottom half context or RCU callback
  * context because it might sleep.
  */
-static void rxrpc_local_destroyer(struct rxrpc_local *local)
+void rxrpc_destroy_local(struct rxrpc_local *local)
 {
 	struct socket *socket = local->socket;
 	struct rxrpc_net *rxnet = local->rxnet;
@@ -388,51 +382,6 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
 		socket->sk->sk_user_data = NULL;
 		sock_release(socket);
 	}
-
-	/* At this point, there should be no more packets coming in to the
-	 * local endpoint.
-	 */
-	rxrpc_purge_queue(&local->reject_queue);
-	rxrpc_purge_queue(&local->event_queue);
-}
-
-/*
- * Process events on an endpoint.  The work item carries a ref which
- * we must release.
- */
-static void rxrpc_local_processor(struct work_struct *work)
-{
-	struct rxrpc_local *local =
-		container_of(work, struct rxrpc_local, processor);
-	bool again;
-
-	if (local->dead)
-		return;
-
-	trace_rxrpc_local(local->debug_id, rxrpc_local_processing,
-			  refcount_read(&local->ref), NULL);
-
-	do {
-		again = false;
-		if (!__rxrpc_use_local(local)) {
-			rxrpc_local_destroyer(local);
-			break;
-		}
-
-		if (!skb_queue_empty(&local->reject_queue)) {
-			rxrpc_reject_packets(local);
-			again = true;
-		}
-
-		if (!skb_queue_empty(&local->event_queue)) {
-			rxrpc_process_local_events(local);
-			again = true;
-		}
-
-		__rxrpc_unuse_local(local);
-	} while (again);
-
-	rxrpc_put_local(local);
 }
 
 /*
@@ -444,8 +393,6 @@ static void rxrpc_local_rcu(struct rcu_head *rcu)
 
 	_enter("%d", local->debug_id);
 
-	ASSERT(!work_pending(&local->processor));
-
 	_net("DESTROY LOCAL %d", local->debug_id);
 	kfree(local);
 	_leave("");
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 33eb5f9..f823a0e 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -543,14 +543,13 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
 }
 
 /*
- * reject packets through the local endpoint
+ * Reject a packet through the local endpoint.
  */
-void rxrpc_reject_packets(struct rxrpc_local *local)
+void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
 {
-	struct sockaddr_rxrpc srx;
-	struct rxrpc_skb_priv *sp;
 	struct rxrpc_wire_header whdr;
-	struct sk_buff *skb;
+	struct sockaddr_rxrpc srx;
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	struct msghdr msg;
 	struct kvec iov[2];
 	size_t size;
@@ -558,6 +557,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
 	int ret, ioc;
 
 	_enter("%d", local->debug_id);
+	rxrpc_see_skb(skb, rxrpc_skb_seen);
 
 	iov[0].iov_base = &whdr;
 	iov[0].iov_len = sizeof(whdr);
@@ -571,51 +571,45 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
 
 	memset(&whdr, 0, sizeof(whdr));
 
-	while ((skb = skb_dequeue(&local->reject_queue))) {
-		rxrpc_see_skb(skb, rxrpc_skb_seen);
-		sp = rxrpc_skb(skb);
-
-		switch (skb->mark) {
-		case RXRPC_SKB_MARK_REJECT_BUSY:
-			whdr.type = RXRPC_PACKET_TYPE_BUSY;
-			size = sizeof(whdr);
-			ioc = 1;
-			break;
-		case RXRPC_SKB_MARK_REJECT_ABORT:
-			whdr.type = RXRPC_PACKET_TYPE_ABORT;
-			code = htonl(skb->priority);
-			size = sizeof(whdr) + sizeof(code);
-			ioc = 2;
-			break;
-		default:
-			rxrpc_free_skb(skb, rxrpc_skb_freed);
-			continue;
-		}
-
-		if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
-			msg.msg_namelen = srx.transport_len;
-
-			whdr.epoch	= htonl(sp->hdr.epoch);
-			whdr.cid	= htonl(sp->hdr.cid);
-			whdr.callNumber	= htonl(sp->hdr.callNumber);
-			whdr.serviceId	= htons(sp->hdr.serviceId);
-			whdr.flags	= sp->hdr.flags;
-			whdr.flags	^= RXRPC_CLIENT_INITIATED;
-			whdr.flags	&= RXRPC_CLIENT_INITIATED;
-
-			iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
-			ret = do_udp_sendmsg(local->socket, &msg, size);
-			if (ret < 0)
-				trace_rxrpc_tx_fail(local->debug_id, 0, ret,
-						    rxrpc_tx_point_reject);
-			else
-				trace_rxrpc_tx_packet(local->debug_id, &whdr,
-						      rxrpc_tx_point_reject);
-		}
-
-		rxrpc_free_skb(skb, rxrpc_skb_freed);
+	switch (skb->mark) {
+	case RXRPC_SKB_MARK_REJECT_BUSY:
+		whdr.type = RXRPC_PACKET_TYPE_BUSY;
+		size = sizeof(whdr);
+		ioc = 1;
+		break;
+	case RXRPC_SKB_MARK_REJECT_ABORT:
+		whdr.type = RXRPC_PACKET_TYPE_ABORT;
+		code = htonl(skb->priority);
+		size = sizeof(whdr) + sizeof(code);
+		ioc = 2;
+		break;
+	default:
+		goto out;
 	}
 
+	if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
+		msg.msg_namelen = srx.transport_len;
+
+		whdr.epoch	= htonl(sp->hdr.epoch);
+		whdr.cid	= htonl(sp->hdr.cid);
+		whdr.callNumber	= htonl(sp->hdr.callNumber);
+		whdr.serviceId	= htons(sp->hdr.serviceId);
+		whdr.flags	= sp->hdr.flags;
+		whdr.flags	^= RXRPC_CLIENT_INITIATED;
+		whdr.flags	&= RXRPC_CLIENT_INITIATED;
+
+		iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
+		ret = do_udp_sendmsg(local->socket, &msg, size);
+		if (ret < 0)
+			trace_rxrpc_tx_fail(local->debug_id, 0, ret,
+					    rxrpc_tx_point_reject);
+		else
+			trace_rxrpc_tx_packet(local->debug_id, &whdr,
+					      rxrpc_tx_point_reject);
+	}
+
+out:
+	rxrpc_free_skb(skb, rxrpc_skb_freed);
 	_leave("");
 }
 
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index 087e7d4..a6ee82f 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -509,7 +509,7 @@ static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error_report)
 		rxrpc_see_call(call);
 		if (call->state < RXRPC_CALL_COMPLETE &&
 		    cmpxchg(&call->error_report, 0, error_report) == 0)
-			rxrpc_queue_call(call);
+			rxrpc_poke_call(call, rxrpc_call_poke_error);
 	}
 }
 
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index 4f0fa73..f7f4261 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -341,8 +341,7 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
 	if (v == SEQ_START_TOKEN) {
 		seq_printf(seq,
 			   "Proto Local                                          "
-			   " Use Act (txb=%d)\n",
-			   atomic_read(&rxrpc_nr_txbuf));
+			   " Use Act RxQ\n");
 		return 0;
 	}
 
@@ -351,10 +350,11 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
 	sprintf(lbuff, "%pISpc", &local->srx.transport);
 
 	seq_printf(seq,
-		   "UDP   %-47.47s %3u %3u\n",
+		   "UDP   %-47.47s %3u %3u %3u\n",
 		   lbuff,
 		   refcount_read(&local->ref),
-		   atomic_read(&local->active_users));
+		   atomic_read(&local->active_users),
+		   local->rx_queue.qlen);
 
 	return 0;
 }
diff --git a/net/rxrpc/receive.c b/net/rxrpc/receive.c
index edfb2f6..013f42d 100644
--- a/net/rxrpc/receive.c
+++ b/net/rxrpc/receive.c
@@ -1067,27 +1067,35 @@ static void rxrpc_receive_call_packet(struct rxrpc_call *call, struct sk_buff *s
  *
  * TODO: If callNumber > call_id + 1, renegotiate security.
  */
-static void rxrpc_receive_implicit_end_call(struct rxrpc_sock *rx,
-					    struct rxrpc_call *call)
+void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn = call->conn;
+	struct rxrpc_sock *rx = rcu_access_pointer(call->socket);
 
-	switch (READ_ONCE(call->state)) {
-	case RXRPC_CALL_SERVER_AWAIT_ACK:
-		rxrpc_call_completed(call);
-		fallthrough;
-	case RXRPC_CALL_COMPLETE:
-		break;
-	default:
-		if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN))
-			rxrpc_send_abort_packet(call);
-		trace_rxrpc_improper_term(call);
-		break;
+	/* We present the message to the call we're terminating to make sure it
+	 * gets terminated.
+	 */
+	if (call->state < RXRPC_CALL_COMPLETE) {
+		set_bit(RXRPC_CALL_IS_DEAD, &call->flags);
+		rxrpc_input_call_packet(call, skb);
+
+		switch (READ_ONCE(call->state)) {
+		case RXRPC_CALL_SERVER_AWAIT_ACK:
+			rxrpc_call_completed(call);
+			fallthrough;
+		case RXRPC_CALL_COMPLETE:
+			break;
+		default:
+			if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN))
+				rxrpc_send_abort_packet(call);
+			trace_rxrpc_improper_term(call);
+			break;
+		}
+
+		spin_lock_bh(&rx->incoming_lock);
+		__rxrpc_disconnect_call(conn, call);
+		spin_unlock_bh(&rx->incoming_lock);
 	}
-
-	spin_lock_bh(&rx->incoming_lock);
-	__rxrpc_disconnect_call(conn, call);
-	spin_unlock_bh(&rx->incoming_lock);
 }
 
 /*
@@ -1096,13 +1104,6 @@ static void rxrpc_receive_implicit_end_call(struct rxrpc_sock *rx,
 void rxrpc_receive(struct rxrpc_call *call, struct sk_buff *skb)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-	struct rxrpc_sock *rx = rcu_access_pointer(call->socket);
-
-	if (sp->hdr.callNumber != call->call_id) {
-		rxrpc_free_skb(skb, rxrpc_skb_freed);
-		rxrpc_receive_implicit_end_call(rx, call);
-		return;
-	}
 
 	if (sp->hdr.serviceId != call->service_id)
 		call->service_id = sp->hdr.serviceId;
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 8ca444e..0e3f089 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -267,7 +267,7 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
 				  &call->ackr_nr_consumed);
 	if (acked > 2 &&
 	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
-		rxrpc_queue_call(call);
+		rxrpc_poke_call(call, rxrpc_call_poke_idle);
 }
 
 /*
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 11cbaf0..5c5fb97 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -170,7 +170,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 {
 	unsigned long now;
 	rxrpc_seq_t seq = txb->seq;
-	bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
+	bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
 
 	rxrpc_inc_stat(call->rxnet, stat_tx_data);
 
@@ -188,6 +188,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 
 	/* Add the packet to the call's output buffer */
 	spin_lock(&call->tx_lock);
+	poke = list_empty(&call->tx_sendmsg);
 	list_add_tail(&txb->call_link, &call->tx_sendmsg);
 	call->tx_prepared = seq;
 	spin_unlock(&call->tx_lock);
@@ -220,11 +221,8 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 		write_unlock(&call->state_lock);
 	}
 
-
-	/* Stick the packet on the crypto queue or the transmission queue as
-	 * appropriate.
-	 */
-	rxrpc_queue_call(call);
+	if (poke)
+		rxrpc_poke_call(call, rxrpc_call_poke_start);
 }
 
 /*
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index 1062fd4..0c827d5 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -66,7 +66,6 @@ void rxrpc_free_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
 {
 	const void *here = __builtin_return_address(0);
 	if (skb) {
-		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 		int n;
 		n = atomic_dec_return(select_skb_count(skb));
 		trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n, here);
@@ -82,7 +81,6 @@ void rxrpc_purge_queue(struct sk_buff_head *list)
 	const void *here = __builtin_return_address(0);
 	struct sk_buff *skb;
 	while ((skb = skb_dequeue((list))) != NULL) {
-		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 		int n = atomic_dec_return(select_skb_count(skb));
 		trace_rxrpc_skb(skb, rxrpc_skb_purged,
 				refcount_read(&skb->users), n, here);