rxrpc: Implement a single I/O thread per local endpoint
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index ddf12f8..f87bc8f 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -16,6 +16,13 @@
/*
* Declare tracing information enums and their string mappings for display.
*/
+#define rxrpc_call_poke_traces \
+ EM(rxrpc_call_poke_error, "Error") \
+ EM(rxrpc_call_poke_idle, "Idle") \
+ EM(rxrpc_call_poke_start, "Start") \
+ EM(rxrpc_call_poke_timer, "Timer") \
+ E_(rxrpc_call_poke_timer_now, "Timer-now")
+
#define rxrpc_skb_traces \
EM(rxrpc_skb_ack, "ACK") \
EM(rxrpc_skb_cleaned, "CLE") \
@@ -75,6 +82,7 @@
EM(rxrpc_call_error, "*E*") \
EM(rxrpc_call_got, "GOT") \
EM(rxrpc_call_got_kernel, "Gke") \
+ EM(rxrpc_call_got_poke, "Gpo") \
EM(rxrpc_call_got_timer, "GTM") \
EM(rxrpc_call_got_tx, "Gtx") \
EM(rxrpc_call_got_userid, "Gus") \
@@ -84,6 +92,7 @@
EM(rxrpc_call_put_kernel, "Pke") \
EM(rxrpc_call_put_noqueue, "PnQ") \
EM(rxrpc_call_put_notimer, "PnT") \
+ EM(rxrpc_call_put_poke, "Ppo") \
EM(rxrpc_call_put_timer, "PTM") \
EM(rxrpc_call_put_tx, "Ptx") \
EM(rxrpc_call_put_userid, "Pus") \
@@ -289,6 +298,7 @@
#define EM(a, b) a,
#define E_(a, b) a
+enum rxrpc_call_poke_trace { rxrpc_call_poke_traces } __mode(byte);
enum rxrpc_call_trace { rxrpc_call_traces } __mode(byte);
enum rxrpc_client_trace { rxrpc_client_traces } __mode(byte);
enum rxrpc_congest_change { rxrpc_congest_changes } __mode(byte);
@@ -318,6 +328,7 @@ enum rxrpc_txqueue_trace { rxrpc_txqueue_traces } __mode(byte);
#define EM(a, b) TRACE_DEFINE_ENUM(a);
#define E_(a, b) TRACE_DEFINE_ENUM(a);
+rxrpc_call_poke_traces;
rxrpc_call_traces;
rxrpc_client_traces;
rxrpc_congest_changes;
@@ -1598,6 +1609,47 @@ TRACE_EVENT(rxrpc_txbuf,
__entry->ref)
);
+TRACE_EVENT(rxrpc_poke_call,
+ TP_PROTO(struct rxrpc_call *call, bool busy,
+ enum rxrpc_call_poke_trace what),
+
+ TP_ARGS(call, busy, what),
+
+ TP_STRUCT__entry(
+ __field(unsigned int, call_debug_id )
+ __field(bool, busy )
+ __field(enum rxrpc_call_poke_trace, what )
+ ),
+
+ TP_fast_assign(
+ __entry->call_debug_id = call->debug_id;
+ __entry->busy = busy;
+ __entry->what = what;
+ ),
+
+ TP_printk("c=%08x %s%s",
+ __entry->call_debug_id,
+ __print_symbolic(__entry->what, rxrpc_call_poke_traces),
+ __entry->busy ? "!" : "")
+ );
+
+TRACE_EVENT(rxrpc_call_poked,
+ TP_PROTO(struct rxrpc_call *call),
+
+ TP_ARGS(call),
+
+ TP_STRUCT__entry(
+ __field(unsigned int, call_debug_id )
+ ),
+
+ TP_fast_assign(
+ __entry->call_debug_id = call->debug_id;
+ ),
+
+ TP_printk("c=%08x",
+ __entry->call_debug_id)
+ );
+
#undef EM
#undef E_
#endif /* _TRACE_RXRPC_H */
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index fbab970..f3385fa 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -195,6 +195,7 @@ struct rxrpc_host_header {
* - max 48 bytes (struct sk_buff::cb)
*/
struct rxrpc_skb_priv {
+ struct rxrpc_call *call; /* Call referred to (poke packet) */
u16 offset; /* Offset of data */
u16 len; /* Length of data */
u8 flags;
@@ -279,11 +280,10 @@ struct rxrpc_local {
struct rxrpc_net *rxnet; /* The network ns in which this resides */
struct hlist_node link;
struct socket *socket; /* my UDP socket */
- struct work_struct processor;
+ struct task_struct *io_thread;
struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
- struct sk_buff_head reject_queue; /* packets awaiting rejection */
- struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */
+ struct sk_buff_head rx_queue; /* Received packets */
struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */
spinlock_t lock; /* access lock */
@@ -504,15 +504,13 @@ enum rxrpc_call_flag {
RXRPC_CALL_UPGRADE, /* Service upgrade was requested for the call */
RXRPC_CALL_RX_IS_IDLE, /* Reception is idle - send an ACK */
RXRPC_CALL_IS_DEAD, /* The call is not interested in any further packets */
+ RXRPC_CALL_START_TIMERS, /* The timers need starting */
};
/*
* Events that can be raised on a call.
*/
enum rxrpc_call_event {
- RXRPC_CALL_EV_ABORT, /* need to generate abort */
- RXRPC_CALL_EV_RESEND, /* Tx resend required */
- RXRPC_CALL_EV_EXPIRED, /* Expiry occurred */
RXRPC_CALL_EV_ACK_LOST, /* ACK may be lost, send ping */
RXRPC_CALL_EV_INITIAL_PING, /* Send initial ping for a new service call */
};
@@ -583,7 +581,7 @@ struct rxrpc_call {
u32 next_req_timo; /* Timeout for next Rx request packet (jif) */
struct skcipher_request *cipher_req; /* Packet cipher request buffer */
struct timer_list timer; /* Combined event timer */
- struct work_struct processor; /* Event processor */
+ struct work_struct destructor; /* Call destructor */
rxrpc_notify_rx_t notify_rx; /* kernel service Rx notification function */
struct list_head link; /* link in master call list */
struct list_head chan_wait_link; /* Link in conn->bundle->waiting_calls */
@@ -592,6 +590,7 @@ struct rxrpc_call {
struct list_head recvmsg_link; /* Link in rx->recvmsg_q */
struct list_head sock_link; /* Link in rx->sock_calls */
struct rb_node sock_node; /* Node in rx->calls */
+ struct sk_buff *poke; /* Socket buffer used to poke the call */
struct rxrpc_txbuf *tx_pending; /* Tx buffer being filled */
wait_queue_head_t waitq; /* Wait queue for channel or Tx */
s64 tx_total_len; /* Total length left to be transmitted (or -1) */
@@ -631,7 +630,6 @@ struct rxrpc_call {
ktime_t tx_last_sent; /* Last time a transmission occurred */
/* Received data tracking */
- struct sk_buff_head input_queue; /* Received call-level packets */
struct sk_buff_head rx_queue; /* Queue of packets ready for recvmsg() */
struct sk_buff_head rx_oos_queue; /* Queue of out of sequence packets */
@@ -816,6 +814,7 @@ void rxrpc_reduce_call_timer(struct rxrpc_call *call,
enum rxrpc_timer_trace why);
void rxrpc_delete_call_timer(struct rxrpc_call *call);
+void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb);
/*
* call_object.c
@@ -824,6 +823,7 @@ extern const char *const rxrpc_call_states[];
extern const char *const rxrpc_call_completions[];
extern struct kmem_cache *rxrpc_call_jar;
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what);
struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
@@ -835,8 +835,6 @@ void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
struct sk_buff *);
void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
-bool __rxrpc_queue_call(struct rxrpc_call *);
-bool rxrpc_queue_call(struct rxrpc_call *);
void rxrpc_see_call(struct rxrpc_call *);
bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op);
void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
@@ -880,6 +878,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *);
*/
void rxrpc_process_connection(struct work_struct *);
void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
+int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
/*
* conn_object.c
@@ -943,6 +942,11 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
* input.c
*/
int rxrpc_input_packet(struct sock *, struct sk_buff *);
+int rxrpc_io_thread(void *data);
+static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
+{
+ wake_up_process(local->io_thread);
+}
/*
* insecure.c
@@ -961,7 +965,9 @@ int rxrpc_get_server_data_key(struct rxrpc_connection *, const void *, time64_t,
/*
* local_event.c
*/
-extern void rxrpc_process_local_events(struct rxrpc_local *);
+void rxrpc_send_version_request(struct rxrpc_local *local,
+ struct rxrpc_host_header *hdr,
+ struct sk_buff *skb);
/*
* local_object.c
@@ -972,7 +978,7 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *);
void rxrpc_put_local(struct rxrpc_local *);
struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *);
void rxrpc_unuse_local(struct rxrpc_local *);
-void rxrpc_queue_local(struct rxrpc_local *);
+void rxrpc_destroy_local(struct rxrpc_local *local);
void rxrpc_destroy_all_locals(struct rxrpc_net *);
static inline bool __rxrpc_unuse_local(struct rxrpc_local *local)
@@ -1012,7 +1018,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
int rxrpc_send_ack_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
int rxrpc_send_abort_packet(struct rxrpc_call *);
int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
-void rxrpc_reject_packets(struct rxrpc_local *);
+void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
void rxrpc_send_keepalive(struct rxrpc_peer *);
void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
@@ -1050,6 +1056,7 @@ extern const struct seq_operations rxrpc_local_seq_ops;
/*
* receive.c
*/
+void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb);
void rxrpc_receive(struct rxrpc_call *call, struct sk_buff *skb);
/*
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 459855a..f4b88d5 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -290,30 +290,6 @@ static void rxrpc_send_initial_ping(struct rxrpc_call *call)
rxrpc_propose_ack_ping_for_params);
}
-/*
- * Decant some if the sendmsg prepared queue into the transmission buffer.
- */
-static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
-{
- struct rxrpc_txbuf *txb;
-
- if (rxrpc_is_client_call(call) &&
- !test_bit(RXRPC_CALL_EXPOSED, &call->flags))
- rxrpc_expose_client_call(call);
-
- if ((txb = list_first_entry_or_null(&call->tx_sendmsg,
- struct rxrpc_txbuf, call_link))) {
- spin_lock(&call->tx_lock);
- list_del(&txb->call_link);
- spin_unlock(&call->tx_lock);
-
- call->tx_top = txb->seq;
- list_add_tail(&txb->call_link, &call->tx_buffer);
-
- rxrpc_transmit_one(call, txb);
- }
-}
-
static bool rxrpc_can_decant(struct rxrpc_call *call)
{
unsigned int winsize;
@@ -329,16 +305,39 @@ static bool rxrpc_can_decant(struct rxrpc_call *call)
}
/*
+ * Decant some of the sendmsg prepared queue into the transmission buffer.
+ */
+static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+{
+ struct rxrpc_txbuf *txb;
+
+ if (rxrpc_is_client_call(call) &&
+ !test_bit(RXRPC_CALL_EXPOSED, &call->flags))
+ rxrpc_expose_client_call(call);
+
+ while (rxrpc_can_decant(call) &&
+ (txb = list_first_entry_or_null(&call->tx_sendmsg,
+ struct rxrpc_txbuf, call_link))) {
+ spin_lock(&call->tx_lock);
+ list_del(&txb->call_link);
+ spin_unlock(&call->tx_lock);
+
+ call->tx_top = txb->seq;
+ list_add_tail(&txb->call_link, &call->tx_buffer);
+
+ rxrpc_transmit_one(call, txb);
+ }
+}
+
+/*
* Handle retransmission and deferred ACK/abort generation.
*/
-void rxrpc_process_call(struct work_struct *work)
+void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
- struct rxrpc_call *call =
- container_of(work, struct rxrpc_call, processor);
- struct sk_buff *skb;
unsigned long now, next, t;
- unsigned int iterations = 0, error_report;
+ unsigned int error_report;
rxrpc_serial_t ackr_serial;
+ bool resend = false, expired = false;
rxrpc_see_call(call);
@@ -346,26 +345,12 @@ void rxrpc_process_call(struct work_struct *work)
_enter("{%d,%s,%lx}",
call->debug_id, rxrpc_call_states[call->state], call->events);
-recheck_state:
if (call->acks_hard_ack != call->tx_bottom)
rxrpc_shrink_call_tx_buffer(call);
- /* Limit the number of times we do this before returning to the manager */
- if (!rxrpc_can_decant(call) &&
- skb_queue_empty(&call->input_queue)) {
- iterations++;
- if (iterations > 5)
- goto requeue;
- }
-
- if (test_and_clear_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
- rxrpc_send_abort_packet(call);
- goto recheck_state;
- }
-
if (call->state == RXRPC_CALL_COMPLETE) {
rxrpc_delete_call_timer(call);
- goto out_put;
+ return;
}
error_report = READ_ONCE(call->error_report);
@@ -378,11 +363,75 @@ void rxrpc_process_call(struct work_struct *work)
rxrpc_set_call_completion(
call, RXRPC_CALL_NETWORK_ERROR, 0, -error_report);
}
- goto out_put;
+ goto out;
}
- if ((skb = skb_dequeue(&call->input_queue)))
+ //if (test_and_clear_bit(RXRPC_CALL_START_TIMERS, &call->flags))
+ // rxrpc_start_call_timer(call);
+
+ /* If we see our async-event poke, check for timeout trippage. */
+ if (skb == call->poke) {
+ now = jiffies;
+ t = READ_ONCE(call->expect_rx_by);
+ if (time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_normal, now);
+ expired = true;
+ }
+
+ t = READ_ONCE(call->expect_req_by);
+ if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST &&
+ time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
+ expired = true;
+ }
+
+ t = READ_ONCE(call->expect_term_by);
+ if (time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_hard, now);
+ expired = true;
+ }
+
+ t = READ_ONCE(call->delay_ack_at);
+ if (time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
+ cmpxchg(&call->delay_ack_at, t, now + MAX_JIFFY_OFFSET);
+ ackr_serial = xchg(&call->ackr_serial, 0);
+ rxrpc_send_ACK(call, RXRPC_ACK_DELAY, ackr_serial,
+ rxrpc_propose_ack_ping_for_lost_ack);
+ }
+
+ t = READ_ONCE(call->ack_lost_at);
+ if (time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_lost_ack, now);
+ cmpxchg(&call->ack_lost_at, t, now + MAX_JIFFY_OFFSET);
+ set_bit(RXRPC_CALL_EV_ACK_LOST, &call->events);
+ }
+
+ t = READ_ONCE(call->keepalive_at);
+ if (time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
+ cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
+ rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+ rxrpc_propose_ack_ping_for_keepalive);
+ }
+
+ t = READ_ONCE(call->ping_at);
+ if (time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
+ cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
+ rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
+ rxrpc_propose_ack_ping_for_keepalive);
+ }
+
+ t = READ_ONCE(call->resend_at);
+ if (time_after_eq(now, t)) {
+ trace_rxrpc_timer(call, rxrpc_timer_exp_resend, now);
+ cmpxchg(&call->resend_at, t, now + MAX_JIFFY_OFFSET);
+ resend = true;
+ }
+ } else {
rxrpc_receive(call, skb);
+ }
if (rxrpc_can_decant(call))
rxrpc_decant_prepared_tx(call);
@@ -390,68 +439,8 @@ void rxrpc_process_call(struct work_struct *work)
if (!test_and_set_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
rxrpc_send_initial_ping(call);
- /* Work out if any timeouts tripped */
- now = jiffies;
- t = READ_ONCE(call->expect_rx_by);
- if (time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_normal, now);
- set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
- }
-
- t = READ_ONCE(call->expect_req_by);
- if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST &&
- time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now);
- set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
- }
-
- t = READ_ONCE(call->expect_term_by);
- if (time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_hard, now);
- set_bit(RXRPC_CALL_EV_EXPIRED, &call->events);
- }
-
- t = READ_ONCE(call->delay_ack_at);
- if (time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
- cmpxchg(&call->delay_ack_at, t, now + MAX_JIFFY_OFFSET);
- ackr_serial = xchg(&call->ackr_serial, 0);
- rxrpc_send_ACK(call, RXRPC_ACK_DELAY, ackr_serial,
- rxrpc_propose_ack_ping_for_lost_ack);
- }
-
- t = READ_ONCE(call->ack_lost_at);
- if (time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_lost_ack, now);
- cmpxchg(&call->ack_lost_at, t, now + MAX_JIFFY_OFFSET);
- set_bit(RXRPC_CALL_EV_ACK_LOST, &call->events);
- }
-
- t = READ_ONCE(call->keepalive_at);
- if (time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
- cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
- rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
- rxrpc_propose_ack_ping_for_keepalive);
- }
-
- t = READ_ONCE(call->ping_at);
- if (time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
- cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
- rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
- rxrpc_propose_ack_ping_for_keepalive);
- }
-
- t = READ_ONCE(call->resend_at);
- if (time_after_eq(now, t)) {
- trace_rxrpc_timer(call, rxrpc_timer_exp_resend, now);
- cmpxchg(&call->resend_at, t, now + MAX_JIFFY_OFFSET);
- set_bit(RXRPC_CALL_EV_RESEND, &call->events);
- }
-
/* Process events */
- if (test_and_clear_bit(RXRPC_CALL_EV_EXPIRED, &call->events)) {
+ if (expired) {
if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
(int)call->conn->hi_serial - (int)call->rx_serial > 0) {
trace_rxrpc_call_reset(call);
@@ -459,19 +448,16 @@ void rxrpc_process_call(struct work_struct *work)
} else {
rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME);
}
- set_bit(RXRPC_CALL_EV_ABORT, &call->events);
- goto recheck_state;
+ rxrpc_send_abort_packet(call);
+ goto out;
}
if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_lost_ack);
- if (test_and_clear_bit(RXRPC_CALL_EV_RESEND, &call->events) &&
- call->state != RXRPC_CALL_CLIENT_RECV_REPLY) {
- rxrpc_resend(call, skb);
- goto recheck_state;
- }
+ if (resend && call->state != RXRPC_CALL_CLIENT_RECV_REPLY)
+ rxrpc_resend(call, NULL);
if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
@@ -482,35 +468,28 @@ void rxrpc_process_call(struct work_struct *work)
rxrpc_propose_ack_input_data);
/* Make sure the timer is restarted */
- next = call->expect_rx_by;
+ if (call->state != RXRPC_CALL_COMPLETE) {
+ next = call->expect_rx_by;
#define set(T) { t = READ_ONCE(T); if (time_before(t, next)) next = t; }
- set(call->expect_req_by);
- set(call->expect_term_by);
- set(call->delay_ack_at);
- set(call->ack_lost_at);
- set(call->resend_at);
- set(call->keepalive_at);
- set(call->ping_at);
+ set(call->expect_req_by);
+ set(call->expect_term_by);
+ set(call->delay_ack_at);
+ set(call->ack_lost_at);
+ set(call->resend_at);
+ set(call->keepalive_at);
+ set(call->ping_at);
- now = jiffies;
- if (time_after_eq(now, next))
- goto recheck_state;
+ now = jiffies;
+ if (time_after_eq(now, next))
+ rxrpc_poke_call(call, rxrpc_call_poke_timer_now);
- rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
+ rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
+ }
- /* other events may have been raised since we started checking */
- if (call->events && call->state < RXRPC_CALL_COMPLETE)
- goto requeue;
-
-out_put:
- rxrpc_put_call(call, rxrpc_call_put);
out:
+ if (call->state == RXRPC_CALL_COMPLETE)
+ rxrpc_delete_call_timer(call);
_leave("");
- return;
-
-requeue:
- __rxrpc_queue_call(call);
- goto out;
}
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 8b8937f..ca7601b 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -45,6 +45,33 @@ static struct semaphore rxrpc_call_limiter =
static struct semaphore rxrpc_kernel_call_limiter =
__SEMAPHORE_INITIALIZER(rxrpc_kernel_call_limiter, 1000);
+static void rxrpc_destroy_call(struct work_struct *work);
+
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
+{
+ struct rxrpc_local *local;
+ struct rxrpc_peer *peer = call->peer;
+ unsigned long flags;
+ bool busy;
+
+ if (WARN_ON_ONCE(!peer))
+ return;
+ local = peer->local;
+
+ if (call->state < RXRPC_CALL_COMPLETE) {
+ spin_lock_irqsave(&local->rx_queue.lock, flags);
+ busy = (call->poke->next != NULL);
+ trace_rxrpc_poke_call(call, busy, what);
+ if (!busy) {
+ rxrpc_get_call(call, rxrpc_call_got_poke);
+ rxrpc_get_skb(call->poke, rxrpc_skb_got);
+ __skb_queue_tail(&local->rx_queue, call->poke);
+ }
+ spin_unlock_irqrestore(&local->rx_queue.lock, flags);
+ rxrpc_wake_up_io_thread(local);
+ }
+}
+
static void rxrpc_call_timer_expired(struct timer_list *t)
{
struct rxrpc_call *call = from_timer(call, t, timer);
@@ -53,9 +80,7 @@ static void rxrpc_call_timer_expired(struct timer_list *t)
if (call->state < RXRPC_CALL_COMPLETE) {
trace_rxrpc_timer_expired(call, jiffies);
- __rxrpc_queue_call(call);
- } else {
- rxrpc_put_call(call, rxrpc_call_put);
+ rxrpc_poke_call(call, rxrpc_call_poke_timer);
}
}
@@ -64,17 +89,13 @@ void rxrpc_reduce_call_timer(struct rxrpc_call *call,
unsigned long now,
enum rxrpc_timer_trace why)
{
- if (rxrpc_try_get_call(call, rxrpc_call_got_timer)) {
- trace_rxrpc_timer(call, why, now);
- if (timer_reduce(&call->timer, expire_at))
- rxrpc_put_call(call, rxrpc_call_put_notimer);
- }
+ trace_rxrpc_timer(call, why, now);
+ timer_reduce(&call->timer, expire_at);
}
void rxrpc_delete_call_timer(struct rxrpc_call *call)
{
- if (del_timer_sync(&call->timer))
- rxrpc_put_call(call, rxrpc_call_put_timer);
+ del_timer_sync(&call->timer);
}
static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
@@ -122,6 +143,7 @@ struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
unsigned int debug_id)
{
+ struct rxrpc_skb_priv *sp;
struct rxrpc_call *call;
struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));
@@ -129,6 +151,16 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
if (!call)
return NULL;
+ call->poke = alloc_skb(0, gfp);
+ if (!call->poke) {
+ kmem_cache_free(rxrpc_call_jar, call);
+ return NULL;
+ }
+
+ rxrpc_new_skb(call->poke, rxrpc_skb_new);
+ sp = rxrpc_skb(call->poke);
+ sp->call = call;
+
mutex_init(&call->user_mutex);
/* Prevent lockdep reporting a deadlock false positive between the afs
@@ -139,7 +171,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
&rxrpc_call_user_mutex_lock_class_key);
timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
- INIT_WORK(&call->processor, &rxrpc_process_call);
+ INIT_WORK(&call->destructor, rxrpc_destroy_call);
INIT_LIST_HEAD(&call->link);
INIT_LIST_HEAD(&call->chan_wait_link);
INIT_LIST_HEAD(&call->accept_link);
@@ -147,7 +179,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
INIT_LIST_HEAD(&call->sock_link);
INIT_LIST_HEAD(&call->tx_sendmsg);
INIT_LIST_HEAD(&call->tx_buffer);
- skb_queue_head_init(&call->input_queue);
skb_queue_head_init(&call->rx_queue);
skb_queue_head_init(&call->rx_oos_queue);
init_waitqueue_head(&call->waitq);
@@ -160,6 +191,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
call->next_rx_timo = 20 * HZ;
call->next_req_timo = 1 * HZ;
atomic64_set(&call->ackr_window, 0x100000001ULL);
+ __set_bit(RXRPC_CALL_START_TIMERS, &call->flags);
memset(&call->sock_node, 0xed, sizeof(call->sock_node));
@@ -219,6 +251,7 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
call->ack_lost_at = j;
call->resend_at = j;
call->ping_at = j;
+ call->keepalive_at = j;
call->expect_rx_by = j;
call->expect_req_by = j;
call->expect_term_by = j;
@@ -347,7 +380,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
trace_rxrpc_call(call->debug_id, rxrpc_call_connected,
refcount_read(&call->ref), here, NULL);
-
rxrpc_start_call_timer(call);
_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
@@ -455,40 +487,6 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
}
/*
- * Queue a call's work processor, getting a ref to pass to the work queue.
- */
-bool rxrpc_queue_call(struct rxrpc_call *call)
-{
- const void *here = __builtin_return_address(0);
- int n;
-
- if (!__refcount_inc_not_zero(&call->ref, &n))
- return false;
- if (rxrpc_queue_work(&call->processor))
- trace_rxrpc_call(call->debug_id, rxrpc_call_queued, n + 1,
- here, NULL);
- else
- rxrpc_put_call(call, rxrpc_call_put_noqueue);
- return true;
-}
-
-/*
- * Queue a call's work processor, passing the callers ref to the work queue.
- */
-bool __rxrpc_queue_call(struct rxrpc_call *call)
-{
- const void *here = __builtin_return_address(0);
- int n = refcount_read(&call->ref);
- ASSERTCMP(n, >=, 1);
- if (rxrpc_queue_work(&call->processor))
- trace_rxrpc_call(call->debug_id, rxrpc_call_queued_ref, n,
- here, NULL);
- else
- rxrpc_put_call(call, rxrpc_call_put_noqueue);
- return true;
-}
-
-/*
* Note the re-emergence of a call.
*/
void rxrpc_see_call(struct rxrpc_call *call)
@@ -659,11 +657,12 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
*/
static void rxrpc_destroy_call(struct work_struct *work)
{
- struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor);
+ struct rxrpc_call *call = container_of(work, struct rxrpc_call, destructor);
struct rxrpc_net *rxnet = call->rxnet;
rxrpc_delete_call_timer(call);
+ rxrpc_free_skb(call->poke, rxrpc_skb_cleaned);
rxrpc_put_connection(call->conn);
rxrpc_put_peer(call->peer);
kmem_cache_free(rxrpc_call_jar, call);
@@ -679,11 +678,10 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
if (in_softirq()) {
- INIT_WORK(&call->processor, rxrpc_destroy_call);
- if (!rxrpc_queue_work(&call->processor))
+ if (!rxrpc_queue_work(&call->destructor))
BUG();
} else {
- rxrpc_destroy_call(&call->processor);
+ rxrpc_destroy_call(&call->destructor);
}
}
@@ -713,8 +711,6 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
}
rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
- rxrpc_purge_queue(&call->input_queue);
-
call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index f606ccd..111108a 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -497,3 +497,77 @@ void rxrpc_process_connection(struct work_struct *work)
_leave("");
return;
}
+
+/*
+ * post connection-level events to the connection
+ * - this includes challenges, responses, some aborts and call terminal packet
+ * retransmission.
+ */
+static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
+ struct sk_buff *skb)
+{
+ _enter("%p,%p", conn, skb);
+
+ skb_queue_tail(&conn->rx_queue, skb);
+ rxrpc_queue_conn(conn);
+}
+
+/*
+ * Input a connection-level packet.
+ */
+int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
+{
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ __be32 wtmp;
+ u32 abort_code;
+
+ if (conn->state >= RXRPC_CONN_REMOTELY_ABORTED) {
+ _leave(" = -ECONNABORTED [%u]", conn->state);
+ return -ECONNABORTED;
+ }
+
+ _enter("{%d},{%u,%%%u},", conn->debug_id, sp->hdr.type, sp->hdr.serial);
+
+ switch (sp->hdr.type) {
+ case RXRPC_PACKET_TYPE_DATA:
+ case RXRPC_PACKET_TYPE_ACK:
+ rxrpc_conn_retransmit_call(conn, skb,
+ sp->hdr.cid & RXRPC_CHANNELMASK);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+ return 0;
+
+ case RXRPC_PACKET_TYPE_BUSY:
+ /* Just ignore BUSY packets for now. */
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+ return 0;
+
+ case RXRPC_PACKET_TYPE_ABORT:
+ if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
+ &wtmp, sizeof(wtmp)) < 0) {
+ trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
+ tracepoint_string("bad_abort"));
+ return -EPROTO;
+ }
+ abort_code = ntohl(wtmp);
+ _proto("Rx ABORT %%%u { ac=%d }", sp->hdr.serial, abort_code);
+
+ conn->error = -ECONNABORTED;
+ conn->abort_code = abort_code;
+ conn->state = RXRPC_CONN_REMOTELY_ABORTED;
+ set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
+ rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED, sp->hdr.serial);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+ return -ECONNABORTED;
+
+ case RXRPC_PACKET_TYPE_CHALLENGE:
+ case RXRPC_PACKET_TYPE_RESPONSE:
+ rxrpc_post_packet_to_conn(conn, skb);
+ return 0;
+
+ default:
+ trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
+ tracepoint_string("bad_conn_pkt"));
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+ return -EPROTO;
+ }
+}
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 5a0a7b2..05894a9 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC packet reception
*
- * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
* Written by David Howells (dhowells@redhat.com)
*/
@@ -10,103 +10,58 @@
#include "ar-internal.h"
/*
- * Post a call-level packet to a call.
- */
-static void rxrpc_post_packet_to_call(struct rxrpc_call *call, struct sk_buff *skb)
-{
- if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
- set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
- if (!test_bit(RXRPC_CALL_IS_DEAD, &call->flags)) {
- skb_queue_tail(&call->input_queue, skb);
- rxrpc_queue_call(call);
- } else {
- rxrpc_free_skb(skb, rxrpc_skb_freed);
- }
-}
-
-/*
- * Handle a new service call on a channel implicitly completing the preceding
- * call on that channel. This does not apply to client conns.
+ * handle data received on the local endpoint
+ * - may be called in interrupt context
*
- * TODO: If callNumber > call_id + 1, renegotiate security.
+ * [!] Note that as this is called from the encap_rcv hook, the socket is not
+ * held locked by the caller and nothing prevents sk_user_data on the UDP
+ * socket from being cleared in the middle of processing this function.
+ *
+ * Called with the RCU read lock held from the IP layer via UDP.
*/
-static struct sk_buff *rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
- struct rxrpc_channel *chan,
- struct rxrpc_call *call,
- struct sk_buff *skb)
+int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
{
- struct sk_buff *copy;
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
- /* We post a copy of the message to the call we're terminating to make
- * sure it gets terminated. If we fail to copy the message, we steal
- * the original and hope the server sends us a new copy.
- */
- if (call->state < RXRPC_CALL_COMPLETE &&
- cmpxchg(&chan->call, call, NULL) == call) {
- set_bit(RXRPC_CALL_IS_DEAD, &call->flags);
- copy = skb_clone(skb, GFP_ATOMIC | __GFP_NOWARN);
- if (copy) {
- rxrpc_new_skb(skb, rxrpc_skb_new);
- } else {
- copy = skb;
- skb = NULL;
- }
- skb_queue_tail(&call->input_queue, copy);
- rxrpc_queue_call(call);
+ if (unlikely(!local)) {
+ kfree_skb(skb);
+ return 0;
}
+ if (skb->tstamp == 0)
+ skb->tstamp = ktime_get_real();
- return skb;
+ rxrpc_new_skb(skb, rxrpc_skb_received);
+ memset(sp, 0, sizeof(*sp));
+ skb_queue_tail(&local->rx_queue, skb);
+ rxrpc_wake_up_io_thread(local);
+ return 0;
}
/*
- * post connection-level events to the connection
- * - this includes challenges, responses, some aborts and call terminal packet
- * retransmission.
+ * Process event packets targeted at a local endpoint.
*/
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
- struct sk_buff *skb)
+static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
{
- _enter("%p,%p", conn, skb);
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ char v;
- skb_queue_tail(&conn->rx_queue, skb);
- rxrpc_queue_conn(conn);
-}
+ _enter("");
-/*
- * post endpoint-level events to the local endpoint
- * - this includes debug and version messages
- */
-static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
- struct sk_buff *skb)
-{
- _enter("%p,%p", local, skb);
-
- if (rxrpc_get_local_maybe(local)) {
- skb_queue_tail(&local->event_queue, skb);
- rxrpc_queue_local(local);
- } else {
- rxrpc_free_skb(skb, rxrpc_skb_freed);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
+ if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
+ _proto("Rx VERSION { %02x }", v);
+ if (v == 0)
+ rxrpc_send_version_request(local, &sp->hdr, skb);
}
-}
-/*
- * put a packet up for transport-level abort
- */
-static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
-{
- if (rxrpc_get_local_maybe(local)) {
- skb_queue_tail(&local->reject_queue, skb);
- rxrpc_queue_local(local);
- } else {
- rxrpc_free_skb(skb, rxrpc_skb_freed);
- }
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
}
/*
* Extract the wire header from a packet and translate the byte order.
*/
-static noinline
-int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
+static int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
{
struct rxrpc_wire_header whdr;
@@ -117,7 +72,6 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
return -EBADMSG;
}
- memset(sp, 0, sizeof(*sp));
sp->hdr.epoch = ntohl(whdr.epoch);
sp->hdr.cid = ntohl(whdr.cid);
sp->hdr.callNumber = ntohl(whdr.callNumber);
@@ -133,18 +87,11 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
}
/*
- * handle data received on the local endpoint
- * - may be called in interrupt context
- *
- * [!] Note that as this is called from the encap_rcv hook, the socket is not
- * held locked by the caller and nothing prevents sk_user_data on the UDP from
- * being cleared in the middle of processing this function.
- *
- * Called with the RCU read lock held from the IP layer via UDP.
+ * Process a socket buffer, distributing it to the appropriate connection or
+ * call.
*/
-int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
+static void rxrpc_input_one_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
- struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
struct rxrpc_connection *conn;
struct rxrpc_channel *chan;
struct rxrpc_call *call = NULL;
@@ -153,23 +100,17 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
struct rxrpc_sock *rx = NULL;
unsigned int channel;
- _enter("%p", udp_sk);
-
- if (unlikely(!local)) {
- kfree_skb(skb);
- return 0;
- }
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
-
- rxrpc_new_skb(skb, rxrpc_skb_received);
+ _enter("");
skb_pull(skb, sizeof(struct udphdr));
- /* The UDP protocol already released all skb resources;
- * we are free to add our own data there.
- */
sp = rxrpc_skb(skb);
+ if (sp->call) {
+ trace_rxrpc_call_poked(sp->call);
+ rxrpc_input_call_packet(sp->call, skb);
+ rxrpc_put_call(sp->call, rxrpc_call_put_poke);
+ goto discard;
+ }
/* dig out the RxRPC connection details */
if (rxrpc_extract_header(sp, skb) < 0)
@@ -180,19 +121,17 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
if ((lose++ & 7) == 7) {
trace_rxrpc_rx_lose(sp);
rxrpc_free_skb(skb, rxrpc_skb_lost);
- return 0;
+ return;
}
}
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
trace_rxrpc_rx_packet(sp);
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_VERSION:
if (rxrpc_to_client(sp))
goto discard;
- rxrpc_post_packet_to_local(local, skb);
+ rxrpc_input_version(local, skb);
goto out;
case RXRPC_PACKET_TYPE_BUSY:
@@ -272,7 +211,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
if (sp->hdr.callNumber == 0) {
/* Connection-level packet */
_debug("CONN %p {%d}", conn, conn->debug_id);
- rxrpc_post_packet_to_conn(conn, skb);
+ rxrpc_input_conn_packet(conn, skb);
goto out;
}
@@ -307,7 +246,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
sp->hdr.seq,
sp->hdr.serial,
sp->hdr.flags);
- rxrpc_post_packet_to_conn(conn, skb);
+ rxrpc_input_conn_packet(conn, skb);
goto out;
}
@@ -317,11 +256,10 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
if (rxrpc_to_client(sp))
goto reject_packet;
if (call) {
- skb = rxrpc_input_implicit_end_call(rx, chan, call, skb);
- if (!skb)
- goto out;
+ rxrpc_implicit_end_call(call, skb);
+ chan->call = NULL;
+ call = NULL;
}
- call = NULL;
}
}
@@ -339,14 +277,16 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
/* Process a call packet; this either discards or passes on the ref
* elsewhere.
*/
- rxrpc_post_packet_to_call(call, skb);
+ if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
+ set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
+ rxrpc_input_call_packet(call, skb);
goto out;
discard:
rxrpc_free_skb(skb, rxrpc_skb_freed);
out:
trace_rxrpc_rx_done(0, 0);
- return 0;
+ return;
wrong_security:
trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
@@ -376,5 +316,44 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
trace_rxrpc_rx_done(skb->mark, skb->priority);
rxrpc_reject_packet(local, skb);
_leave(" [badmsg]");
+ return;
+}
+
+/*
+ * I/O and event handling thread.
+ */
+int rxrpc_io_thread(void *data)
+{
+ struct sk_buff_head rx_queue;
+ struct rxrpc_local *local = data;
+ struct sk_buff *skb;
+
+ skb_queue_head_init(&rx_queue);
+
+ set_user_nice(current, MIN_NICE);
+
+ for (;;) {
+ if (!skb_queue_empty(&local->rx_queue)) {
+ spin_lock_irq(&local->rx_queue.lock);
+ skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
+ spin_unlock_irq(&local->rx_queue.lock);
+ }
+
+ while ((skb = __skb_dequeue(&rx_queue)))
+ rxrpc_input_one_packet(local, skb);
+
+ set_current_state(TASK_INTERRUPTIBLE);
+ if (kthread_should_stop())
+ break;
+ if (!skb_queue_empty(&local->rx_queue)) {
+ __set_current_state(TASK_RUNNING);
+ continue;
+ }
+ schedule();
+ }
+
+ __set_current_state(TASK_RUNNING);
+ rxrpc_destroy_local(local);
+ local->io_thread = NULL;
return 0;
}
diff --git a/net/rxrpc/local_event.c b/net/rxrpc/local_event.c
index 19e929c..47474a1 100644
--- a/net/rxrpc/local_event.c
+++ b/net/rxrpc/local_event.c
@@ -21,9 +21,9 @@ static const char rxrpc_version_string[65] = "linux-" UTS_RELEASE " AF_RXRPC";
/*
* Reply to a version request
*/
-static void rxrpc_send_version_request(struct rxrpc_local *local,
- struct rxrpc_host_header *hdr,
- struct sk_buff *skb)
+void rxrpc_send_version_request(struct rxrpc_local *local,
+ struct rxrpc_host_header *hdr,
+ struct sk_buff *skb)
{
struct rxrpc_wire_header whdr;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
@@ -75,41 +75,3 @@ static void rxrpc_send_version_request(struct rxrpc_local *local,
_leave("");
}
-
-/*
- * Process event packets targeted at a local endpoint.
- */
-void rxrpc_process_local_events(struct rxrpc_local *local)
-{
- struct sk_buff *skb;
- char v;
-
- _enter("");
-
- skb = skb_dequeue(&local->event_queue);
- if (skb) {
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-
- rxrpc_see_skb(skb, rxrpc_skb_seen);
- _debug("{%d},{%u}", local->debug_id, sp->hdr.type);
-
- switch (sp->hdr.type) {
- case RXRPC_PACKET_TYPE_VERSION:
- if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
- &v, 1) < 0)
- return;
- _proto("Rx VERSION { %02x }", v);
- if (v == 0)
- rxrpc_send_version_request(local, &sp->hdr, skb);
- break;
-
- default:
- /* Just ignore anything we don't understand */
- break;
- }
-
- rxrpc_free_skb(skb, rxrpc_skb_freed);
- }
-
- _leave("");
-}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 38ea98f..3098fe9 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -20,7 +20,6 @@
#include <net/af_rxrpc.h>
#include "ar-internal.h"
-static void rxrpc_local_processor(struct work_struct *);
static void rxrpc_local_rcu(struct rcu_head *);
/*
@@ -83,10 +82,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
atomic_set(&local->active_users, 1);
local->rxnet = rxnet;
INIT_HLIST_NODE(&local->link);
- INIT_WORK(&local->processor, rxrpc_local_processor);
init_rwsem(&local->defrag_sem);
- skb_queue_head_init(&local->reject_queue);
- skb_queue_head_init(&local->event_queue);
+ skb_queue_head_init(&local->rx_queue);
local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock);
spin_lock_init(&local->lock);
@@ -110,6 +107,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
struct udp_tunnel_sock_cfg tuncfg = {NULL};
struct sockaddr_rxrpc *srx = &local->srx;
struct udp_port_cfg udp_conf = {0};
+ struct task_struct *io_thread;
struct sock *usk;
int ret;
@@ -169,8 +167,23 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
BUG();
}
+ io_thread = kthread_run(rxrpc_io_thread, local,
+ "krxrpcio/%u", ntohs(udp_conf.local_udp_port));
+ if (IS_ERR(io_thread)) {
+ ret = PTR_ERR(io_thread);
+ goto error_sock;
+ }
+
+ local->io_thread = io_thread;
_leave(" = 0");
return 0;
+
+error_sock:
+ kernel_sock_shutdown(local->socket, SHUT_RDWR);
+ local->socket->sk->sk_user_data = NULL;
+ sock_release(local->socket);
+ local->socket = NULL;
+ return ret;
}
/*
@@ -292,21 +305,6 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
}
/*
- * Queue a local endpoint and pass the caller's reference to the work item.
- */
-void rxrpc_queue_local(struct rxrpc_local *local)
-{
- const void *here = __builtin_return_address(0);
- unsigned int debug_id = local->debug_id;
- int r = refcount_read(&local->ref);
-
- if (rxrpc_queue_work(&local->processor))
- trace_rxrpc_local(debug_id, rxrpc_local_queued, r + 1, here);
- else
- rxrpc_put_local(local);
-}
-
-/*
* Drop a ref on a local endpoint.
*/
void rxrpc_put_local(struct rxrpc_local *local)
@@ -346,16 +344,12 @@ struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local)
/*
* Cease using a local endpoint. Once the number of active users reaches 0, we
- * start the closure of the transport in the work processor.
+ * start the closure of the transport in the I/O thread.
*/
void rxrpc_unuse_local(struct rxrpc_local *local)
{
- if (local) {
- if (__rxrpc_unuse_local(local)) {
- rxrpc_get_local(local);
- rxrpc_queue_local(local);
- }
- }
+ if (local && __rxrpc_unuse_local(local))
+ kthread_stop(local->io_thread);
}
/*
@@ -365,7 +359,7 @@ void rxrpc_unuse_local(struct rxrpc_local *local)
* Closing the socket cannot be done from bottom half context or RCU callback
* context because it might sleep.
*/
-static void rxrpc_local_destroyer(struct rxrpc_local *local)
+void rxrpc_destroy_local(struct rxrpc_local *local)
{
struct socket *socket = local->socket;
struct rxrpc_net *rxnet = local->rxnet;
@@ -388,51 +382,6 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
socket->sk->sk_user_data = NULL;
sock_release(socket);
}
-
- /* At this point, there should be no more packets coming in to the
- * local endpoint.
- */
- rxrpc_purge_queue(&local->reject_queue);
- rxrpc_purge_queue(&local->event_queue);
-}
-
-/*
- * Process events on an endpoint. The work item carries a ref which
- * we must release.
- */
-static void rxrpc_local_processor(struct work_struct *work)
-{
- struct rxrpc_local *local =
- container_of(work, struct rxrpc_local, processor);
- bool again;
-
- if (local->dead)
- return;
-
- trace_rxrpc_local(local->debug_id, rxrpc_local_processing,
- refcount_read(&local->ref), NULL);
-
- do {
- again = false;
- if (!__rxrpc_use_local(local)) {
- rxrpc_local_destroyer(local);
- break;
- }
-
- if (!skb_queue_empty(&local->reject_queue)) {
- rxrpc_reject_packets(local);
- again = true;
- }
-
- if (!skb_queue_empty(&local->event_queue)) {
- rxrpc_process_local_events(local);
- again = true;
- }
-
- __rxrpc_unuse_local(local);
- } while (again);
-
- rxrpc_put_local(local);
}
/*
@@ -444,8 +393,6 @@ static void rxrpc_local_rcu(struct rcu_head *rcu)
_enter("%d", local->debug_id);
- ASSERT(!work_pending(&local->processor));
-
_net("DESTROY LOCAL %d", local->debug_id);
kfree(local);
_leave("");
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 33eb5f9..f823a0e 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -543,14 +543,13 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
}
/*
- * reject packets through the local endpoint
+ * Reject a packet through the local endpoint.
*/
-void rxrpc_reject_packets(struct rxrpc_local *local)
+void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
- struct sockaddr_rxrpc srx;
- struct rxrpc_skb_priv *sp;
struct rxrpc_wire_header whdr;
- struct sk_buff *skb;
+ struct sockaddr_rxrpc srx;
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct msghdr msg;
struct kvec iov[2];
size_t size;
@@ -558,6 +557,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
int ret, ioc;
_enter("%d", local->debug_id);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
iov[0].iov_base = &whdr;
iov[0].iov_len = sizeof(whdr);
@@ -571,51 +571,45 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
memset(&whdr, 0, sizeof(whdr));
- while ((skb = skb_dequeue(&local->reject_queue))) {
- rxrpc_see_skb(skb, rxrpc_skb_seen);
- sp = rxrpc_skb(skb);
-
- switch (skb->mark) {
- case RXRPC_SKB_MARK_REJECT_BUSY:
- whdr.type = RXRPC_PACKET_TYPE_BUSY;
- size = sizeof(whdr);
- ioc = 1;
- break;
- case RXRPC_SKB_MARK_REJECT_ABORT:
- whdr.type = RXRPC_PACKET_TYPE_ABORT;
- code = htonl(skb->priority);
- size = sizeof(whdr) + sizeof(code);
- ioc = 2;
- break;
- default:
- rxrpc_free_skb(skb, rxrpc_skb_freed);
- continue;
- }
-
- if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
- msg.msg_namelen = srx.transport_len;
-
- whdr.epoch = htonl(sp->hdr.epoch);
- whdr.cid = htonl(sp->hdr.cid);
- whdr.callNumber = htonl(sp->hdr.callNumber);
- whdr.serviceId = htons(sp->hdr.serviceId);
- whdr.flags = sp->hdr.flags;
- whdr.flags ^= RXRPC_CLIENT_INITIATED;
- whdr.flags &= RXRPC_CLIENT_INITIATED;
-
- iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
- ret = do_udp_sendmsg(local->socket, &msg, size);
- if (ret < 0)
- trace_rxrpc_tx_fail(local->debug_id, 0, ret,
- rxrpc_tx_point_reject);
- else
- trace_rxrpc_tx_packet(local->debug_id, &whdr,
- rxrpc_tx_point_reject);
- }
-
- rxrpc_free_skb(skb, rxrpc_skb_freed);
+ switch (skb->mark) {
+ case RXRPC_SKB_MARK_REJECT_BUSY:
+ whdr.type = RXRPC_PACKET_TYPE_BUSY;
+ size = sizeof(whdr);
+ ioc = 1;
+ break;
+ case RXRPC_SKB_MARK_REJECT_ABORT:
+ whdr.type = RXRPC_PACKET_TYPE_ABORT;
+ code = htonl(skb->priority);
+ size = sizeof(whdr) + sizeof(code);
+ ioc = 2;
+ break;
+ default:
+ goto out;
}
+ if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
+ msg.msg_namelen = srx.transport_len;
+
+ whdr.epoch = htonl(sp->hdr.epoch);
+ whdr.cid = htonl(sp->hdr.cid);
+ whdr.callNumber = htonl(sp->hdr.callNumber);
+ whdr.serviceId = htons(sp->hdr.serviceId);
+ whdr.flags = sp->hdr.flags;
+ whdr.flags ^= RXRPC_CLIENT_INITIATED;
+ whdr.flags &= RXRPC_CLIENT_INITIATED;
+
+ iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
+ ret = do_udp_sendmsg(local->socket, &msg, size);
+ if (ret < 0)
+ trace_rxrpc_tx_fail(local->debug_id, 0, ret,
+ rxrpc_tx_point_reject);
+ else
+ trace_rxrpc_tx_packet(local->debug_id, &whdr,
+ rxrpc_tx_point_reject);
+ }
+
+out:
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
_leave("");
}
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index 087e7d4..a6ee82f 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -509,7 +509,7 @@ static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error_report)
rxrpc_see_call(call);
if (call->state < RXRPC_CALL_COMPLETE &&
cmpxchg(&call->error_report, 0, error_report) == 0)
- rxrpc_queue_call(call);
+ rxrpc_poke_call(call, rxrpc_call_poke_error);
}
}
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index 4f0fa73..f7f4261 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -341,8 +341,7 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
if (v == SEQ_START_TOKEN) {
seq_printf(seq,
"Proto Local "
- " Use Act (txb=%d)\n",
- atomic_read(&rxrpc_nr_txbuf));
+ " Use Act RxQ\n");
return 0;
}
@@ -351,10 +350,11 @@ static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
sprintf(lbuff, "%pISpc", &local->srx.transport);
seq_printf(seq,
- "UDP %-47.47s %3u %3u\n",
+ "UDP %-47.47s %3u %3u %3u\n",
lbuff,
refcount_read(&local->ref),
- atomic_read(&local->active_users));
+ atomic_read(&local->active_users),
+ local->rx_queue.qlen);
return 0;
}
diff --git a/net/rxrpc/receive.c b/net/rxrpc/receive.c
index edfb2f6..013f42d 100644
--- a/net/rxrpc/receive.c
+++ b/net/rxrpc/receive.c
@@ -1067,27 +1067,35 @@ static void rxrpc_receive_call_packet(struct rxrpc_call *call, struct sk_buff *s
*
* TODO: If callNumber > call_id + 1, renegotiate security.
*/
-static void rxrpc_receive_implicit_end_call(struct rxrpc_sock *rx,
- struct rxrpc_call *call)
+void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_connection *conn = call->conn;
+ struct rxrpc_sock *rx = rcu_access_pointer(call->socket);
- switch (READ_ONCE(call->state)) {
- case RXRPC_CALL_SERVER_AWAIT_ACK:
- rxrpc_call_completed(call);
- fallthrough;
- case RXRPC_CALL_COMPLETE:
- break;
- default:
- if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN))
- rxrpc_send_abort_packet(call);
- trace_rxrpc_improper_term(call);
- break;
+ /* We present the message to the call we're terminating to make sure it
+ * gets terminated.
+ */
+ if (call->state < RXRPC_CALL_COMPLETE) {
+ set_bit(RXRPC_CALL_IS_DEAD, &call->flags);
+ rxrpc_input_call_packet(call, skb);
+
+ switch (READ_ONCE(call->state)) {
+ case RXRPC_CALL_SERVER_AWAIT_ACK:
+ rxrpc_call_completed(call);
+ fallthrough;
+ case RXRPC_CALL_COMPLETE:
+ break;
+ default:
+ if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN))
+ rxrpc_send_abort_packet(call);
+ trace_rxrpc_improper_term(call);
+ break;
+ }
+
+ spin_lock_bh(&rx->incoming_lock);
+ __rxrpc_disconnect_call(conn, call);
+ spin_unlock_bh(&rx->incoming_lock);
}
-
- spin_lock_bh(&rx->incoming_lock);
- __rxrpc_disconnect_call(conn, call);
- spin_unlock_bh(&rx->incoming_lock);
}
/*
@@ -1096,13 +1104,6 @@ static void rxrpc_receive_implicit_end_call(struct rxrpc_sock *rx,
void rxrpc_receive(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
- struct rxrpc_sock *rx = rcu_access_pointer(call->socket);
-
- if (sp->hdr.callNumber != call->call_id) {
- rxrpc_free_skb(skb, rxrpc_skb_freed);
- rxrpc_receive_implicit_end_call(rx, call);
- return;
- }
if (sp->hdr.serviceId != call->service_id)
call->service_id = sp->hdr.serviceId;
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 8ca444e..0e3f089 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -267,7 +267,7 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
&call->ackr_nr_consumed);
if (acked > 2 &&
!test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
- rxrpc_queue_call(call);
+ rxrpc_poke_call(call, rxrpc_call_poke_idle);
}
/*
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 11cbaf0..5c5fb97 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -170,7 +170,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
{
unsigned long now;
rxrpc_seq_t seq = txb->seq;
- bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
+ bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
rxrpc_inc_stat(call->rxnet, stat_tx_data);
@@ -188,6 +188,7 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
/* Add the packet to the call's output buffer */
spin_lock(&call->tx_lock);
+ poke = list_empty(&call->tx_sendmsg);
list_add_tail(&txb->call_link, &call->tx_sendmsg);
call->tx_prepared = seq;
spin_unlock(&call->tx_lock);
@@ -220,11 +221,8 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
write_unlock(&call->state_lock);
}
-
- /* Stick the packet on the crypto queue or the transmission queue as
- * appropriate.
- */
- rxrpc_queue_call(call);
+ if (poke)
+ rxrpc_poke_call(call, rxrpc_call_poke_start);
}
/*
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index 1062fd4..0c827d5 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -66,7 +66,6 @@ void rxrpc_free_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
{
const void *here = __builtin_return_address(0);
if (skb) {
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
int n;
n = atomic_dec_return(select_skb_count(skb));
trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n, here);
@@ -82,7 +81,6 @@ void rxrpc_purge_queue(struct sk_buff_head *list)
const void *here = __builtin_return_address(0);
struct sk_buff *skb;
while ((skb = skb_dequeue((list))) != NULL) {
- struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
int n = atomic_dec_return(select_skb_count(skb));
trace_rxrpc_skb(skb, rxrpc_skb_purged,
refcount_read(&skb->users), n, here);