Merge branches 'doc.2021.07.20c', 'fixes.2021.08.06a', 'nocb.2021.07.20c', 'nolibc.2021.07.20c', 'tasks.2021.07.20c', 'torture.2021.07.27a' and 'torturescript.2021.07.27a' into HEAD

doc.2021.07.20c: Documentation updates.
fixes.2021.08.06a: Miscellaneous fixes.
nocb.2021.07.20c: Callback-offloading (NOCB CPU) updates.
nolibc.2021.07.20c: Tiny userspace library updates.
tasks.2021.07.20c: Tasks RCU updates.
torture.2021.07.27a: In-kernel torture-test updates.
torturescript.2021.07.27a: Torture-test scripting updates.
diff --git a/include/linux/rculist.h b/include/linux/rculist.h
index f8633d3..d29740b 100644
--- a/include/linux/rculist.h
+++ b/include/linux/rculist.h
@@ -11,15 +11,6 @@
 #include <linux/rcupdate.h>
 
 /*
- * Why is there no list_empty_rcu()?  Because list_empty() serves this
- * purpose.  The list_empty() function fetches the RCU-protected pointer
- * and compares it to the address of the list head, but neither dereferences
- * this pointer itself nor provides this pointer to the caller.  Therefore,
- * it is not necessary to use rcu_dereference(), so that list_empty() can
- * be used anywhere you would want to use a list_empty_rcu().
- */
-
-/*
  * INIT_LIST_HEAD_RCU - Initialize a list_head visible to RCU readers
  * @list: list to be initialized
  *
@@ -318,21 +309,29 @@
 /*
  * Where are list_empty_rcu() and list_first_entry_rcu()?
  *
- * Implementing those functions following their counterparts list_empty() and
- * list_first_entry() is not advisable because they lead to subtle race
- * conditions as the following snippet shows:
+ * They do not exist because they would lead to subtle race conditions:
  *
  * if (!list_empty_rcu(mylist)) {
  *	struct foo *bar = list_first_entry_rcu(mylist, struct foo, list_member);
  *	do_something(bar);
  * }
  *
- * The list may not be empty when list_empty_rcu checks it, but it may be when
- * list_first_entry_rcu rereads the ->next pointer.
+ * The list might be non-empty when list_empty_rcu() checks it, but it
+ * might have become empty by the time that list_first_entry_rcu() rereads
+ * the ->next pointer, which would result in a SEGV.
  *
- * Rereading the ->next pointer is not a problem for list_empty() and
- * list_first_entry() because they would be protected by a lock that blocks
- * writers.
+ * When not using RCU, it is OK for list_first_entry() to re-read that
+ * pointer because both functions should be protected by some lock that
+ * blocks writers.
+ *
+ * When using RCU, list_empty() uses READ_ONCE() to fetch the
+ * RCU-protected ->next pointer and then compares it to the address of the
+ * list head.  However, it neither dereferences this pointer nor provides
+ * this pointer to its caller.  Thus, READ_ONCE() suffices (that is,
+ * rcu_dereference() is not needed), which means that list_empty() can be
+ * used anywhere you would want to use list_empty_rcu().  Just don't
+ * expect anything useful to happen if you do a subsequent lockless
+ * call to list_first_entry_rcu()!!!
  *
  * See list_first_or_null_rcu for an alternative.
  */
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index d9680b7..434d12f 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -53,7 +53,7 @@
  * nesting depth, but makes sense only if CONFIG_PREEMPT_RCU -- in other
  * types of kernel builds, the rcu_read_lock() nesting depth is unknowable.
  */
-#define rcu_preempt_depth() (current->rcu_read_lock_nesting)
+#define rcu_preempt_depth() READ_ONCE(current->rcu_read_lock_nesting)
 
 #else /* #ifdef CONFIG_PREEMPT_RCU */
 
@@ -167,7 +167,7 @@
 # define synchronize_rcu_tasks synchronize_rcu
 # endif
 
-# ifdef CONFIG_TASKS_RCU_TRACE
+# ifdef CONFIG_TASKS_TRACE_RCU
 # define rcu_tasks_trace_qs(t)						\
 	do {								\
 		if (!likely(READ_ONCE((t)->trc_reader_checked)) &&	\
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 953e70f..9be0153 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -14,9 +14,6 @@
 
 #include <asm/param.h> /* for HZ */
 
-/* Never flag non-existent other CPUs! */
-static inline bool rcu_eqs_special_set(int cpu) { return false; }
-
 unsigned long get_state_synchronize_rcu(void);
 unsigned long start_poll_synchronize_rcu(void);
 bool poll_state_synchronize_rcu(unsigned long oldstate);
diff --git a/include/linux/srcutiny.h b/include/linux/srcutiny.h
index 0e0cf4d..6cfaa0a 100644
--- a/include/linux/srcutiny.h
+++ b/include/linux/srcutiny.h
@@ -61,7 +61,7 @@
 	int idx;
 
 	idx = ((READ_ONCE(ssp->srcu_idx) + 1) & 0x2) >> 1;
-	WRITE_ONCE(ssp->srcu_lock_nesting[idx], ssp->srcu_lock_nesting[idx] + 1);
+	WRITE_ONCE(ssp->srcu_lock_nesting[idx], READ_ONCE(ssp->srcu_lock_nesting[idx]) + 1);
 	return idx;
 }
 
@@ -81,11 +81,11 @@
 {
 	int idx;
 
-	idx = ((READ_ONCE(ssp->srcu_idx) + 1) & 0x2) >> 1;
+	idx = ((data_race(READ_ONCE(ssp->srcu_idx)) + 1) & 0x2) >> 1;
 	pr_alert("%s%s Tiny SRCU per-CPU(idx=%d): (%hd,%hd)\n",
 		 tt, tf, idx,
-		 READ_ONCE(ssp->srcu_lock_nesting[!idx]),
-		 READ_ONCE(ssp->srcu_lock_nesting[idx]));
+		 data_race(READ_ONCE(ssp->srcu_lock_nesting[!idx])),
+		 data_race(READ_ONCE(ssp->srcu_lock_nesting[idx])));
 }
 
 #endif
diff --git a/kernel/locking/locktorture.c b/kernel/locking/locktorture.c
index b3adb40..7c5a4a0 100644
--- a/kernel/locking/locktorture.c
+++ b/kernel/locking/locktorture.c
@@ -59,7 +59,7 @@
 static struct task_struct **reader_tasks;
 
 static bool lock_is_write_held;
-static bool lock_is_read_held;
+static atomic_t lock_is_read_held;
 static unsigned long last_lock_release;
 
 struct lock_stress_stats {
@@ -682,7 +682,7 @@
 		if (WARN_ON_ONCE(lock_is_write_held))
 			lwsp->n_lock_fail++;
 		lock_is_write_held = true;
-		if (WARN_ON_ONCE(lock_is_read_held))
+		if (WARN_ON_ONCE(atomic_read(&lock_is_read_held)))
 			lwsp->n_lock_fail++; /* rare, but... */
 
 		lwsp->n_lock_acquired++;
@@ -717,13 +717,13 @@
 			schedule_timeout_uninterruptible(1);
 
 		cxt.cur_ops->readlock(tid);
-		lock_is_read_held = true;
+		atomic_inc(&lock_is_read_held);
 		if (WARN_ON_ONCE(lock_is_write_held))
 			lrsp->n_lock_fail++; /* rare, but... */
 
 		lrsp->n_lock_acquired++;
 		cxt.cur_ops->read_delay(&rand);
-		lock_is_read_held = false;
+		atomic_dec(&lock_is_read_held);
 		cxt.cur_ops->readunlock(tid);
 
 		stutter_wait("lock_torture_reader");
@@ -738,20 +738,22 @@
 static void __torture_print_stats(char *page,
 				  struct lock_stress_stats *statp, bool write)
 {
+	long cur;
 	bool fail = false;
 	int i, n_stress;
-	long max = 0, min = statp ? statp[0].n_lock_acquired : 0;
+	long max = 0, min = statp ? data_race(statp[0].n_lock_acquired) : 0;
 	long long sum = 0;
 
 	n_stress = write ? cxt.nrealwriters_stress : cxt.nrealreaders_stress;
 	for (i = 0; i < n_stress; i++) {
-		if (statp[i].n_lock_fail)
+		if (data_race(statp[i].n_lock_fail))
 			fail = true;
-		sum += statp[i].n_lock_acquired;
-		if (max < statp[i].n_lock_acquired)
-			max = statp[i].n_lock_acquired;
-		if (min > statp[i].n_lock_acquired)
-			min = statp[i].n_lock_acquired;
+		cur = data_race(statp[i].n_lock_acquired);
+		sum += cur;
+		if (max < cur)
+			max = cur;
+		if (min > cur)
+			min = cur;
 	}
 	page += sprintf(page,
 			"%s:  Total: %lld  Max/Min: %ld/%ld %s  Fail: %d %s\n",
@@ -996,7 +998,6 @@
 		}
 
 		if (nreaders_stress) {
-			lock_is_read_held = false;
 			cxt.lrsa = kmalloc_array(cxt.nrealreaders_stress,
 						 sizeof(*cxt.lrsa),
 						 GFP_KERNEL);
diff --git a/kernel/rcu/rcuscale.c b/kernel/rcu/rcuscale.c
index dca51fe9..2cc34a2 100644
--- a/kernel/rcu/rcuscale.c
+++ b/kernel/rcu/rcuscale.c
@@ -487,7 +487,7 @@
 	if (gp_async) {
 		cur_ops->gp_barrier();
 	}
-	writer_n_durations[me] = i_max;
+	writer_n_durations[me] = i_max + 1;
 	torture_kthread_stopping("rcu_scale_writer");
 	return 0;
 }
@@ -561,7 +561,7 @@
 			wdpp = writer_durations[i];
 			if (!wdpp)
 				continue;
-			for (j = 0; j <= writer_n_durations[i]; j++) {
+			for (j = 0; j < writer_n_durations[i]; j++) {
 				wdp = &wdpp[j];
 				pr_alert("%s%s %4d writer-duration: %5d %llu\n",
 					scale_type, SCALE_FLAG,
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 40ef541..ab421526 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -2022,8 +2022,13 @@
 			  __func__, raw_smp_processor_id());
 		while (ULONG_CMP_LT((unsigned long)ktime_get_seconds(),
 				    stop_at))
-			if (stall_cpu_block)
+			if (stall_cpu_block) {
+#ifdef CONFIG_PREEMPTION
+				preempt_schedule();
+#else
 				schedule_timeout_uninterruptible(HZ);
+#endif
+			}
 		if (stall_cpu_irqsoff)
 			local_irq_enable();
 		else if (!stall_cpu_block)
diff --git a/kernel/rcu/refscale.c b/kernel/rcu/refscale.c
index d998a76..66dc14c 100644
--- a/kernel/rcu/refscale.c
+++ b/kernel/rcu/refscale.c
@@ -467,6 +467,40 @@
 	.name		= "acqrel"
 };
 
+static volatile u64 stopopts;
+
+static void ref_clock_section(const int nloops)
+{
+	u64 x = 0;
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--)
+		x += ktime_get_real_fast_ns();
+	preempt_enable();
+	stopopts = x;
+}
+
+static void ref_clock_delay_section(const int nloops, const int udl, const int ndl)
+{
+	u64 x = 0;
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--) {
+		x += ktime_get_real_fast_ns();
+		un_delay(udl, ndl);
+	}
+	preempt_enable();
+	stopopts = x;
+}
+
+static struct ref_scale_ops clock_ops = {
+	.readsection	= ref_clock_section,
+	.delaysection	= ref_clock_delay_section,
+	.name		= "clock"
+};
+
 static void rcu_scale_one_reader(void)
 {
 	if (readdelay <= 0)
@@ -759,7 +793,7 @@
 	int firsterr = 0;
 	static struct ref_scale_ops *scale_ops[] = {
 		&rcu_ops, &srcu_ops, &rcu_trace_ops, &rcu_tasks_ops, &refcnt_ops, &rwlock_ops,
-		&rwsem_ops, &lock_ops, &lock_irq_ops, &acqrel_ops,
+		&rwsem_ops, &lock_ops, &lock_irq_ops, &acqrel_ops, &clock_ops,
 	};
 
 	if (!torture_init_begin(scale_type, verbose))
diff --git a/kernel/rcu/srcutiny.c b/kernel/rcu/srcutiny.c
index 26344dc..a0ba2ed49 100644
--- a/kernel/rcu/srcutiny.c
+++ b/kernel/rcu/srcutiny.c
@@ -96,7 +96,7 @@
  */
 void __srcu_read_unlock(struct srcu_struct *ssp, int idx)
 {
-	int newval = ssp->srcu_lock_nesting[idx] - 1;
+	int newval = READ_ONCE(ssp->srcu_lock_nesting[idx]) - 1;
 
 	WRITE_ONCE(ssp->srcu_lock_nesting[idx], newval);
 	if (!newval && READ_ONCE(ssp->srcu_gp_waiting))
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index 8536c55..806160c 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -643,8 +643,8 @@
 //
 // "Rude" variant of Tasks RCU, inspired by Steve Rostedt's trick of
 // passing an empty function to schedule_on_each_cpu().  This approach
-// provides an asynchronous call_rcu_tasks_rude() API and batching
-// of concurrent calls to the synchronous synchronize_rcu_rude() API.
+// provides an asynchronous call_rcu_tasks_rude() API and batching of
+// concurrent calls to the synchronous synchronize_rcu_tasks_rude() API.
 // This invokes schedule_on_each_cpu() in order to send IPIs far and wide
 // and induces otherwise unnecessary context switches on all online CPUs,
 // whether idle or not.
@@ -785,7 +785,10 @@
 //	set that task's .need_qs flag so that task's next outermost
 //	rcu_read_unlock_trace() will report the quiescent state (in which
 //	case the count of readers is incremented).  If both attempts fail,
-//	the task is added to a "holdout" list.
+//	the task is added to a "holdout" list.  Note that IPIs are used
+//	to invoke trc_read_check_handler() in the context of running tasks
+//	in order to avoid ordering overhead on common-case shared-variable
+//	accessses.
 // rcu_tasks_trace_postscan():
 //	Initialize state and attempt to identify an immediate quiescent
 //	state as above (but only for idle tasks), unblock CPU-hotplug
@@ -847,7 +850,7 @@
 /* If we are the last reader, wake up the grace-period kthread. */
 void rcu_read_unlock_trace_special(struct task_struct *t, int nesting)
 {
-	int nq = t->trc_reader_special.b.need_qs;
+	int nq = READ_ONCE(t->trc_reader_special.b.need_qs);
 
 	if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB) &&
 	    t->trc_reader_special.b.need_mb)
@@ -894,7 +897,7 @@
 
 	// If the task is not in a read-side critical section, and
 	// if this is the last reader, awaken the grace-period kthread.
-	if (likely(!t->trc_reader_nesting)) {
+	if (likely(!READ_ONCE(t->trc_reader_nesting))) {
 		if (WARN_ON_ONCE(atomic_dec_and_test(&trc_n_readers_need_end)))
 			wake_up(&trc_wait);
 		// Mark as checked after decrement to avoid false
@@ -903,7 +906,7 @@
 		goto reset_ipi;
 	}
 	// If we are racing with an rcu_read_unlock_trace(), try again later.
-	if (unlikely(t->trc_reader_nesting < 0)) {
+	if (unlikely(READ_ONCE(t->trc_reader_nesting) < 0)) {
 		if (WARN_ON_ONCE(atomic_dec_and_test(&trc_n_readers_need_end)))
 			wake_up(&trc_wait);
 		goto reset_ipi;
@@ -913,14 +916,14 @@
 	// Get here if the task is in a read-side critical section.  Set
 	// its state so that it will awaken the grace-period kthread upon
 	// exit from that critical section.
-	WARN_ON_ONCE(t->trc_reader_special.b.need_qs);
+	WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs));
 	WRITE_ONCE(t->trc_reader_special.b.need_qs, true);
 
 reset_ipi:
 	// Allow future IPIs to be sent on CPU and for task.
 	// Also order this IPI handler against any later manipulations of
 	// the intended task.
-	smp_store_release(&per_cpu(trc_ipi_to_cpu, smp_processor_id()), false); // ^^^
+	smp_store_release(per_cpu_ptr(&trc_ipi_to_cpu, smp_processor_id()), false); // ^^^
 	smp_store_release(&texp->trc_ipi_to_cpu, -1); // ^^^
 }
 
@@ -950,6 +953,7 @@
 			n_heavy_reader_ofl_updates++;
 		in_qs = true;
 	} else {
+		// The task is not running, so C-language access is safe.
 		in_qs = likely(!t->trc_reader_nesting);
 	}
 
@@ -964,7 +968,7 @@
 	// state so that it will awaken the grace-period kthread upon exit
 	// from that critical section.
 	atomic_inc(&trc_n_readers_need_end); // One more to wait on.
-	WARN_ON_ONCE(t->trc_reader_special.b.need_qs);
+	WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs));
 	WRITE_ONCE(t->trc_reader_special.b.need_qs, true);
 	return true;
 }
@@ -982,7 +986,7 @@
 	// The current task had better be in a quiescent state.
 	if (t == current) {
 		t->trc_reader_checked = true;
-		WARN_ON_ONCE(t->trc_reader_nesting);
+		WARN_ON_ONCE(READ_ONCE(t->trc_reader_nesting));
 		return;
 	}
 
@@ -994,6 +998,12 @@
 	}
 	put_task_struct(t);
 
+	// If this task is not yet on the holdout list, then we are in
+	// an RCU read-side critical section.  Otherwise, the invocation of
+	// rcu_add_holdout() that added it to the list did the necessary
+	// get_task_struct().  Either way, the task cannot be freed out
+	// from under this code.
+
 	// If currently running, send an IPI, either way, add to list.
 	trc_add_holdout(t, bhp);
 	if (task_curr(t) &&
@@ -1092,8 +1102,8 @@
 		 ".I"[READ_ONCE(t->trc_ipi_to_cpu) > 0],
 		 ".i"[is_idle_task(t)],
 		 ".N"[cpu > 0 && tick_nohz_full_cpu(cpu)],
-		 t->trc_reader_nesting,
-		 " N"[!!t->trc_reader_special.b.need_qs],
+		 READ_ONCE(t->trc_reader_nesting),
+		 " N"[!!READ_ONCE(t->trc_reader_special.b.need_qs)],
 		 cpu);
 	sched_show_task(t);
 }
@@ -1187,7 +1197,7 @@
 static void exit_tasks_rcu_finish_trace(struct task_struct *t)
 {
 	WRITE_ONCE(t->trc_reader_checked, true);
-	WARN_ON_ONCE(t->trc_reader_nesting);
+	WARN_ON_ONCE(READ_ONCE(t->trc_reader_nesting));
 	WRITE_ONCE(t->trc_reader_nesting, 0);
 	if (WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs)))
 		rcu_read_unlock_trace_special(t, 0);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 51f24ec..bce848e 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -74,17 +74,10 @@
 
 /* Data structures. */
 
-/*
- * Steal a bit from the bottom of ->dynticks for idle entry/exit
- * control.  Initially this is for TLB flushing.
- */
-#define RCU_DYNTICK_CTRL_MASK 0x1
-#define RCU_DYNTICK_CTRL_CTR  (RCU_DYNTICK_CTRL_MASK + 1)
-
 static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = {
 	.dynticks_nesting = 1,
 	.dynticks_nmi_nesting = DYNTICK_IRQ_NONIDLE,
-	.dynticks = ATOMIC_INIT(RCU_DYNTICK_CTRL_CTR),
+	.dynticks = ATOMIC_INIT(1),
 #ifdef CONFIG_RCU_NOCB_CPU
 	.cblist.flags = SEGCBLIST_SOFTIRQ_ONLY,
 #endif
@@ -259,6 +252,15 @@
 }
 
 /*
+ * Increment the current CPU's rcu_data structure's ->dynticks field
+ * with ordering.  Return the new value.
+ */
+static noinline noinstr unsigned long rcu_dynticks_inc(int incby)
+{
+	return arch_atomic_add_return(incby, this_cpu_ptr(&rcu_data.dynticks));
+}
+
+/*
  * Record entry into an extended quiescent state.  This is only to be
  * called when not already in an extended quiescent state, that is,
  * RCU is watching prior to the call to this function and is no longer
@@ -266,7 +268,6 @@
  */
 static noinstr void rcu_dynticks_eqs_enter(void)
 {
-	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	int seq;
 
 	/*
@@ -275,13 +276,9 @@
 	 * next idle sojourn.
 	 */
 	rcu_dynticks_task_trace_enter();  // Before ->dynticks update!
-	seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	seq = rcu_dynticks_inc(1);
 	// RCU is no longer watching.  Better be in extended quiescent state!
-	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-		     (seq & RCU_DYNTICK_CTRL_CTR));
-	/* Better not have special action (TLB flush) pending! */
-	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-		     (seq & RCU_DYNTICK_CTRL_MASK));
+	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && (seq & 0x1));
 }
 
 /*
@@ -291,7 +288,6 @@
  */
 static noinstr void rcu_dynticks_eqs_exit(void)
 {
-	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	int seq;
 
 	/*
@@ -299,15 +295,10 @@
 	 * and we also must force ordering with the next RCU read-side
 	 * critical section.
 	 */
-	seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	seq = rcu_dynticks_inc(1);
 	// RCU is now watching.  Better not be in an extended quiescent state!
 	rcu_dynticks_task_trace_exit();  // After ->dynticks update!
-	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-		     !(seq & RCU_DYNTICK_CTRL_CTR));
-	if (seq & RCU_DYNTICK_CTRL_MASK) {
-		arch_atomic_andnot(RCU_DYNTICK_CTRL_MASK, &rdp->dynticks);
-		smp_mb__after_atomic(); /* _exit after clearing mask. */
-	}
+	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && !(seq & 0x1));
 }
 
 /*
@@ -324,9 +315,9 @@
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
-	if (atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR)
+	if (atomic_read(&rdp->dynticks) & 0x1)
 		return;
-	atomic_add(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	rcu_dynticks_inc(1);
 }
 
 /*
@@ -336,9 +327,7 @@
  */
 static __always_inline bool rcu_dynticks_curr_cpu_in_eqs(void)
 {
-	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
-
-	return !(arch_atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR);
+	return !(atomic_read(this_cpu_ptr(&rcu_data.dynticks)) & 0x1);
 }
 
 /*
@@ -347,9 +336,8 @@
  */
 static int rcu_dynticks_snap(struct rcu_data *rdp)
 {
-	int snap = atomic_add_return(0, &rdp->dynticks);
-
-	return snap & ~RCU_DYNTICK_CTRL_MASK;
+	smp_mb();  // Fundamental RCU ordering guarantee.
+	return atomic_read_acquire(&rdp->dynticks);
 }
 
 /*
@@ -358,7 +346,7 @@
  */
 static bool rcu_dynticks_in_eqs(int snap)
 {
-	return !(snap & RCU_DYNTICK_CTRL_CTR);
+	return !(snap & 0x1);
 }
 
 /* Return true if the specified CPU is currently idle from an RCU viewpoint.  */
@@ -389,8 +377,7 @@
 	int snap;
 
 	// If not quiescent, force back to earlier extended quiescent state.
-	snap = atomic_read(&rdp->dynticks) & ~(RCU_DYNTICK_CTRL_MASK |
-					       RCU_DYNTICK_CTRL_CTR);
+	snap = atomic_read(&rdp->dynticks) & ~0x1;
 
 	smp_rmb(); // Order ->dynticks and *vp reads.
 	if (READ_ONCE(*vp))
@@ -398,32 +385,7 @@
 	smp_rmb(); // Order *vp read and ->dynticks re-read.
 
 	// If still in the same extended quiescent state, we are good!
-	return snap == (atomic_read(&rdp->dynticks) & ~RCU_DYNTICK_CTRL_MASK);
-}
-
-/*
- * Set the special (bottom) bit of the specified CPU so that it
- * will take special action (such as flushing its TLB) on the
- * next exit from an extended quiescent state.  Returns true if
- * the bit was successfully set, or false if the CPU was not in
- * an extended quiescent state.
- */
-bool rcu_eqs_special_set(int cpu)
-{
-	int old;
-	int new;
-	int new_old;
-	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
-
-	new_old = atomic_read(&rdp->dynticks);
-	do {
-		old = new_old;
-		if (old & RCU_DYNTICK_CTRL_CTR)
-			return false;
-		new = old | RCU_DYNTICK_CTRL_MASK;
-		new_old = atomic_cmpxchg(&rdp->dynticks, old, new);
-	} while (new_old != old);
-	return true;
+	return snap == atomic_read(&rdp->dynticks);
 }
 
 /*
@@ -439,13 +401,12 @@
  */
 notrace void rcu_momentary_dyntick_idle(void)
 {
-	int special;
+	int seq;
 
 	raw_cpu_write(rcu_data.rcu_need_heavy_qs, false);
-	special = atomic_add_return(2 * RCU_DYNTICK_CTRL_CTR,
-				    &this_cpu_ptr(&rcu_data)->dynticks);
+	seq = rcu_dynticks_inc(2);
 	/* It is illegal to call this from idle state. */
-	WARN_ON_ONCE(!(special & RCU_DYNTICK_CTRL_CTR));
+	WARN_ON_ONCE(!(seq & 0x1));
 	rcu_preempt_deferred_qs(current);
 }
 EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle);
@@ -1325,7 +1286,7 @@
 	 */
 	jtsq = READ_ONCE(jiffies_to_sched_qs);
 	ruqp = per_cpu_ptr(&rcu_data.rcu_urgent_qs, rdp->cpu);
-	rnhqp = &per_cpu(rcu_data.rcu_need_heavy_qs, rdp->cpu);
+	rnhqp = per_cpu_ptr(&rcu_data.rcu_need_heavy_qs, rdp->cpu);
 	if (!READ_ONCE(*rnhqp) &&
 	    (time_after(jiffies, rcu_state.gp_start + jtsq * 2) ||
 	     time_after(jiffies, rcu_state.jiffies_resched) ||
@@ -1772,7 +1733,7 @@
 /*
  * Initialize a new grace period.  Return false if no grace period required.
  */
-static bool rcu_gp_init(void)
+static noinline_for_stack bool rcu_gp_init(void)
 {
 	unsigned long firstseq;
 	unsigned long flags;
@@ -1966,7 +1927,7 @@
 /*
  * Loop doing repeated quiescent-state forcing until the grace period ends.
  */
-static void rcu_gp_fqs_loop(void)
+static noinline_for_stack void rcu_gp_fqs_loop(void)
 {
 	bool first_gp_fqs;
 	int gf = 0;
@@ -1993,8 +1954,8 @@
 		trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 				       TPS("fqswait"));
 		WRITE_ONCE(rcu_state.gp_state, RCU_GP_WAIT_FQS);
-		ret = swait_event_idle_timeout_exclusive(
-				rcu_state.gp_wq, rcu_gp_fqs_check_wake(&gf), j);
+		(void)swait_event_idle_timeout_exclusive(rcu_state.gp_wq,
+				 rcu_gp_fqs_check_wake(&gf), j);
 		rcu_gp_torture_wait();
 		WRITE_ONCE(rcu_state.gp_state, RCU_GP_DOING_FQS);
 		/* Locking provides needed memory barriers. */
@@ -2471,9 +2432,6 @@
 	WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus - 1);
 	/* Adjust any no-longer-needed kthreads. */
 	rcu_boost_kthread_setaffinity(rnp, -1);
-	/* Do any needed no-CB deferred wakeups from this CPU. */
-	do_nocb_deferred_wakeup(per_cpu_ptr(&rcu_data, cpu));
-
 	// Stop-machine done, so allow nohz_full to disable tick.
 	tick_dep_clear(TICK_DEP_BIT_RCU);
 	return 0;
@@ -4050,7 +4008,7 @@
 	 */
 	init_completion(&rcu_state.barrier_completion);
 	atomic_set(&rcu_state.barrier_cpu_count, 2);
-	get_online_cpus();
+	cpus_read_lock();
 
 	/*
 	 * Force each CPU with callbacks to register a new callback.
@@ -4081,7 +4039,7 @@
 					  rcu_state.barrier_sequence);
 		}
 	}
-	put_online_cpus();
+	cpus_read_unlock();
 
 	/*
 	 * Now that we have an rcu_barrier_callback() callback on each
@@ -4784,4 +4742,5 @@
 
 #include "tree_stall.h"
 #include "tree_exp.h"
+#include "tree_nocb.h"
 #include "tree_plugin.h"
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
new file mode 100644
index 0000000..8fdf44f
--- /dev/null
+++ b/kernel/rcu/tree_nocb.h
@@ -0,0 +1,1496 @@
+/* SPDX-License-Identifier: GPL-2.0+ */
+/*
+ * Read-Copy Update mechanism for mutual exclusion (tree-based version)
+ * Internal non-public definitions that provide either classic
+ * or preemptible semantics.
+ *
+ * Copyright Red Hat, 2009
+ * Copyright IBM Corporation, 2009
+ * Copyright SUSE, 2021
+ *
+ * Author: Ingo Molnar <mingo@elte.hu>
+ *	   Paul E. McKenney <paulmck@linux.ibm.com>
+ *	   Frederic Weisbecker <frederic@kernel.org>
+ */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool __read_mostly rcu_nocb_poll;    /* Offload kthread are to poll. */
+static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
+{
+	return lockdep_is_held(&rdp->nocb_lock);
+}
+
+static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
+{
+	/* Race on early boot between thread creation and assignment */
+	if (!rdp->nocb_cb_kthread || !rdp->nocb_gp_kthread)
+		return true;
+
+	if (current == rdp->nocb_cb_kthread || current == rdp->nocb_gp_kthread)
+		if (in_task())
+			return true;
+	return false;
+}
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads
+ * created that pull the callbacks from the corresponding CPU, wait for
+ * a grace period to elapse, and invoke the callbacks.  These kthreads
+ * are organized into GP kthreads, which manage incoming callbacks, wait for
+ * grace periods, and awaken CB kthreads, and the CB kthreads, which only
+ * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs
+ * do a wake_up() on their GP kthread when they insert a callback into any
+ * empty list, unless the rcu_nocb_poll boot parameter has been specified,
+ * in which case each kthread actively polls its CPU.  (Which isn't so great
+ * for energy efficiency, but which does reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callbacks can also be used as an energy-efficiency
+ * measure because CPUs with no RCU callbacks queued are more aggressive
+ * about entering dyntick-idle mode.
+ */
+
+
+/*
+ * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters.
+ * If the list is invalid, a warning is emitted and all CPUs are offloaded.
+ */
+static int __init rcu_nocb_setup(char *str)
+{
+	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+	if (cpulist_parse(str, rcu_nocb_mask)) {
+		pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
+		cpumask_setall(rcu_nocb_mask);
+	}
+	return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+static int __init parse_rcu_nocb_poll(char *arg)
+{
+	rcu_nocb_poll = true;
+	return 0;
+}
+early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
+
+/*
+ * Don't bother bypassing ->cblist if the call_rcu() rate is low.
+ * After all, the main point of bypassing is to avoid lock contention
+ * on ->nocb_lock, which only can happen at high call_rcu() rates.
+ */
+static int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
+module_param(nocb_nobypass_lim_per_jiffy, int, 0);
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the
+ * lock isn't immediately available, increment ->nocb_lock_contended to
+ * flag the contention.
+ */
+static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
+	__acquires(&rdp->nocb_bypass_lock)
+{
+	lockdep_assert_irqs_disabled();
+	if (raw_spin_trylock(&rdp->nocb_bypass_lock))
+		return;
+	atomic_inc(&rdp->nocb_lock_contended);
+	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+	smp_mb__after_atomic(); /* atomic_inc() before lock. */
+	raw_spin_lock(&rdp->nocb_bypass_lock);
+	smp_mb__before_atomic(); /* atomic_dec() after lock. */
+	atomic_dec(&rdp->nocb_lock_contended);
+}
+
+/*
+ * Spinwait until the specified rcu_data structure's ->nocb_lock is
+ * not contended.  Please note that this is extremely special-purpose,
+ * relying on the fact that at most two kthreads and one CPU contend for
+ * this lock, and also that the two kthreads are guaranteed to have frequent
+ * grace-period-duration time intervals between successive acquisitions
+ * of the lock.  This allows us to use an extremely simple throttling
+ * mechanism, and further to apply it only to the CPU doing floods of
+ * call_rcu() invocations.  Don't try this at home!
+ */
+static void rcu_nocb_wait_contended(struct rcu_data *rdp)
+{
+	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+	while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
+		cpu_relax();
+}
+
+/*
+ * Conditionally acquire the specified rcu_data structure's
+ * ->nocb_bypass_lock.
+ */
+static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+	return raw_spin_trylock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_bypass_lock.
+ */
+static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
+	__releases(&rdp->nocb_bypass_lock)
+{
+	lockdep_assert_irqs_disabled();
+	raw_spin_unlock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+	if (!rcu_rdp_is_offloaded(rdp))
+		return;
+	raw_spin_lock(&rdp->nocb_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
+{
+	if (rcu_rdp_is_offloaded(rdp)) {
+		lockdep_assert_irqs_disabled();
+		raw_spin_unlock(&rdp->nocb_lock);
+	}
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock and restore
+ * interrupts, but only if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+				       unsigned long flags)
+{
+	if (rcu_rdp_is_offloaded(rdp)) {
+		lockdep_assert_irqs_disabled();
+		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+	} else {
+		local_irq_restore(flags);
+	}
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+	if (rcu_rdp_is_offloaded(rdp))
+		lockdep_assert_held(&rdp->nocb_lock);
+}
+
+/*
+ * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
+ * grace period.
+ */
+static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
+{
+	swake_up_all(sq);
+}
+
+static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
+{
+	return &rnp->nocb_gp_wq[rcu_seq_ctr(rnp->gp_seq) & 0x1];
+}
+
+static void rcu_init_one_nocb(struct rcu_node *rnp)
+{
+	init_swait_queue_head(&rnp->nocb_gp_wq[0]);
+	init_swait_queue_head(&rnp->nocb_gp_wq[1]);
+}
+
+/* Is the specified CPU a no-CBs CPU? */
+bool rcu_is_nocb_cpu(int cpu)
+{
+	if (cpumask_available(rcu_nocb_mask))
+		return cpumask_test_cpu(cpu, rcu_nocb_mask);
+	return false;
+}
+
+static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
+			   struct rcu_data *rdp,
+			   bool force, unsigned long flags)
+	__releases(rdp_gp->nocb_gp_lock)
+{
+	bool needwake = false;
+
+	if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
+		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+				    TPS("AlreadyAwake"));
+		return false;
+	}
+
+	if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+		del_timer(&rdp_gp->nocb_timer);
+	}
+
+	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
+		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
+		needwake = true;
+	}
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+	if (needwake) {
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
+		wake_up_process(rdp_gp->nocb_gp_kthread);
+	}
+
+	return needwake;
+}
+
+/*
+ * Kick the GP kthread for this NOCB group.
+ */
+static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+{
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
+}
+
+/*
+ * Arrange to wake the GP kthread for this NOCB group at some future
+ * time when it is safe to do so.
+ */
+static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
+			       const char *reason)
+{
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+
+	/*
+	 * Bypass wakeup overrides previous deferments. In case
+	 * of callback storm, no need to wake up too early.
+	 */
+	if (waketype == RCU_NOCB_WAKE_BYPASS) {
+		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	} else {
+		if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
+			mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
+		if (rdp_gp->nocb_defer_wakeup < waketype)
+			WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	}
+
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
+}
+
+/*
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
+ */
+static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				     unsigned long j)
+{
+	struct rcu_cblist rcl;
+
+	WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
+	rcu_lockdep_assert_cblist_protected(rdp);
+	lockdep_assert_held(&rdp->nocb_bypass_lock);
+	if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
+		raw_spin_unlock(&rdp->nocb_bypass_lock);
+		return false;
+	}
+	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
+	if (rhp)
+		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
+	WRITE_ONCE(rdp->nocb_bypass_first, j);
+	rcu_nocb_bypass_unlock(rdp);
+	return true;
+}
+
+/*
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
+ */
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				  unsigned long j)
+{
+	if (!rcu_rdp_is_offloaded(rdp))
+		return true;
+	rcu_lockdep_assert_cblist_protected(rdp);
+	rcu_nocb_bypass_lock(rdp);
+	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+}
+
+/*
+ * If the ->nocb_bypass_lock is immediately available, flush the
+ * ->nocb_bypass queue into ->cblist.
+ */
+static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
+{
+	rcu_lockdep_assert_cblist_protected(rdp);
+	if (!rcu_rdp_is_offloaded(rdp) ||
+	    !rcu_nocb_bypass_trylock(rdp))
+		return;
+	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
+}
+
+/*
+ * See whether it is appropriate to use the ->nocb_bypass list in order
+ * to control contention on ->nocb_lock.  A limited number of direct
+ * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
+ * is non-empty, further callbacks must be placed into ->nocb_bypass,
+ * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
+ * back to direct use of ->cblist.  However, ->nocb_bypass should not be
+ * used if ->cblist is empty, because otherwise callbacks can be stranded
+ * on ->nocb_bypass because we cannot count on the current CPU ever again
+ * invoking call_rcu().  The general rule is that if ->nocb_bypass is
+ * non-empty, the corresponding no-CBs grace-period kthread must not be
+ * in an indefinite sleep state.
+ *
+ * Finally, it is not permitted to use the bypass during early boot,
+ * as doing so would confuse the auto-initialization code.  Besides
+ * which, there is no point in worrying about lock contention while
+ * there is only one CPU in operation.
+ */
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				bool *was_alldone, unsigned long flags)
+{
+	unsigned long c;
+	unsigned long cur_gp_seq;
+	unsigned long j = jiffies;
+	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+
+	lockdep_assert_irqs_disabled();
+
+	// Pure softirq/rcuc based processing: no bypassing, no
+	// locking.
+	if (!rcu_rdp_is_offloaded(rdp)) {
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		return false;
+	}
+
+	// In the process of (de-)offloading: no bypassing, but
+	// locking.
+	if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
+		rcu_nocb_lock(rdp);
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		return false; /* Not offloaded, no bypassing. */
+	}
+
+	// Don't use ->nocb_bypass during early boot.
+	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
+		rcu_nocb_lock(rdp);
+		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		return false;
+	}
+
+	// If we have advanced to a new jiffy, reset counts to allow
+	// moving back from ->nocb_bypass to ->cblist.
+	if (j == rdp->nocb_nobypass_last) {
+		c = rdp->nocb_nobypass_count + 1;
+	} else {
+		WRITE_ONCE(rdp->nocb_nobypass_last, j);
+		c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
+		if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
+				 nocb_nobypass_lim_per_jiffy))
+			c = 0;
+		else if (c > nocb_nobypass_lim_per_jiffy)
+			c = nocb_nobypass_lim_per_jiffy;
+	}
+	WRITE_ONCE(rdp->nocb_nobypass_count, c);
+
+	// If there hasn't yet been all that many ->cblist enqueues
+	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
+	// ->nocb_bypass first.
+	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+		rcu_nocb_lock(rdp);
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		if (*was_alldone)
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("FirstQ"));
+		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+		return false; // Caller must enqueue the callback.
+	}
+
+	// If ->nocb_bypass has been used too long or is too full,
+	// flush ->nocb_bypass to ->cblist.
+	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+	    ncbs >= qhimark) {
+		rcu_nocb_lock(rdp);
+		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+			if (*was_alldone)
+				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+						    TPS("FirstQ"));
+			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+			return false; // Caller must enqueue the callback.
+		}
+		if (j != rdp->nocb_gp_adv_time &&
+		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+			rcu_advance_cbs_nowake(rdp->mynode, rdp);
+			rdp->nocb_gp_adv_time = j;
+		}
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		return true; // Callback already enqueued.
+	}
+
+	// We need to use the bypass.
+	rcu_nocb_wait_contended(rdp);
+	rcu_nocb_bypass_lock(rdp);
+	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+	if (!ncbs) {
+		WRITE_ONCE(rdp->nocb_bypass_first, j);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
+	}
+	rcu_nocb_bypass_unlock(rdp);
+	smp_mb(); /* Order enqueue before wake. */
+	if (ncbs) {
+		local_irq_restore(flags);
+	} else {
+		// No-CBs GP kthread might be indefinitely asleep, if so, wake.
+		rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
+		if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("FirstBQwake"));
+			__call_rcu_nocb_wake(rdp, true, flags);
+		} else {
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("FirstBQnoWake"));
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+		}
+	}
+	return true; // Callback already enqueued.
+}
+
+/*
+ * Awaken the no-CBs grace-period kthread if needed, either due to it
+ * legitimately being asleep or due to overload conditions.
+ *
+ * If warranted, also wake up the kthread servicing this CPUs queues.
+ */
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
+				 unsigned long flags)
+				 __releases(rdp->nocb_lock)
+{
+	unsigned long cur_gp_seq;
+	unsigned long j;
+	long len;
+	struct task_struct *t;
+
+	// If we are being polled or there is no kthread, just leave.
+	t = READ_ONCE(rdp->nocb_gp_kthread);
+	if (rcu_nocb_poll || !t) {
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+				    TPS("WakeNotPoll"));
+		return;
+	}
+	// Need to actually to a wakeup.
+	len = rcu_segcblist_n_cbs(&rdp->cblist);
+	if (was_alldone) {
+		rdp->qlen_last_fqs_check = len;
+		if (!irqs_disabled_flags(flags)) {
+			/* ... if queue was empty ... */
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp(rdp, false);
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("WakeEmpty"));
+		} else {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
+					   TPS("WakeEmptyIsDeferred"));
+		}
+	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
+		/* ... or if many callbacks queued. */
+		rdp->qlen_last_fqs_check = len;
+		j = jiffies;
+		if (j != rdp->nocb_gp_adv_time &&
+		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+			rcu_advance_cbs_nowake(rdp->mynode, rdp);
+			rdp->nocb_gp_adv_time = j;
+		}
+		smp_mb(); /* Enqueue before timer_pending(). */
+		if ((rdp->nocb_cb_sleep ||
+		     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
+		    !timer_pending(&rdp->nocb_timer)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
+					   TPS("WakeOvfIsDeferred"));
+		} else {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+		}
+	} else {
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+	}
+	return;
+}
+
+/*
+ * Check if we ignore this rdp.
+ *
+ * We check that without holding the nocb lock but
+ * we make sure not to miss a freshly offloaded rdp
+ * with the current ordering:
+ *
+ *  rdp_offload_toggle()        nocb_gp_enabled_cb()
+ * -------------------------   ----------------------------
+ *    WRITE flags                 LOCK nocb_gp_lock
+ *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
+ *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
+ *    UNLOCK nocb_gp_lock         READ flags
+ */
+static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
+{
+	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
+
+	return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
+						     bool *needwake_state)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+
+	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
+			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+				*needwake_state = true;
+		}
+		return false;
+	}
+
+	/*
+	 * De-offloading. Clear our flag and notify the de-offload worker.
+	 * We will ignore this rdp until it ever gets re-offloaded.
+	 */
+	WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+	rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
+	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+		*needwake_state = true;
+	return true;
+}
+
+
+/*
+ * No-CBs GP kthreads come here to wait for additional callbacks to show up
+ * or for grace periods to end.
+ */
+static void nocb_gp_wait(struct rcu_data *my_rdp)
+{
+	bool bypass = false;
+	long bypass_ncbs;
+	int __maybe_unused cpu = my_rdp->cpu;
+	unsigned long cur_gp_seq;
+	unsigned long flags;
+	bool gotcbs = false;
+	unsigned long j = jiffies;
+	bool needwait_gp = false; // This prevents actual uninitialized use.
+	bool needwake;
+	bool needwake_gp;
+	struct rcu_data *rdp;
+	struct rcu_node *rnp;
+	unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
+	bool wasempty = false;
+
+	/*
+	 * Each pass through the following loop checks for CBs and for the
+	 * nearest grace period (if any) to wait for next.  The CB kthreads
+	 * and the global grace-period kthread are awakened if needed.
+	 */
+	WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
+	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
+		bool needwake_state = false;
+
+		if (!nocb_gp_enabled_cb(rdp))
+			continue;
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
+		rcu_nocb_lock_irqsave(rdp, flags);
+		if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			if (needwake_state)
+				swake_up_one(&rdp->nocb_state_wq);
+			continue;
+		}
+		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+		if (bypass_ncbs &&
+		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
+		     bypass_ncbs > 2 * qhimark)) {
+			// Bypass full or old, so flush it.
+			(void)rcu_nocb_try_flush_bypass(rdp, j);
+			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			if (needwake_state)
+				swake_up_one(&rdp->nocb_state_wq);
+			continue; /* No callbacks here, try next. */
+		}
+		if (bypass_ncbs) {
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("Bypass"));
+			bypass = true;
+		}
+		rnp = rdp->mynode;
+
+		// Advance callbacks if helpful and low contention.
+		needwake_gp = false;
+		if (!rcu_segcblist_restempty(&rdp->cblist,
+					     RCU_NEXT_READY_TAIL) ||
+		    (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+		     rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
+			raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
+			needwake_gp = rcu_advance_cbs(rnp, rdp);
+			wasempty = rcu_segcblist_restempty(&rdp->cblist,
+							   RCU_NEXT_READY_TAIL);
+			raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
+		}
+		// Need to wait on some grace period?
+		WARN_ON_ONCE(wasempty &&
+			     !rcu_segcblist_restempty(&rdp->cblist,
+						      RCU_NEXT_READY_TAIL));
+		if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
+			if (!needwait_gp ||
+			    ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
+				wait_gp_seq = cur_gp_seq;
+			needwait_gp = true;
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("NeedWaitGP"));
+		}
+		if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
+			needwake = rdp->nocb_cb_sleep;
+			WRITE_ONCE(rdp->nocb_cb_sleep, false);
+			smp_mb(); /* CB invocation -after- GP end. */
+		} else {
+			needwake = false;
+		}
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		if (needwake) {
+			swake_up_one(&rdp->nocb_cb_wq);
+			gotcbs = true;
+		}
+		if (needwake_gp)
+			rcu_gp_kthread_wake();
+		if (needwake_state)
+			swake_up_one(&rdp->nocb_state_wq);
+	}
+
+	my_rdp->nocb_gp_bypass = bypass;
+	my_rdp->nocb_gp_gp = needwait_gp;
+	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
+
+	if (bypass && !rcu_nocb_poll) {
+		// At least one child with non-empty ->nocb_bypass, so set
+		// timer in order to avoid stranding its callbacks.
+		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+				   TPS("WakeBypassIsDeferred"));
+	}
+	if (rcu_nocb_poll) {
+		/* Polling, so trace if first poll in the series. */
+		if (gotcbs)
+			trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
+		schedule_timeout_idle(1);
+	} else if (!needwait_gp) {
+		/* Wait for callbacks to appear. */
+		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
+		swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
+				!READ_ONCE(my_rdp->nocb_gp_sleep));
+		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
+	} else {
+		rnp = my_rdp->mynode;
+		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
+		swait_event_interruptible_exclusive(
+			rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
+			rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
+			!READ_ONCE(my_rdp->nocb_gp_sleep));
+		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
+	}
+	if (!rcu_nocb_poll) {
+		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
+		if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+			WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+			del_timer(&my_rdp->nocb_timer);
+		}
+		WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
+		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+	}
+	my_rdp->nocb_gp_seq = -1;
+	WARN_ON(signal_pending(current));
+}
+
+/*
+ * No-CBs grace-period-wait kthread.  There is one of these per group
+ * of CPUs, but only once at least one CPU in that group has come online
+ * at least once since boot.  This kthread checks for newly posted
+ * callbacks from any of the CPUs it is responsible for, waits for a
+ * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
+ * that then have callback-invocation work to do.
+ */
+static int rcu_nocb_gp_kthread(void *arg)
+{
+	struct rcu_data *rdp = arg;
+
+	for (;;) {
+		WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
+		nocb_gp_wait(rdp);
+		cond_resched_tasks_rcu_qs();
+	}
+	return 0;
+}
+
+static inline bool nocb_cb_can_run(struct rcu_data *rdp)
+{
+	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
+	return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
+{
+	return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
+}
+
+/*
+ * Invoke any ready callbacks from the corresponding no-CBs CPU,
+ * then, if there are no more, wait for more to appear.
+ */
+static void nocb_cb_wait(struct rcu_data *rdp)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long cur_gp_seq;
+	unsigned long flags;
+	bool needwake_state = false;
+	bool needwake_gp = false;
+	bool can_sleep = true;
+	struct rcu_node *rnp = rdp->mynode;
+
+	local_irq_save(flags);
+	rcu_momentary_dyntick_idle();
+	local_irq_restore(flags);
+	/*
+	 * Disable BH to provide the expected environment.  Also, when
+	 * transitioning to/from NOCB mode, a self-requeuing callback might
+	 * be invoked from softirq.  A short grace period could cause both
+	 * instances of this callback would execute concurrently.
+	 */
+	local_bh_disable();
+	rcu_do_batch(rdp);
+	local_bh_enable();
+	lockdep_assert_irqs_enabled();
+	rcu_nocb_lock_irqsave(rdp, flags);
+	if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
+	    rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
+	    raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
+		needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
+		raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
+	}
+
+	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
+			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
+			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+				needwake_state = true;
+		}
+		if (rcu_segcblist_ready_cbs(cblist))
+			can_sleep = false;
+	} else {
+		/*
+		 * De-offloading. Clear our flag and notify the de-offload worker.
+		 * We won't touch the callbacks and keep sleeping until we ever
+		 * get re-offloaded.
+		 */
+		WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
+		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+			needwake_state = true;
+	}
+
+	WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep);
+
+	if (rdp->nocb_cb_sleep)
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
+
+	rcu_nocb_unlock_irqrestore(rdp, flags);
+	if (needwake_gp)
+		rcu_gp_kthread_wake();
+
+	if (needwake_state)
+		swake_up_one(&rdp->nocb_state_wq);
+
+	do {
+		swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
+						    nocb_cb_wait_cond(rdp));
+
+		// VVV Ensure CB invocation follows _sleep test.
+		if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
+			WARN_ON(signal_pending(current));
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+		}
+	} while (!nocb_cb_can_run(rdp));
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
+ * nocb_cb_wait() to do the dirty work.
+ */
+static int rcu_nocb_cb_kthread(void *arg)
+{
+	struct rcu_data *rdp = arg;
+
+	// Each pass through this loop does one callback batch, and,
+	// if there are no more ready callbacks, waits for them.
+	for (;;) {
+		nocb_cb_wait(rdp);
+		cond_resched_tasks_rcu_qs();
+	}
+	return 0;
+}
+
+/* Is a deferred wakeup of rcu_nocb_kthread() required? */
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
+{
+	return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
+}
+
+/* Do a deferred wakeup of rcu_nocb_kthread(). */
+static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
+					   struct rcu_data *rdp, int level,
+					   unsigned long flags)
+	__releases(rdp_gp->nocb_gp_lock)
+{
+	int ndw;
+	int ret;
+
+	if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
+		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+		return false;
+	}
+
+	ndw = rdp_gp->nocb_defer_wakeup;
+	ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
+
+	return ret;
+}
+
+/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
+static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
+{
+	unsigned long flags;
+	struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
+
+	WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
+
+	raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
+	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
+	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
+}
+
+/*
+ * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
+ * This means we do an inexact common-case check.  Note that if
+ * we miss, ->nocb_timer will eventually clean things up.
+ */
+static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
+		return false;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
+}
+
+void rcu_nocb_flush_deferred_wakeup(void)
+{
+	do_nocb_deferred_wakeup(this_cpu_ptr(&rcu_data));
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
+
+static int rdp_offload_toggle(struct rcu_data *rdp,
+			       bool offload, unsigned long flags)
+	__releases(rdp->nocb_lock)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+	bool wake_gp = false;
+
+	rcu_segcblist_offload(cblist, offload);
+
+	if (rdp->nocb_cb_sleep)
+		rdp->nocb_cb_sleep = false;
+	rcu_nocb_unlock_irqrestore(rdp, flags);
+
+	/*
+	 * Ignore former value of nocb_cb_sleep and force wake up as it could
+	 * have been spuriously set to false already.
+	 */
+	swake_up_one(&rdp->nocb_cb_wq);
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	if (rdp_gp->nocb_gp_sleep) {
+		rdp_gp->nocb_gp_sleep = false;
+		wake_gp = true;
+	}
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
+	if (wake_gp)
+		wake_up_process(rdp_gp->nocb_gp_kthread);
+
+	return 0;
+}
+
+static long rcu_nocb_rdp_deoffload(void *arg)
+{
+	struct rcu_data *rdp = arg;
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long flags;
+	int ret;
+
+	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+
+	pr_info("De-offloading %d\n", rdp->cpu);
+
+	rcu_nocb_lock_irqsave(rdp, flags);
+	/*
+	 * Flush once and for all now. This suffices because we are
+	 * running on the target CPU holding ->nocb_lock (thus having
+	 * interrupts disabled), and because rdp_offload_toggle()
+	 * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED.
+	 * Thus future calls to rcu_segcblist_completely_offloaded() will
+	 * return false, which means that future calls to rcu_nocb_try_bypass()
+	 * will refuse to put anything into the bypass.
+	 */
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+	ret = rdp_offload_toggle(rdp, false, flags);
+	swait_event_exclusive(rdp->nocb_state_wq,
+			      !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
+							SEGCBLIST_KTHREAD_GP));
+	/*
+	 * Lock one last time to acquire latest callback updates from kthreads
+	 * so we can later handle callbacks locally without locking.
+	 */
+	rcu_nocb_lock_irqsave(rdp, flags);
+	/*
+	 * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY after the nocb
+	 * lock is released but how about being paranoid for once?
+	 */
+	rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
+	/*
+	 * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
+	 * rcu_nocb_unlock_irqrestore() anymore.
+	 */
+	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+
+	/* Sanity check */
+	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+
+
+	return ret;
+}
+
+int rcu_nocb_cpu_deoffload(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int ret = 0;
+
+	mutex_lock(&rcu_state.barrier_mutex);
+	cpus_read_lock();
+	if (rcu_rdp_is_offloaded(rdp)) {
+		if (cpu_online(cpu)) {
+			ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
+			if (!ret)
+				cpumask_clear_cpu(cpu, rcu_nocb_mask);
+		} else {
+			pr_info("NOCB: Can't CB-deoffload an offline CPU\n");
+			ret = -EINVAL;
+		}
+	}
+	cpus_read_unlock();
+	mutex_unlock(&rcu_state.barrier_mutex);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
+
+static long rcu_nocb_rdp_offload(void *arg)
+{
+	struct rcu_data *rdp = arg;
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long flags;
+	int ret;
+
+	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+	/*
+	 * For now we only support re-offload, ie: the rdp must have been
+	 * offloaded on boot first.
+	 */
+	if (!rdp->nocb_gp_rdp)
+		return -EINVAL;
+
+	pr_info("Offloading %d\n", rdp->cpu);
+	/*
+	 * Can't use rcu_nocb_lock_irqsave() while we are in
+	 * SEGCBLIST_SOFTIRQ_ONLY mode.
+	 */
+	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+
+	/*
+	 * We didn't take the nocb lock while working on the
+	 * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
+	 * Every modifications that have been done previously on
+	 * rdp->cblist must be visible remotely by the nocb kthreads
+	 * upon wake up after reading the cblist flags.
+	 *
+	 * The layout against nocb_lock enforces that ordering:
+	 *
+	 *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
+	 * -------------------------   ----------------------------
+	 *      WRITE callbacks           rcu_nocb_lock()
+	 *      rcu_nocb_lock()           READ flags
+	 *      WRITE flags               READ callbacks
+	 *      rcu_nocb_unlock()         rcu_nocb_unlock()
+	 */
+	ret = rdp_offload_toggle(rdp, true, flags);
+	swait_event_exclusive(rdp->nocb_state_wq,
+			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
+			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+
+	return ret;
+}
+
+int rcu_nocb_cpu_offload(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int ret = 0;
+
+	mutex_lock(&rcu_state.barrier_mutex);
+	cpus_read_lock();
+	if (!rcu_rdp_is_offloaded(rdp)) {
+		if (cpu_online(cpu)) {
+			ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
+			if (!ret)
+				cpumask_set_cpu(cpu, rcu_nocb_mask);
+		} else {
+			pr_info("NOCB: Can't CB-offload an offline CPU\n");
+			ret = -EINVAL;
+		}
+	}
+	cpus_read_unlock();
+	mutex_unlock(&rcu_state.barrier_mutex);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
+
+void __init rcu_init_nohz(void)
+{
+	int cpu;
+	bool need_rcu_nocb_mask = false;
+	struct rcu_data *rdp;
+
+#if defined(CONFIG_NO_HZ_FULL)
+	if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
+		need_rcu_nocb_mask = true;
+#endif /* #if defined(CONFIG_NO_HZ_FULL) */
+
+	if (!cpumask_available(rcu_nocb_mask) && need_rcu_nocb_mask) {
+		if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) {
+			pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n");
+			return;
+		}
+	}
+	if (!cpumask_available(rcu_nocb_mask))
+		return;
+
+#if defined(CONFIG_NO_HZ_FULL)
+	if (tick_nohz_full_running)
+		cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask);
+#endif /* #if defined(CONFIG_NO_HZ_FULL) */
+
+	if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) {
+		pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n");
+		cpumask_and(rcu_nocb_mask, cpu_possible_mask,
+			    rcu_nocb_mask);
+	}
+	if (cpumask_empty(rcu_nocb_mask))
+		pr_info("\tOffload RCU callbacks from CPUs: (none).\n");
+	else
+		pr_info("\tOffload RCU callbacks from CPUs: %*pbl.\n",
+			cpumask_pr_args(rcu_nocb_mask));
+	if (rcu_nocb_poll)
+		pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
+
+	for_each_cpu(cpu, rcu_nocb_mask) {
+		rdp = per_cpu_ptr(&rcu_data, cpu);
+		if (rcu_segcblist_empty(&rdp->cblist))
+			rcu_segcblist_init(&rdp->cblist);
+		rcu_segcblist_offload(&rdp->cblist, true);
+		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
+		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
+	}
+	rcu_organize_nocb_kthreads();
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+	init_swait_queue_head(&rdp->nocb_cb_wq);
+	init_swait_queue_head(&rdp->nocb_gp_wq);
+	init_swait_queue_head(&rdp->nocb_state_wq);
+	raw_spin_lock_init(&rdp->nocb_lock);
+	raw_spin_lock_init(&rdp->nocb_bypass_lock);
+	raw_spin_lock_init(&rdp->nocb_gp_lock);
+	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
+	rcu_cblist_init(&rdp->nocb_bypass);
+}
+
+/*
+ * If the specified CPU is a no-CBs CPU that does not already have its
+ * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread
+ * for this CPU's group has not yet been created, spawn it as well.
+ */
+static void rcu_spawn_one_nocb_kthread(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	struct rcu_data *rdp_gp;
+	struct task_struct *t;
+
+	/*
+	 * If this isn't a no-CBs CPU or if it already has an rcuo kthread,
+	 * then nothing to do.
+	 */
+	if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)
+		return;
+
+	/* If we didn't spawn the GP kthread first, reorganize! */
+	rdp_gp = rdp->nocb_gp_rdp;
+	if (!rdp_gp->nocb_gp_kthread) {
+		t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
+				"rcuog/%d", rdp_gp->cpu);
+		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
+			return;
+		WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
+	}
+
+	/* Spawn the kthread for this CPU. */
+	t = kthread_run(rcu_nocb_cb_kthread, rdp,
+			"rcuo%c/%d", rcu_state.abbr, cpu);
+	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
+		return;
+	WRITE_ONCE(rdp->nocb_cb_kthread, t);
+	WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
+}
+
+/*
+ * If the specified CPU is a no-CBs CPU that does not already have its
+ * rcuo kthread, spawn it.
+ */
+static void rcu_spawn_cpu_nocb_kthread(int cpu)
+{
+	if (rcu_scheduler_fully_active)
+		rcu_spawn_one_nocb_kthread(cpu);
+}
+
+/*
+ * Once the scheduler is running, spawn rcuo kthreads for all online
+ * no-CBs CPUs.  This assumes that the early_initcall()s happen before
+ * non-boot CPUs come online -- if this changes, we will need to add
+ * some mutual exclusion.
+ */
+static void __init rcu_spawn_nocb_kthreads(void)
+{
+	int cpu;
+
+	for_each_online_cpu(cpu)
+		rcu_spawn_cpu_nocb_kthread(cpu);
+}
+
+/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */
+static int rcu_nocb_gp_stride = -1;
+module_param(rcu_nocb_gp_stride, int, 0444);
+
+/*
+ * Initialize GP-CB relationships for all no-CBs CPU.
+ */
+static void __init rcu_organize_nocb_kthreads(void)
+{
+	int cpu;
+	bool firsttime = true;
+	bool gotnocbs = false;
+	bool gotnocbscbs = true;
+	int ls = rcu_nocb_gp_stride;
+	int nl = 0;  /* Next GP kthread. */
+	struct rcu_data *rdp;
+	struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
+	struct rcu_data *rdp_prev = NULL;
+
+	if (!cpumask_available(rcu_nocb_mask))
+		return;
+	if (ls == -1) {
+		ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
+		rcu_nocb_gp_stride = ls;
+	}
+
+	/*
+	 * Each pass through this loop sets up one rcu_data structure.
+	 * Should the corresponding CPU come online in the future, then
+	 * we will spawn the needed set of rcu_nocb_kthread() kthreads.
+	 */
+	for_each_cpu(cpu, rcu_nocb_mask) {
+		rdp = per_cpu_ptr(&rcu_data, cpu);
+		if (rdp->cpu >= nl) {
+			/* New GP kthread, set up for CBs & next GP. */
+			gotnocbs = true;
+			nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
+			rdp->nocb_gp_rdp = rdp;
+			rdp_gp = rdp;
+			if (dump_tree) {
+				if (!firsttime)
+					pr_cont("%s\n", gotnocbscbs
+							? "" : " (self only)");
+				gotnocbscbs = false;
+				firsttime = false;
+				pr_alert("%s: No-CB GP kthread CPU %d:",
+					 __func__, cpu);
+			}
+		} else {
+			/* Another CB kthread, link to previous GP kthread. */
+			gotnocbscbs = true;
+			rdp->nocb_gp_rdp = rdp_gp;
+			rdp_prev->nocb_next_cb_rdp = rdp;
+			if (dump_tree)
+				pr_cont(" %d", cpu);
+		}
+		rdp_prev = rdp;
+	}
+	if (gotnocbs && dump_tree)
+		pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
+}
+
+/*
+ * Bind the current task to the offloaded CPUs.  If there are no offloaded
+ * CPUs, leave the task unbound.  Splat if the bind attempt fails.
+ */
+void rcu_bind_current_to_nocb(void)
+{
+	if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask))
+		WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
+}
+EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
+
+// The ->on_cpu field is available only in CONFIG_SMP=y, so...
+#ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+	return tsp && task_is_running(tsp) && !tsp->on_cpu ? "!" : "";
+}
+#else // #ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+	return "";
+}
+#endif // #else #ifdef CONFIG_SMP
+
+/*
+ * Dump out nocb grace-period kthread state for the specified rcu_data
+ * structure.
+ */
+static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
+{
+	struct rcu_node *rnp = rdp->mynode;
+
+	pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
+		rdp->cpu,
+		"kK"[!!rdp->nocb_gp_kthread],
+		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
+		"dD"[!!rdp->nocb_defer_wakeup],
+		"tT"[timer_pending(&rdp->nocb_timer)],
+		"sS"[!!rdp->nocb_gp_sleep],
+		".W"[swait_active(&rdp->nocb_gp_wq)],
+		".W"[swait_active(&rnp->nocb_gp_wq[0])],
+		".W"[swait_active(&rnp->nocb_gp_wq[1])],
+		".B"[!!rdp->nocb_gp_bypass],
+		".G"[!!rdp->nocb_gp_gp],
+		(long)rdp->nocb_gp_seq,
+		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
+		rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
+		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
+		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
+}
+
+/* Dump out nocb kthread state for the specified rcu_data structure. */
+static void show_rcu_nocb_state(struct rcu_data *rdp)
+{
+	char bufw[20];
+	char bufr[20];
+	struct rcu_segcblist *rsclp = &rdp->cblist;
+	bool waslocked;
+	bool wassleep;
+
+	if (rdp->nocb_gp_rdp == rdp)
+		show_rcu_nocb_gp_state(rdp);
+
+	sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
+	sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
+	pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
+		rdp->cpu, rdp->nocb_gp_rdp->cpu,
+		rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
+		"kK"[!!rdp->nocb_cb_kthread],
+		"bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
+		"cC"[!!atomic_read(&rdp->nocb_lock_contended)],
+		"lL"[raw_spin_is_locked(&rdp->nocb_lock)],
+		"sS"[!!rdp->nocb_cb_sleep],
+		".W"[swait_active(&rdp->nocb_cb_wq)],
+		jiffies - rdp->nocb_bypass_first,
+		jiffies - rdp->nocb_nobypass_last,
+		rdp->nocb_nobypass_count,
+		".D"[rcu_segcblist_ready_cbs(rsclp)],
+		".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
+		rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
+		".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
+		rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
+		".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
+		".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
+		rcu_segcblist_n_cbs(&rdp->cblist),
+		rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
+		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
+		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
+
+	/* It is OK for GP kthreads to have GP state. */
+	if (rdp->nocb_gp_rdp == rdp)
+		return;
+
+	waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
+	wassleep = swait_active(&rdp->nocb_gp_wq);
+	if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
+		return;  /* Nothing untoward. */
+
+	pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
+		"lL"[waslocked],
+		"dD"[!!rdp->nocb_defer_wakeup],
+		"sS"[!!rdp->nocb_gp_sleep],
+		".W"[wassleep]);
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
+{
+	return 0;
+}
+
+static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
+{
+	return false;
+}
+
+/* No ->nocb_lock to acquire.  */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+}
+
+/* No ->nocb_lock to release.  */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
+{
+}
+
+/* No ->nocb_lock to release.  */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+				       unsigned long flags)
+{
+	local_irq_restore(flags);
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+}
+
+static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
+{
+}
+
+static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
+{
+	return NULL;
+}
+
+static void rcu_init_one_nocb(struct rcu_node *rnp)
+{
+}
+
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				  unsigned long j)
+{
+	return true;
+}
+
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				bool *was_alldone, unsigned long flags)
+{
+	return false;
+}
+
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
+				 unsigned long flags)
+{
+	WARN_ON_ONCE(1);  /* Should be dead code! */
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
+{
+	return false;
+}
+
+static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+	return false;
+}
+
+static void rcu_spawn_cpu_nocb_kthread(int cpu)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(void)
+{
+}
+
+static void show_rcu_nocb_state(struct rcu_data *rdp)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index de1dc3b..7a4876a 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -13,39 +13,6 @@
 
 #include "../locking/rtmutex_common.h"
 
-#ifdef CONFIG_RCU_NOCB_CPU
-static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
-static bool __read_mostly rcu_nocb_poll;    /* Offload kthread are to poll. */
-static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
-{
-	return lockdep_is_held(&rdp->nocb_lock);
-}
-
-static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
-{
-	/* Race on early boot between thread creation and assignment */
-	if (!rdp->nocb_cb_kthread || !rdp->nocb_gp_kthread)
-		return true;
-
-	if (current == rdp->nocb_cb_kthread || current == rdp->nocb_gp_kthread)
-		if (in_task())
-			return true;
-	return false;
-}
-
-#else
-static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
-{
-	return 0;
-}
-
-static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
-{
-	return false;
-}
-
-#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
-
 static bool rcu_rdp_is_offloaded(struct rcu_data *rdp)
 {
 	/*
@@ -346,7 +313,7 @@
 
 	trace_rcu_utilization(TPS("Start context switch"));
 	lockdep_assert_irqs_disabled();
-	WARN_ON_ONCE(!preempt && rcu_preempt_depth() > 0);
+	WARN_ONCE(!preempt && rcu_preempt_depth() > 0, "Voluntary context switch within RCU read-side critical section!");
 	if (rcu_preempt_depth() > 0 &&
 	    !t->rcu_read_unlock_special.b.blocked) {
 
@@ -405,17 +372,20 @@
 
 static void rcu_preempt_read_enter(void)
 {
-	current->rcu_read_lock_nesting++;
+	WRITE_ONCE(current->rcu_read_lock_nesting, READ_ONCE(current->rcu_read_lock_nesting) + 1);
 }
 
 static int rcu_preempt_read_exit(void)
 {
-	return --current->rcu_read_lock_nesting;
+	int ret = READ_ONCE(current->rcu_read_lock_nesting) - 1;
+
+	WRITE_ONCE(current->rcu_read_lock_nesting, ret);
+	return ret;
 }
 
 static void rcu_preempt_depth_set(int val)
 {
-	current->rcu_read_lock_nesting = val;
+	WRITE_ONCE(current->rcu_read_lock_nesting, val);
 }
 
 /*
@@ -1479,1460 +1449,6 @@
 
 #endif /* #else #if !defined(CONFIG_RCU_FAST_NO_HZ) */
 
-#ifdef CONFIG_RCU_NOCB_CPU
-
-/*
- * Offload callback processing from the boot-time-specified set of CPUs
- * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads
- * created that pull the callbacks from the corresponding CPU, wait for
- * a grace period to elapse, and invoke the callbacks.  These kthreads
- * are organized into GP kthreads, which manage incoming callbacks, wait for
- * grace periods, and awaken CB kthreads, and the CB kthreads, which only
- * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs
- * do a wake_up() on their GP kthread when they insert a callback into any
- * empty list, unless the rcu_nocb_poll boot parameter has been specified,
- * in which case each kthread actively polls its CPU.  (Which isn't so great
- * for energy efficiency, but which does reduce RCU's overhead on that CPU.)
- *
- * This is intended to be used in conjunction with Frederic Weisbecker's
- * adaptive-idle work, which would seriously reduce OS jitter on CPUs
- * running CPU-bound user-mode computations.
- *
- * Offloading of callbacks can also be used as an energy-efficiency
- * measure because CPUs with no RCU callbacks queued are more aggressive
- * about entering dyntick-idle mode.
- */
-
-
-/*
- * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters.
- * If the list is invalid, a warning is emitted and all CPUs are offloaded.
- */
-static int __init rcu_nocb_setup(char *str)
-{
-	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
-	if (cpulist_parse(str, rcu_nocb_mask)) {
-		pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
-		cpumask_setall(rcu_nocb_mask);
-	}
-	return 1;
-}
-__setup("rcu_nocbs=", rcu_nocb_setup);
-
-static int __init parse_rcu_nocb_poll(char *arg)
-{
-	rcu_nocb_poll = true;
-	return 0;
-}
-early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
-
-/*
- * Don't bother bypassing ->cblist if the call_rcu() rate is low.
- * After all, the main point of bypassing is to avoid lock contention
- * on ->nocb_lock, which only can happen at high call_rcu() rates.
- */
-static int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
-module_param(nocb_nobypass_lim_per_jiffy, int, 0);
-
-/*
- * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the
- * lock isn't immediately available, increment ->nocb_lock_contended to
- * flag the contention.
- */
-static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
-	__acquires(&rdp->nocb_bypass_lock)
-{
-	lockdep_assert_irqs_disabled();
-	if (raw_spin_trylock(&rdp->nocb_bypass_lock))
-		return;
-	atomic_inc(&rdp->nocb_lock_contended);
-	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
-	smp_mb__after_atomic(); /* atomic_inc() before lock. */
-	raw_spin_lock(&rdp->nocb_bypass_lock);
-	smp_mb__before_atomic(); /* atomic_dec() after lock. */
-	atomic_dec(&rdp->nocb_lock_contended);
-}
-
-/*
- * Spinwait until the specified rcu_data structure's ->nocb_lock is
- * not contended.  Please note that this is extremely special-purpose,
- * relying on the fact that at most two kthreads and one CPU contend for
- * this lock, and also that the two kthreads are guaranteed to have frequent
- * grace-period-duration time intervals between successive acquisitions
- * of the lock.  This allows us to use an extremely simple throttling
- * mechanism, and further to apply it only to the CPU doing floods of
- * call_rcu() invocations.  Don't try this at home!
- */
-static void rcu_nocb_wait_contended(struct rcu_data *rdp)
-{
-	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
-	while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
-		cpu_relax();
-}
-
-/*
- * Conditionally acquire the specified rcu_data structure's
- * ->nocb_bypass_lock.
- */
-static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-	return raw_spin_trylock(&rdp->nocb_bypass_lock);
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_bypass_lock.
- */
-static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
-	__releases(&rdp->nocb_bypass_lock)
-{
-	lockdep_assert_irqs_disabled();
-	raw_spin_unlock(&rdp->nocb_bypass_lock);
-}
-
-/*
- * Acquire the specified rcu_data structure's ->nocb_lock, but only
- * if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_lock(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-	if (!rcu_rdp_is_offloaded(rdp))
-		return;
-	raw_spin_lock(&rdp->nocb_lock);
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_lock, but only
- * if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_unlock(struct rcu_data *rdp)
-{
-	if (rcu_rdp_is_offloaded(rdp)) {
-		lockdep_assert_irqs_disabled();
-		raw_spin_unlock(&rdp->nocb_lock);
-	}
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_lock and restore
- * interrupts, but only if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
-				       unsigned long flags)
-{
-	if (rcu_rdp_is_offloaded(rdp)) {
-		lockdep_assert_irqs_disabled();
-		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
-	} else {
-		local_irq_restore(flags);
-	}
-}
-
-/* Lockdep check that ->cblist may be safely accessed. */
-static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-	if (rcu_rdp_is_offloaded(rdp))
-		lockdep_assert_held(&rdp->nocb_lock);
-}
-
-/*
- * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
- * grace period.
- */
-static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
-{
-	swake_up_all(sq);
-}
-
-static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
-{
-	return &rnp->nocb_gp_wq[rcu_seq_ctr(rnp->gp_seq) & 0x1];
-}
-
-static void rcu_init_one_nocb(struct rcu_node *rnp)
-{
-	init_swait_queue_head(&rnp->nocb_gp_wq[0]);
-	init_swait_queue_head(&rnp->nocb_gp_wq[1]);
-}
-
-/* Is the specified CPU a no-CBs CPU? */
-bool rcu_is_nocb_cpu(int cpu)
-{
-	if (cpumask_available(rcu_nocb_mask))
-		return cpumask_test_cpu(cpu, rcu_nocb_mask);
-	return false;
-}
-
-static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
-			   struct rcu_data *rdp,
-			   bool force, unsigned long flags)
-	__releases(rdp_gp->nocb_gp_lock)
-{
-	bool needwake = false;
-
-	if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
-		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-				    TPS("AlreadyAwake"));
-		return false;
-	}
-
-	if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
-		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-		del_timer(&rdp_gp->nocb_timer);
-	}
-
-	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
-		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
-		needwake = true;
-	}
-	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-	if (needwake) {
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
-		wake_up_process(rdp_gp->nocb_gp_kthread);
-	}
-
-	return needwake;
-}
-
-/*
- * Kick the GP kthread for this NOCB group.
- */
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
-{
-	unsigned long flags;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
-}
-
-/*
- * Arrange to wake the GP kthread for this NOCB group at some future
- * time when it is safe to do so.
- */
-static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
-			       const char *reason)
-{
-	unsigned long flags;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-
-	/*
-	 * Bypass wakeup overrides previous deferments. In case
-	 * of callback storm, no need to wake up too early.
-	 */
-	if (waketype == RCU_NOCB_WAKE_BYPASS) {
-		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
-		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
-	} else {
-		if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
-			mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
-		if (rdp_gp->nocb_defer_wakeup < waketype)
-			WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
-	}
-
-	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
-}
-
-/*
- * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
- * However, if there is a callback to be enqueued and if ->nocb_bypass
- * proves to be initially empty, just return false because the no-CB GP
- * kthread may need to be awakened in this case.
- *
- * Note that this function always returns true if rhp is NULL.
- */
-static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				     unsigned long j)
-{
-	struct rcu_cblist rcl;
-
-	WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
-	rcu_lockdep_assert_cblist_protected(rdp);
-	lockdep_assert_held(&rdp->nocb_bypass_lock);
-	if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
-		raw_spin_unlock(&rdp->nocb_bypass_lock);
-		return false;
-	}
-	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
-	if (rhp)
-		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
-	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
-	WRITE_ONCE(rdp->nocb_bypass_first, j);
-	rcu_nocb_bypass_unlock(rdp);
-	return true;
-}
-
-/*
- * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
- * However, if there is a callback to be enqueued and if ->nocb_bypass
- * proves to be initially empty, just return false because the no-CB GP
- * kthread may need to be awakened in this case.
- *
- * Note that this function always returns true if rhp is NULL.
- */
-static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
-{
-	if (!rcu_rdp_is_offloaded(rdp))
-		return true;
-	rcu_lockdep_assert_cblist_protected(rdp);
-	rcu_nocb_bypass_lock(rdp);
-	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
-}
-
-/*
- * If the ->nocb_bypass_lock is immediately available, flush the
- * ->nocb_bypass queue into ->cblist.
- */
-static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
-{
-	rcu_lockdep_assert_cblist_protected(rdp);
-	if (!rcu_rdp_is_offloaded(rdp) ||
-	    !rcu_nocb_bypass_trylock(rdp))
-		return;
-	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
-}
-
-/*
- * See whether it is appropriate to use the ->nocb_bypass list in order
- * to control contention on ->nocb_lock.  A limited number of direct
- * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
- * is non-empty, further callbacks must be placed into ->nocb_bypass,
- * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
- * back to direct use of ->cblist.  However, ->nocb_bypass should not be
- * used if ->cblist is empty, because otherwise callbacks can be stranded
- * on ->nocb_bypass because we cannot count on the current CPU ever again
- * invoking call_rcu().  The general rule is that if ->nocb_bypass is
- * non-empty, the corresponding no-CBs grace-period kthread must not be
- * in an indefinite sleep state.
- *
- * Finally, it is not permitted to use the bypass during early boot,
- * as doing so would confuse the auto-initialization code.  Besides
- * which, there is no point in worrying about lock contention while
- * there is only one CPU in operation.
- */
-static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
-{
-	unsigned long c;
-	unsigned long cur_gp_seq;
-	unsigned long j = jiffies;
-	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-
-	lockdep_assert_irqs_disabled();
-
-	// Pure softirq/rcuc based processing: no bypassing, no
-	// locking.
-	if (!rcu_rdp_is_offloaded(rdp)) {
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		return false;
-	}
-
-	// In the process of (de-)offloading: no bypassing, but
-	// locking.
-	if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
-		rcu_nocb_lock(rdp);
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		return false; /* Not offloaded, no bypassing. */
-	}
-
-	// Don't use ->nocb_bypass during early boot.
-	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
-		rcu_nocb_lock(rdp);
-		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		return false;
-	}
-
-	// If we have advanced to a new jiffy, reset counts to allow
-	// moving back from ->nocb_bypass to ->cblist.
-	if (j == rdp->nocb_nobypass_last) {
-		c = rdp->nocb_nobypass_count + 1;
-	} else {
-		WRITE_ONCE(rdp->nocb_nobypass_last, j);
-		c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
-		if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
-				 nocb_nobypass_lim_per_jiffy))
-			c = 0;
-		else if (c > nocb_nobypass_lim_per_jiffy)
-			c = nocb_nobypass_lim_per_jiffy;
-	}
-	WRITE_ONCE(rdp->nocb_nobypass_count, c);
-
-	// If there hasn't yet been all that many ->cblist enqueues
-	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
-	// ->nocb_bypass first.
-	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
-		rcu_nocb_lock(rdp);
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		if (*was_alldone)
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstQ"));
-		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
-		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-		return false; // Caller must enqueue the callback.
-	}
-
-	// If ->nocb_bypass has been used too long or is too full,
-	// flush ->nocb_bypass to ->cblist.
-	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
-	    ncbs >= qhimark) {
-		rcu_nocb_lock(rdp);
-		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
-			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-			if (*was_alldone)
-				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-						    TPS("FirstQ"));
-			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-			return false; // Caller must enqueue the callback.
-		}
-		if (j != rdp->nocb_gp_adv_time &&
-		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
-			rcu_advance_cbs_nowake(rdp->mynode, rdp);
-			rdp->nocb_gp_adv_time = j;
-		}
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		return true; // Callback already enqueued.
-	}
-
-	// We need to use the bypass.
-	rcu_nocb_wait_contended(rdp);
-	rcu_nocb_bypass_lock(rdp);
-	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
-	if (!ncbs) {
-		WRITE_ONCE(rdp->nocb_bypass_first, j);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
-	}
-	rcu_nocb_bypass_unlock(rdp);
-	smp_mb(); /* Order enqueue before wake. */
-	if (ncbs) {
-		local_irq_restore(flags);
-	} else {
-		// No-CBs GP kthread might be indefinitely asleep, if so, wake.
-		rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
-		if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstBQwake"));
-			__call_rcu_nocb_wake(rdp, true, flags);
-		} else {
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstBQnoWake"));
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-		}
-	}
-	return true; // Callback already enqueued.
-}
-
-/*
- * Awaken the no-CBs grace-period kthread if needed, either due to it
- * legitimately being asleep or due to overload conditions.
- *
- * If warranted, also wake up the kthread servicing this CPUs queues.
- */
-static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
-				 unsigned long flags)
-				 __releases(rdp->nocb_lock)
-{
-	unsigned long cur_gp_seq;
-	unsigned long j;
-	long len;
-	struct task_struct *t;
-
-	// If we are being polled or there is no kthread, just leave.
-	t = READ_ONCE(rdp->nocb_gp_kthread);
-	if (rcu_nocb_poll || !t) {
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-				    TPS("WakeNotPoll"));
-		return;
-	}
-	// Need to actually to a wakeup.
-	len = rcu_segcblist_n_cbs(&rdp->cblist);
-	if (was_alldone) {
-		rdp->qlen_last_fqs_check = len;
-		if (!irqs_disabled_flags(flags)) {
-			/* ... if queue was empty ... */
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			wake_nocb_gp(rdp, false);
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("WakeEmpty"));
-		} else {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
-					   TPS("WakeEmptyIsDeferred"));
-		}
-	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
-		/* ... or if many callbacks queued. */
-		rdp->qlen_last_fqs_check = len;
-		j = jiffies;
-		if (j != rdp->nocb_gp_adv_time &&
-		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
-			rcu_advance_cbs_nowake(rdp->mynode, rdp);
-			rdp->nocb_gp_adv_time = j;
-		}
-		smp_mb(); /* Enqueue before timer_pending(). */
-		if ((rdp->nocb_cb_sleep ||
-		     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
-		    !timer_pending(&rdp->nocb_timer)) {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
-					   TPS("WakeOvfIsDeferred"));
-		} else {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
-		}
-	} else {
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
-	}
-	return;
-}
-
-/*
- * Check if we ignore this rdp.
- *
- * We check that without holding the nocb lock but
- * we make sure not to miss a freshly offloaded rdp
- * with the current ordering:
- *
- *  rdp_offload_toggle()        nocb_gp_enabled_cb()
- * -------------------------   ----------------------------
- *    WRITE flags                 LOCK nocb_gp_lock
- *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
- *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
- *    UNLOCK nocb_gp_lock         READ flags
- */
-static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
-{
-	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
-
-	return rcu_segcblist_test_flags(&rdp->cblist, flags);
-}
-
-static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
-						     bool *needwake_state)
-{
-	struct rcu_segcblist *cblist = &rdp->cblist;
-
-	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
-		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
-			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
-			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-				*needwake_state = true;
-		}
-		return false;
-	}
-
-	/*
-	 * De-offloading. Clear our flag and notify the de-offload worker.
-	 * We will ignore this rdp until it ever gets re-offloaded.
-	 */
-	WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
-	rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
-	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-		*needwake_state = true;
-	return true;
-}
-
-
-/*
- * No-CBs GP kthreads come here to wait for additional callbacks to show up
- * or for grace periods to end.
- */
-static void nocb_gp_wait(struct rcu_data *my_rdp)
-{
-	bool bypass = false;
-	long bypass_ncbs;
-	int __maybe_unused cpu = my_rdp->cpu;
-	unsigned long cur_gp_seq;
-	unsigned long flags;
-	bool gotcbs = false;
-	unsigned long j = jiffies;
-	bool needwait_gp = false; // This prevents actual uninitialized use.
-	bool needwake;
-	bool needwake_gp;
-	struct rcu_data *rdp;
-	struct rcu_node *rnp;
-	unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
-	bool wasempty = false;
-
-	/*
-	 * Each pass through the following loop checks for CBs and for the
-	 * nearest grace period (if any) to wait for next.  The CB kthreads
-	 * and the global grace-period kthread are awakened if needed.
-	 */
-	WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
-	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
-		bool needwake_state = false;
-
-		if (!nocb_gp_enabled_cb(rdp))
-			continue;
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
-		rcu_nocb_lock_irqsave(rdp, flags);
-		if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			if (needwake_state)
-				swake_up_one(&rdp->nocb_state_wq);
-			continue;
-		}
-		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-		if (bypass_ncbs &&
-		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
-		     bypass_ncbs > 2 * qhimark)) {
-			// Bypass full or old, so flush it.
-			(void)rcu_nocb_try_flush_bypass(rdp, j);
-			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			if (needwake_state)
-				swake_up_one(&rdp->nocb_state_wq);
-			continue; /* No callbacks here, try next. */
-		}
-		if (bypass_ncbs) {
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("Bypass"));
-			bypass = true;
-		}
-		rnp = rdp->mynode;
-
-		// Advance callbacks if helpful and low contention.
-		needwake_gp = false;
-		if (!rcu_segcblist_restempty(&rdp->cblist,
-					     RCU_NEXT_READY_TAIL) ||
-		    (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-		     rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
-			raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
-			needwake_gp = rcu_advance_cbs(rnp, rdp);
-			wasempty = rcu_segcblist_restempty(&rdp->cblist,
-							   RCU_NEXT_READY_TAIL);
-			raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
-		}
-		// Need to wait on some grace period?
-		WARN_ON_ONCE(wasempty &&
-			     !rcu_segcblist_restempty(&rdp->cblist,
-						      RCU_NEXT_READY_TAIL));
-		if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
-			if (!needwait_gp ||
-			    ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
-				wait_gp_seq = cur_gp_seq;
-			needwait_gp = true;
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("NeedWaitGP"));
-		}
-		if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
-			needwake = rdp->nocb_cb_sleep;
-			WRITE_ONCE(rdp->nocb_cb_sleep, false);
-			smp_mb(); /* CB invocation -after- GP end. */
-		} else {
-			needwake = false;
-		}
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		if (needwake) {
-			swake_up_one(&rdp->nocb_cb_wq);
-			gotcbs = true;
-		}
-		if (needwake_gp)
-			rcu_gp_kthread_wake();
-		if (needwake_state)
-			swake_up_one(&rdp->nocb_state_wq);
-	}
-
-	my_rdp->nocb_gp_bypass = bypass;
-	my_rdp->nocb_gp_gp = needwait_gp;
-	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
-
-	if (bypass && !rcu_nocb_poll) {
-		// At least one child with non-empty ->nocb_bypass, so set
-		// timer in order to avoid stranding its callbacks.
-		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
-				   TPS("WakeBypassIsDeferred"));
-	}
-	if (rcu_nocb_poll) {
-		/* Polling, so trace if first poll in the series. */
-		if (gotcbs)
-			trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
-		schedule_timeout_idle(1);
-	} else if (!needwait_gp) {
-		/* Wait for callbacks to appear. */
-		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
-		swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
-				!READ_ONCE(my_rdp->nocb_gp_sleep));
-		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
-	} else {
-		rnp = my_rdp->mynode;
-		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
-		swait_event_interruptible_exclusive(
-			rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
-			rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
-			!READ_ONCE(my_rdp->nocb_gp_sleep));
-		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
-	}
-	if (!rcu_nocb_poll) {
-		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
-		if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
-			WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-			del_timer(&my_rdp->nocb_timer);
-		}
-		WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
-		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
-	}
-	my_rdp->nocb_gp_seq = -1;
-	WARN_ON(signal_pending(current));
-}
-
-/*
- * No-CBs grace-period-wait kthread.  There is one of these per group
- * of CPUs, but only once at least one CPU in that group has come online
- * at least once since boot.  This kthread checks for newly posted
- * callbacks from any of the CPUs it is responsible for, waits for a
- * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
- * that then have callback-invocation work to do.
- */
-static int rcu_nocb_gp_kthread(void *arg)
-{
-	struct rcu_data *rdp = arg;
-
-	for (;;) {
-		WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
-		nocb_gp_wait(rdp);
-		cond_resched_tasks_rcu_qs();
-	}
-	return 0;
-}
-
-static inline bool nocb_cb_can_run(struct rcu_data *rdp)
-{
-	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
-	return rcu_segcblist_test_flags(&rdp->cblist, flags);
-}
-
-static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
-{
-	return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
-}
-
-/*
- * Invoke any ready callbacks from the corresponding no-CBs CPU,
- * then, if there are no more, wait for more to appear.
- */
-static void nocb_cb_wait(struct rcu_data *rdp)
-{
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	unsigned long cur_gp_seq;
-	unsigned long flags;
-	bool needwake_state = false;
-	bool needwake_gp = false;
-	bool can_sleep = true;
-	struct rcu_node *rnp = rdp->mynode;
-
-	local_irq_save(flags);
-	rcu_momentary_dyntick_idle();
-	local_irq_restore(flags);
-	/*
-	 * Disable BH to provide the expected environment.  Also, when
-	 * transitioning to/from NOCB mode, a self-requeuing callback might
-	 * be invoked from softirq.  A short grace period could cause both
-	 * instances of this callback would execute concurrently.
-	 */
-	local_bh_disable();
-	rcu_do_batch(rdp);
-	local_bh_enable();
-	lockdep_assert_irqs_enabled();
-	rcu_nocb_lock_irqsave(rdp, flags);
-	if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
-	    rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
-	    raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
-		needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
-		raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
-	}
-
-	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
-		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
-			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
-			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
-				needwake_state = true;
-		}
-		if (rcu_segcblist_ready_cbs(cblist))
-			can_sleep = false;
-	} else {
-		/*
-		 * De-offloading. Clear our flag and notify the de-offload worker.
-		 * We won't touch the callbacks and keep sleeping until we ever
-		 * get re-offloaded.
-		 */
-		WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
-		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
-		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
-			needwake_state = true;
-	}
-
-	WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep);
-
-	if (rdp->nocb_cb_sleep)
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
-
-	rcu_nocb_unlock_irqrestore(rdp, flags);
-	if (needwake_gp)
-		rcu_gp_kthread_wake();
-
-	if (needwake_state)
-		swake_up_one(&rdp->nocb_state_wq);
-
-	do {
-		swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
-						    nocb_cb_wait_cond(rdp));
-
-		// VVV Ensure CB invocation follows _sleep test.
-		if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
-			WARN_ON(signal_pending(current));
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
-		}
-	} while (!nocb_cb_can_run(rdp));
-}
-
-/*
- * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
- * nocb_cb_wait() to do the dirty work.
- */
-static int rcu_nocb_cb_kthread(void *arg)
-{
-	struct rcu_data *rdp = arg;
-
-	// Each pass through this loop does one callback batch, and,
-	// if there are no more ready callbacks, waits for them.
-	for (;;) {
-		nocb_cb_wait(rdp);
-		cond_resched_tasks_rcu_qs();
-	}
-	return 0;
-}
-
-/* Is a deferred wakeup of rcu_nocb_kthread() required? */
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
-{
-	return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
-}
-
-/* Do a deferred wakeup of rcu_nocb_kthread(). */
-static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
-					   struct rcu_data *rdp, int level,
-					   unsigned long flags)
-	__releases(rdp_gp->nocb_gp_lock)
-{
-	int ndw;
-	int ret;
-
-	if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
-		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-		return false;
-	}
-
-	ndw = rdp_gp->nocb_defer_wakeup;
-	ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
-
-	return ret;
-}
-
-/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
-static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
-{
-	unsigned long flags;
-	struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
-
-	WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
-
-	raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
-	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
-	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
-}
-
-/*
- * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
- * This means we do an inexact common-case check.  Note that if
- * we miss, ->nocb_timer will eventually clean things up.
- */
-static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
-{
-	unsigned long flags;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-	if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
-		return false;
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-	return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
-}
-
-void rcu_nocb_flush_deferred_wakeup(void)
-{
-	do_nocb_deferred_wakeup(this_cpu_ptr(&rcu_data));
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
-
-static int rdp_offload_toggle(struct rcu_data *rdp,
-			       bool offload, unsigned long flags)
-	__releases(rdp->nocb_lock)
-{
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-	bool wake_gp = false;
-
-	rcu_segcblist_offload(cblist, offload);
-
-	if (rdp->nocb_cb_sleep)
-		rdp->nocb_cb_sleep = false;
-	rcu_nocb_unlock_irqrestore(rdp, flags);
-
-	/*
-	 * Ignore former value of nocb_cb_sleep and force wake up as it could
-	 * have been spuriously set to false already.
-	 */
-	swake_up_one(&rdp->nocb_cb_wq);
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-	if (rdp_gp->nocb_gp_sleep) {
-		rdp_gp->nocb_gp_sleep = false;
-		wake_gp = true;
-	}
-	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-
-	if (wake_gp)
-		wake_up_process(rdp_gp->nocb_gp_kthread);
-
-	return 0;
-}
-
-static long rcu_nocb_rdp_deoffload(void *arg)
-{
-	struct rcu_data *rdp = arg;
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	unsigned long flags;
-	int ret;
-
-	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
-
-	pr_info("De-offloading %d\n", rdp->cpu);
-
-	rcu_nocb_lock_irqsave(rdp, flags);
-	/*
-	 * Flush once and for all now. This suffices because we are
-	 * running on the target CPU holding ->nocb_lock (thus having
-	 * interrupts disabled), and because rdp_offload_toggle()
-	 * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED.
-	 * Thus future calls to rcu_segcblist_completely_offloaded() will
-	 * return false, which means that future calls to rcu_nocb_try_bypass()
-	 * will refuse to put anything into the bypass.
-	 */
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
-	ret = rdp_offload_toggle(rdp, false, flags);
-	swait_event_exclusive(rdp->nocb_state_wq,
-			      !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
-							SEGCBLIST_KTHREAD_GP));
-	/*
-	 * Lock one last time to acquire latest callback updates from kthreads
-	 * so we can later handle callbacks locally without locking.
-	 */
-	rcu_nocb_lock_irqsave(rdp, flags);
-	/*
-	 * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY after the nocb
-	 * lock is released but how about being paranoid for once?
-	 */
-	rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
-	/*
-	 * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
-	 * rcu_nocb_unlock_irqrestore() anymore.
-	 */
-	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
-
-	/* Sanity check */
-	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-
-
-	return ret;
-}
-
-int rcu_nocb_cpu_deoffload(int cpu)
-{
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-	int ret = 0;
-
-	mutex_lock(&rcu_state.barrier_mutex);
-	cpus_read_lock();
-	if (rcu_rdp_is_offloaded(rdp)) {
-		if (cpu_online(cpu)) {
-			ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
-			if (!ret)
-				cpumask_clear_cpu(cpu, rcu_nocb_mask);
-		} else {
-			pr_info("NOCB: Can't CB-deoffload an offline CPU\n");
-			ret = -EINVAL;
-		}
-	}
-	cpus_read_unlock();
-	mutex_unlock(&rcu_state.barrier_mutex);
-
-	return ret;
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
-
-static long rcu_nocb_rdp_offload(void *arg)
-{
-	struct rcu_data *rdp = arg;
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	unsigned long flags;
-	int ret;
-
-	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
-	/*
-	 * For now we only support re-offload, ie: the rdp must have been
-	 * offloaded on boot first.
-	 */
-	if (!rdp->nocb_gp_rdp)
-		return -EINVAL;
-
-	pr_info("Offloading %d\n", rdp->cpu);
-	/*
-	 * Can't use rcu_nocb_lock_irqsave() while we are in
-	 * SEGCBLIST_SOFTIRQ_ONLY mode.
-	 */
-	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
-
-	/*
-	 * We didn't take the nocb lock while working on the
-	 * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
-	 * Every modifications that have been done previously on
-	 * rdp->cblist must be visible remotely by the nocb kthreads
-	 * upon wake up after reading the cblist flags.
-	 *
-	 * The layout against nocb_lock enforces that ordering:
-	 *
-	 *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
-	 * -------------------------   ----------------------------
-	 *      WRITE callbacks           rcu_nocb_lock()
-	 *      rcu_nocb_lock()           READ flags
-	 *      WRITE flags               READ callbacks
-	 *      rcu_nocb_unlock()         rcu_nocb_unlock()
-	 */
-	ret = rdp_offload_toggle(rdp, true, flags);
-	swait_event_exclusive(rdp->nocb_state_wq,
-			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
-			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
-
-	return ret;
-}
-
-int rcu_nocb_cpu_offload(int cpu)
-{
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-	int ret = 0;
-
-	mutex_lock(&rcu_state.barrier_mutex);
-	cpus_read_lock();
-	if (!rcu_rdp_is_offloaded(rdp)) {
-		if (cpu_online(cpu)) {
-			ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
-			if (!ret)
-				cpumask_set_cpu(cpu, rcu_nocb_mask);
-		} else {
-			pr_info("NOCB: Can't CB-offload an offline CPU\n");
-			ret = -EINVAL;
-		}
-	}
-	cpus_read_unlock();
-	mutex_unlock(&rcu_state.barrier_mutex);
-
-	return ret;
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
-
-void __init rcu_init_nohz(void)
-{
-	int cpu;
-	bool need_rcu_nocb_mask = false;
-	struct rcu_data *rdp;
-
-#if defined(CONFIG_NO_HZ_FULL)
-	if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
-		need_rcu_nocb_mask = true;
-#endif /* #if defined(CONFIG_NO_HZ_FULL) */
-
-	if (!cpumask_available(rcu_nocb_mask) && need_rcu_nocb_mask) {
-		if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) {
-			pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n");
-			return;
-		}
-	}
-	if (!cpumask_available(rcu_nocb_mask))
-		return;
-
-#if defined(CONFIG_NO_HZ_FULL)
-	if (tick_nohz_full_running)
-		cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask);
-#endif /* #if defined(CONFIG_NO_HZ_FULL) */
-
-	if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) {
-		pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n");
-		cpumask_and(rcu_nocb_mask, cpu_possible_mask,
-			    rcu_nocb_mask);
-	}
-	if (cpumask_empty(rcu_nocb_mask))
-		pr_info("\tOffload RCU callbacks from CPUs: (none).\n");
-	else
-		pr_info("\tOffload RCU callbacks from CPUs: %*pbl.\n",
-			cpumask_pr_args(rcu_nocb_mask));
-	if (rcu_nocb_poll)
-		pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
-
-	for_each_cpu(cpu, rcu_nocb_mask) {
-		rdp = per_cpu_ptr(&rcu_data, cpu);
-		if (rcu_segcblist_empty(&rdp->cblist))
-			rcu_segcblist_init(&rdp->cblist);
-		rcu_segcblist_offload(&rdp->cblist, true);
-		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
-		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
-	}
-	rcu_organize_nocb_kthreads();
-}
-
-/* Initialize per-rcu_data variables for no-CBs CPUs. */
-static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
-{
-	init_swait_queue_head(&rdp->nocb_cb_wq);
-	init_swait_queue_head(&rdp->nocb_gp_wq);
-	init_swait_queue_head(&rdp->nocb_state_wq);
-	raw_spin_lock_init(&rdp->nocb_lock);
-	raw_spin_lock_init(&rdp->nocb_bypass_lock);
-	raw_spin_lock_init(&rdp->nocb_gp_lock);
-	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
-	rcu_cblist_init(&rdp->nocb_bypass);
-}
-
-/*
- * If the specified CPU is a no-CBs CPU that does not already have its
- * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread
- * for this CPU's group has not yet been created, spawn it as well.
- */
-static void rcu_spawn_one_nocb_kthread(int cpu)
-{
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-	struct rcu_data *rdp_gp;
-	struct task_struct *t;
-
-	/*
-	 * If this isn't a no-CBs CPU or if it already has an rcuo kthread,
-	 * then nothing to do.
-	 */
-	if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)
-		return;
-
-	/* If we didn't spawn the GP kthread first, reorganize! */
-	rdp_gp = rdp->nocb_gp_rdp;
-	if (!rdp_gp->nocb_gp_kthread) {
-		t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
-				"rcuog/%d", rdp_gp->cpu);
-		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
-			return;
-		WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
-	}
-
-	/* Spawn the kthread for this CPU. */
-	t = kthread_run(rcu_nocb_cb_kthread, rdp,
-			"rcuo%c/%d", rcu_state.abbr, cpu);
-	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
-		return;
-	WRITE_ONCE(rdp->nocb_cb_kthread, t);
-	WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
-}
-
-/*
- * If the specified CPU is a no-CBs CPU that does not already have its
- * rcuo kthread, spawn it.
- */
-static void rcu_spawn_cpu_nocb_kthread(int cpu)
-{
-	if (rcu_scheduler_fully_active)
-		rcu_spawn_one_nocb_kthread(cpu);
-}
-
-/*
- * Once the scheduler is running, spawn rcuo kthreads for all online
- * no-CBs CPUs.  This assumes that the early_initcall()s happen before
- * non-boot CPUs come online -- if this changes, we will need to add
- * some mutual exclusion.
- */
-static void __init rcu_spawn_nocb_kthreads(void)
-{
-	int cpu;
-
-	for_each_online_cpu(cpu)
-		rcu_spawn_cpu_nocb_kthread(cpu);
-}
-
-/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */
-static int rcu_nocb_gp_stride = -1;
-module_param(rcu_nocb_gp_stride, int, 0444);
-
-/*
- * Initialize GP-CB relationships for all no-CBs CPU.
- */
-static void __init rcu_organize_nocb_kthreads(void)
-{
-	int cpu;
-	bool firsttime = true;
-	bool gotnocbs = false;
-	bool gotnocbscbs = true;
-	int ls = rcu_nocb_gp_stride;
-	int nl = 0;  /* Next GP kthread. */
-	struct rcu_data *rdp;
-	struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
-	struct rcu_data *rdp_prev = NULL;
-
-	if (!cpumask_available(rcu_nocb_mask))
-		return;
-	if (ls == -1) {
-		ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
-		rcu_nocb_gp_stride = ls;
-	}
-
-	/*
-	 * Each pass through this loop sets up one rcu_data structure.
-	 * Should the corresponding CPU come online in the future, then
-	 * we will spawn the needed set of rcu_nocb_kthread() kthreads.
-	 */
-	for_each_cpu(cpu, rcu_nocb_mask) {
-		rdp = per_cpu_ptr(&rcu_data, cpu);
-		if (rdp->cpu >= nl) {
-			/* New GP kthread, set up for CBs & next GP. */
-			gotnocbs = true;
-			nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
-			rdp->nocb_gp_rdp = rdp;
-			rdp_gp = rdp;
-			if (dump_tree) {
-				if (!firsttime)
-					pr_cont("%s\n", gotnocbscbs
-							? "" : " (self only)");
-				gotnocbscbs = false;
-				firsttime = false;
-				pr_alert("%s: No-CB GP kthread CPU %d:",
-					 __func__, cpu);
-			}
-		} else {
-			/* Another CB kthread, link to previous GP kthread. */
-			gotnocbscbs = true;
-			rdp->nocb_gp_rdp = rdp_gp;
-			rdp_prev->nocb_next_cb_rdp = rdp;
-			if (dump_tree)
-				pr_cont(" %d", cpu);
-		}
-		rdp_prev = rdp;
-	}
-	if (gotnocbs && dump_tree)
-		pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
-}
-
-/*
- * Bind the current task to the offloaded CPUs.  If there are no offloaded
- * CPUs, leave the task unbound.  Splat if the bind attempt fails.
- */
-void rcu_bind_current_to_nocb(void)
-{
-	if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask))
-		WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
-}
-EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
-
-// The ->on_cpu field is available only in CONFIG_SMP=y, so...
-#ifdef CONFIG_SMP
-static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
-{
-	return tsp && task_is_running(tsp) && !tsp->on_cpu ? "!" : "";
-}
-#else // #ifdef CONFIG_SMP
-static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
-{
-	return "";
-}
-#endif // #else #ifdef CONFIG_SMP
-
-/*
- * Dump out nocb grace-period kthread state for the specified rcu_data
- * structure.
- */
-static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
-{
-	struct rcu_node *rnp = rdp->mynode;
-
-	pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
-		rdp->cpu,
-		"kK"[!!rdp->nocb_gp_kthread],
-		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
-		"dD"[!!rdp->nocb_defer_wakeup],
-		"tT"[timer_pending(&rdp->nocb_timer)],
-		"sS"[!!rdp->nocb_gp_sleep],
-		".W"[swait_active(&rdp->nocb_gp_wq)],
-		".W"[swait_active(&rnp->nocb_gp_wq[0])],
-		".W"[swait_active(&rnp->nocb_gp_wq[1])],
-		".B"[!!rdp->nocb_gp_bypass],
-		".G"[!!rdp->nocb_gp_gp],
-		(long)rdp->nocb_gp_seq,
-		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
-		rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
-		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
-		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
-}
-
-/* Dump out nocb kthread state for the specified rcu_data structure. */
-static void show_rcu_nocb_state(struct rcu_data *rdp)
-{
-	char bufw[20];
-	char bufr[20];
-	struct rcu_segcblist *rsclp = &rdp->cblist;
-	bool waslocked;
-	bool wassleep;
-
-	if (rdp->nocb_gp_rdp == rdp)
-		show_rcu_nocb_gp_state(rdp);
-
-	sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
-	sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
-	pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
-		rdp->cpu, rdp->nocb_gp_rdp->cpu,
-		rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
-		"kK"[!!rdp->nocb_cb_kthread],
-		"bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
-		"cC"[!!atomic_read(&rdp->nocb_lock_contended)],
-		"lL"[raw_spin_is_locked(&rdp->nocb_lock)],
-		"sS"[!!rdp->nocb_cb_sleep],
-		".W"[swait_active(&rdp->nocb_cb_wq)],
-		jiffies - rdp->nocb_bypass_first,
-		jiffies - rdp->nocb_nobypass_last,
-		rdp->nocb_nobypass_count,
-		".D"[rcu_segcblist_ready_cbs(rsclp)],
-		".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
-		rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
-		".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
-		rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
-		".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
-		".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
-		rcu_segcblist_n_cbs(&rdp->cblist),
-		rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
-		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
-		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
-
-	/* It is OK for GP kthreads to have GP state. */
-	if (rdp->nocb_gp_rdp == rdp)
-		return;
-
-	waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
-	wassleep = swait_active(&rdp->nocb_gp_wq);
-	if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
-		return;  /* Nothing untoward. */
-
-	pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
-		"lL"[waslocked],
-		"dD"[!!rdp->nocb_defer_wakeup],
-		"sS"[!!rdp->nocb_gp_sleep],
-		".W"[wassleep]);
-}
-
-#else /* #ifdef CONFIG_RCU_NOCB_CPU */
-
-/* No ->nocb_lock to acquire.  */
-static void rcu_nocb_lock(struct rcu_data *rdp)
-{
-}
-
-/* No ->nocb_lock to release.  */
-static void rcu_nocb_unlock(struct rcu_data *rdp)
-{
-}
-
-/* No ->nocb_lock to release.  */
-static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
-				       unsigned long flags)
-{
-	local_irq_restore(flags);
-}
-
-/* Lockdep check that ->cblist may be safely accessed. */
-static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-}
-
-static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
-{
-}
-
-static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
-{
-	return NULL;
-}
-
-static void rcu_init_one_nocb(struct rcu_node *rnp)
-{
-}
-
-static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
-{
-	return true;
-}
-
-static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
-{
-	return false;
-}
-
-static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
-				 unsigned long flags)
-{
-	WARN_ON_ONCE(1);  /* Should be dead code! */
-}
-
-static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
-{
-}
-
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
-{
-	return false;
-}
-
-static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
-{
-	return false;
-}
-
-static void rcu_spawn_cpu_nocb_kthread(int cpu)
-{
-}
-
-static void __init rcu_spawn_nocb_kthreads(void)
-{
-}
-
-static void show_rcu_nocb_state(struct rcu_data *rdp)
-{
-}
-
-#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
-
 /*
  * Is this CPU a NO_HZ_FULL CPU that should ignore RCU so that the
  * grace-period kthread will do force_quiescent_state() processing?
@@ -2982,17 +1498,17 @@
 /* Turn on heavyweight RCU tasks trace readers on idle/user entry. */
 static void rcu_dynticks_task_trace_enter(void)
 {
-#ifdef CONFIG_TASKS_RCU_TRACE
+#ifdef CONFIG_TASKS_TRACE_RCU
 	if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB))
 		current->trc_reader_special.b.need_mb = true;
-#endif /* #ifdef CONFIG_TASKS_RCU_TRACE */
+#endif /* #ifdef CONFIG_TASKS_TRACE_RCU */
 }
 
 /* Turn off heavyweight RCU tasks trace readers on idle/user exit. */
 static void rcu_dynticks_task_trace_exit(void)
 {
-#ifdef CONFIG_TASKS_RCU_TRACE
+#ifdef CONFIG_TASKS_TRACE_RCU
 	if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB))
 		current->trc_reader_special.b.need_mb = false;
-#endif /* #ifdef CONFIG_TASKS_RCU_TRACE */
+#endif /* #ifdef CONFIG_TASKS_TRACE_RCU */
 }
diff --git a/kernel/rcu/tree_stall.h b/kernel/rcu/tree_stall.h
index 6c76988c..677ee3d 100644
--- a/kernel/rcu/tree_stall.h
+++ b/kernel/rcu/tree_stall.h
@@ -7,6 +7,8 @@
  * Author: Paul E. McKenney <paulmck@linux.ibm.com>
  */
 
+#include <linux/kvm_para.h>
+
 //////////////////////////////////////////////////////////////////////////////
 //
 // Controlling CPU stall warnings, including delay calculation.
@@ -117,17 +119,14 @@
 }
 
 /**
- * rcu_cpu_stall_reset - prevent further stall warnings in current grace period
- *
- * Set the stall-warning timeout way off into the future, thus preventing
- * any RCU CPU stall-warning messages from appearing in the current set of
- * RCU grace periods.
+ * rcu_cpu_stall_reset - restart stall-warning timeout for current grace period
  *
  * The caller must disable hard irqs.
  */
 void rcu_cpu_stall_reset(void)
 {
-	WRITE_ONCE(rcu_state.jiffies_stall, jiffies + ULONG_MAX / 2);
+	WRITE_ONCE(rcu_state.jiffies_stall,
+		   jiffies + rcu_jiffies_till_stall_check());
 }
 
 //////////////////////////////////////////////////////////////////////////////
@@ -267,8 +266,10 @@
 	struct task_struct *ts[8];
 
 	lockdep_assert_irqs_disabled();
-	if (!rcu_preempt_blocked_readers_cgp(rnp))
+	if (!rcu_preempt_blocked_readers_cgp(rnp)) {
+		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 		return 0;
+	}
 	pr_err("\tTasks blocked on level-%d rcu_node (CPUs %d-%d):",
 	       rnp->level, rnp->grplo, rnp->grphi);
 	t = list_entry(rnp->gp_tasks->prev,
@@ -280,8 +281,8 @@
 			break;
 	}
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
-	for (i--; i; i--) {
-		t = ts[i];
+	while (i) {
+		t = ts[--i];
 		if (!try_invoke_on_locked_down_task(t, check_slow_task, &rscr))
 			pr_cont(" P%d", t->pid);
 		else
@@ -350,7 +351,7 @@
 
 static void print_cpu_stall_fast_no_hz(char *cp, int cpu)
 {
-	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 
 	sprintf(cp, "last_accelerate: %04lx/%04lx dyntick_enabled: %d",
 		rdp->last_accelerate & 0xffff, jiffies & 0xffff,
@@ -464,9 +465,10 @@
 		pr_err("%s kthread starved for %ld jiffies! g%ld f%#x %s(%d) ->state=%#x ->cpu=%d\n",
 		       rcu_state.name, j,
 		       (long)rcu_seq_current(&rcu_state.gp_seq),
-		       data_race(rcu_state.gp_flags),
-		       gp_state_getname(rcu_state.gp_state), rcu_state.gp_state,
-		       gpk ? gpk->__state : ~0, cpu);
+		       data_race(READ_ONCE(rcu_state.gp_flags)),
+		       gp_state_getname(rcu_state.gp_state),
+		       data_race(READ_ONCE(rcu_state.gp_state)),
+		       gpk ? data_race(READ_ONCE(gpk->__state)) : ~0, cpu);
 		if (gpk) {
 			pr_err("\tUnless %s kthread gets sufficient CPU time, OOM is now expected behavior.\n", rcu_state.name);
 			pr_err("RCU grace-period kthread stack dump:\n");
@@ -509,7 +511,7 @@
 		       (long)rcu_seq_current(&rcu_state.gp_seq),
 		       data_race(rcu_state.gp_flags),
 		       gp_state_getname(RCU_GP_WAIT_FQS), RCU_GP_WAIT_FQS,
-		       gpk->__state);
+		       data_race(READ_ONCE(gpk->__state)));
 		pr_err("\tPossible timer handling issue on cpu=%d timer-softirq=%u\n",
 		       cpu, kstat_softirqs_cpu(TIMER_SOFTIRQ, cpu));
 	}
@@ -568,11 +570,11 @@
 			pr_err("INFO: Stall ended before state dump start\n");
 		} else {
 			j = jiffies;
-			gpa = data_race(rcu_state.gp_activity);
+			gpa = data_race(READ_ONCE(rcu_state.gp_activity));
 			pr_err("All QSes seen, last %s kthread activity %ld (%ld-%ld), jiffies_till_next_fqs=%ld, root ->qsmask %#lx\n",
 			       rcu_state.name, j - gpa, j, gpa,
-			       data_race(jiffies_till_next_fqs),
-			       rcu_get_root()->qsmask);
+			       data_race(READ_ONCE(jiffies_till_next_fqs)),
+			       data_race(READ_ONCE(rcu_get_root()->qsmask)));
 		}
 	}
 	/* Rewrite if needed in case of slow consoles. */
@@ -646,6 +648,7 @@
 
 static void check_cpu_stall(struct rcu_data *rdp)
 {
+	bool didstall = false;
 	unsigned long gs1;
 	unsigned long gs2;
 	unsigned long gps;
@@ -691,24 +694,46 @@
 	    ULONG_CMP_GE(gps, js))
 		return; /* No stall or GP completed since entering function. */
 	rnp = rdp->mynode;
-	jn = jiffies + 3 * rcu_jiffies_till_stall_check() + 3;
+	jn = jiffies + ULONG_MAX / 2;
 	if (rcu_gp_in_progress() &&
 	    (READ_ONCE(rnp->qsmask) & rdp->grpmask) &&
 	    cmpxchg(&rcu_state.jiffies_stall, js, jn) == js) {
 
+		/*
+		 * If a virtual machine is stopped by the host it can look to
+		 * the watchdog like an RCU stall. Check to see if the host
+		 * stopped the vm.
+		 */
+		if (kvm_check_and_clear_guest_paused())
+			return;
+
 		/* We haven't checked in, so go dump stack. */
 		print_cpu_stall(gps);
 		if (READ_ONCE(rcu_cpu_stall_ftrace_dump))
 			rcu_ftrace_dump(DUMP_ALL);
+		didstall = true;
 
 	} else if (rcu_gp_in_progress() &&
 		   ULONG_CMP_GE(j, js + RCU_STALL_RAT_DELAY) &&
 		   cmpxchg(&rcu_state.jiffies_stall, js, jn) == js) {
 
+		/*
+		 * If a virtual machine is stopped by the host it can look to
+		 * the watchdog like an RCU stall. Check to see if the host
+		 * stopped the vm.
+		 */
+		if (kvm_check_and_clear_guest_paused())
+			return;
+
 		/* They had a few time units to dump stack, so complain. */
 		print_other_cpu_stall(gs2, gps);
 		if (READ_ONCE(rcu_cpu_stall_ftrace_dump))
 			rcu_ftrace_dump(DUMP_ALL);
+		didstall = true;
+	}
+	if (didstall && READ_ONCE(rcu_state.jiffies_stall) == jn) {
+		jn = jiffies + 3 * rcu_jiffies_till_stall_check() + 3;
+		WRITE_ONCE(rcu_state.jiffies_stall, jn);
 	}
 }
 
@@ -742,7 +767,7 @@
 
 	rcu_for_each_leaf_node(rnp) {
 		if (!cpup) {
-			if (READ_ONCE(rnp->qsmask)) {
+			if (data_race(READ_ONCE(rnp->qsmask))) {
 				return false;
 			} else {
 				if (READ_ONCE(rnp->gp_tasks))
@@ -791,32 +816,34 @@
 	struct task_struct *t = READ_ONCE(rcu_state.gp_kthread);
 
 	j = jiffies;
-	ja = j - data_race(rcu_state.gp_activity);
-	jr = j - data_race(rcu_state.gp_req_activity);
-	js = j - data_race(rcu_state.gp_start);
-	jw = j - data_race(rcu_state.gp_wake_time);
+	ja = j - data_race(READ_ONCE(rcu_state.gp_activity));
+	jr = j - data_race(READ_ONCE(rcu_state.gp_req_activity));
+	js = j - data_race(READ_ONCE(rcu_state.gp_start));
+	jw = j - data_race(READ_ONCE(rcu_state.gp_wake_time));
 	pr_info("%s: wait state: %s(%d) ->state: %#x ->rt_priority %u delta ->gp_start %lu ->gp_activity %lu ->gp_req_activity %lu ->gp_wake_time %lu ->gp_wake_seq %ld ->gp_seq %ld ->gp_seq_needed %ld ->gp_max %lu ->gp_flags %#x\n",
 		rcu_state.name, gp_state_getname(rcu_state.gp_state),
-		rcu_state.gp_state, t ? t->__state : 0x1ffff, t ? t->rt_priority : 0xffU,
-		js, ja, jr, jw, (long)data_race(rcu_state.gp_wake_seq),
-		(long)data_race(rcu_state.gp_seq),
-		(long)data_race(rcu_get_root()->gp_seq_needed),
-		data_race(rcu_state.gp_max),
-		data_race(rcu_state.gp_flags));
+		data_race(READ_ONCE(rcu_state.gp_state)),
+		t ? data_race(READ_ONCE(t->__state)) : 0x1ffff, t ? t->rt_priority : 0xffU,
+		js, ja, jr, jw, (long)data_race(READ_ONCE(rcu_state.gp_wake_seq)),
+		(long)data_race(READ_ONCE(rcu_state.gp_seq)),
+		(long)data_race(READ_ONCE(rcu_get_root()->gp_seq_needed)),
+		data_race(READ_ONCE(rcu_state.gp_max)),
+		data_race(READ_ONCE(rcu_state.gp_flags)));
 	rcu_for_each_node_breadth_first(rnp) {
 		if (ULONG_CMP_GE(READ_ONCE(rcu_state.gp_seq), READ_ONCE(rnp->gp_seq_needed)) &&
-		    !data_race(rnp->qsmask) && !data_race(rnp->boost_tasks) &&
-		    !data_race(rnp->exp_tasks) && !data_race(rnp->gp_tasks))
+		    !data_race(READ_ONCE(rnp->qsmask)) && !data_race(READ_ONCE(rnp->boost_tasks)) &&
+		    !data_race(READ_ONCE(rnp->exp_tasks)) && !data_race(READ_ONCE(rnp->gp_tasks)))
 			continue;
 		pr_info("\trcu_node %d:%d ->gp_seq %ld ->gp_seq_needed %ld ->qsmask %#lx %c%c%c%c ->n_boosts %ld\n",
 			rnp->grplo, rnp->grphi,
-			(long)data_race(rnp->gp_seq), (long)data_race(rnp->gp_seq_needed),
-			data_race(rnp->qsmask),
-			".b"[!!data_race(rnp->boost_kthread_task)],
-			".B"[!!data_race(rnp->boost_tasks)],
-			".E"[!!data_race(rnp->exp_tasks)],
-			".G"[!!data_race(rnp->gp_tasks)],
-			data_race(rnp->n_boosts));
+			(long)data_race(READ_ONCE(rnp->gp_seq)),
+			(long)data_race(READ_ONCE(rnp->gp_seq_needed)),
+			data_race(READ_ONCE(rnp->qsmask)),
+			".b"[!!data_race(READ_ONCE(rnp->boost_kthread_task))],
+			".B"[!!data_race(READ_ONCE(rnp->boost_tasks))],
+			".E"[!!data_race(READ_ONCE(rnp->exp_tasks))],
+			".G"[!!data_race(READ_ONCE(rnp->gp_tasks))],
+			data_race(READ_ONCE(rnp->n_boosts)));
 		if (!rcu_is_leaf_node(rnp))
 			continue;
 		for_each_leaf_node_possible_cpu(rnp, cpu) {
@@ -826,12 +853,12 @@
 					 READ_ONCE(rdp->gp_seq_needed)))
 				continue;
 			pr_info("\tcpu %d ->gp_seq_needed %ld\n",
-				cpu, (long)data_race(rdp->gp_seq_needed));
+				cpu, (long)data_race(READ_ONCE(rdp->gp_seq_needed)));
 		}
 	}
 	for_each_possible_cpu(cpu) {
 		rdp = per_cpu_ptr(&rcu_data, cpu);
-		cbs += data_race(rdp->n_cbs_invoked);
+		cbs += data_race(READ_ONCE(rdp->n_cbs_invoked));
 		if (rcu_segcblist_is_offloaded(&rdp->cblist))
 			show_rcu_nocb_state(rdp);
 	}
@@ -913,11 +940,11 @@
 
 	if (rcu_gp_in_progress()) {
 		pr_info("%s: GP age %lu jiffies\n",
-			__func__, jiffies - rcu_state.gp_start);
+			__func__, jiffies - data_race(READ_ONCE(rcu_state.gp_start)));
 		show_rcu_gp_kthreads();
 	} else {
 		pr_info("%s: Last GP end %lu jiffies ago\n",
-			__func__, jiffies - rcu_state.gp_end);
+			__func__, jiffies - data_race(READ_ONCE(rcu_state.gp_end)));
 		preempt_disable();
 		rdp = this_cpu_ptr(&rcu_data);
 		rcu_check_gp_start_stall(rdp->mynode, rdp, j);
diff --git a/kernel/scftorture.c b/kernel/scftorture.c
index 29e8fc5..64a0828 100644
--- a/kernel/scftorture.c
+++ b/kernel/scftorture.c
@@ -64,6 +64,7 @@
 torture_param(int, verbose, 0, "Enable verbose debugging printk()s");
 torture_param(int, weight_resched, -1, "Testing weight for resched_cpu() operations.");
 torture_param(int, weight_single, -1, "Testing weight for single-CPU no-wait operations.");
+torture_param(int, weight_single_rpc, -1, "Testing weight for single-CPU RPC operations.");
 torture_param(int, weight_single_wait, -1, "Testing weight for single-CPU operations.");
 torture_param(int, weight_many, -1, "Testing weight for multi-CPU no-wait operations.");
 torture_param(int, weight_many_wait, -1, "Testing weight for multi-CPU operations.");
@@ -86,6 +87,8 @@
 	long long n_resched;
 	long long n_single;
 	long long n_single_ofl;
+	long long n_single_rpc;
+	long long n_single_rpc_ofl;
 	long long n_single_wait;
 	long long n_single_wait_ofl;
 	long long n_many;
@@ -101,14 +104,17 @@
 // Data for random primitive selection
 #define SCF_PRIM_RESCHED	0
 #define SCF_PRIM_SINGLE		1
-#define SCF_PRIM_MANY		2
-#define SCF_PRIM_ALL		3
-#define SCF_NPRIMS		7 // Need wait and no-wait versions of each,
-				  //  except for SCF_PRIM_RESCHED.
+#define SCF_PRIM_SINGLE_RPC	2
+#define SCF_PRIM_MANY		3
+#define SCF_PRIM_ALL		4
+#define SCF_NPRIMS		8 // Need wait and no-wait versions of each,
+				  //  except for SCF_PRIM_RESCHED and
+				  //  SCF_PRIM_SINGLE_RPC.
 
 static char *scf_prim_name[] = {
 	"resched_cpu",
 	"smp_call_function_single",
+	"smp_call_function_single_rpc",
 	"smp_call_function_many",
 	"smp_call_function",
 };
@@ -128,6 +134,8 @@
 	bool scfc_out;
 	int scfc_cpu; // -1 for not _single().
 	bool scfc_wait;
+	bool scfc_rpc;
+	struct completion scfc_completion;
 };
 
 // Use to wait for all threads to start.
@@ -158,6 +166,7 @@
 		scfs.n_resched += scf_stats_p[i].n_resched;
 		scfs.n_single += scf_stats_p[i].n_single;
 		scfs.n_single_ofl += scf_stats_p[i].n_single_ofl;
+		scfs.n_single_rpc += scf_stats_p[i].n_single_rpc;
 		scfs.n_single_wait += scf_stats_p[i].n_single_wait;
 		scfs.n_single_wait_ofl += scf_stats_p[i].n_single_wait_ofl;
 		scfs.n_many += scf_stats_p[i].n_many;
@@ -168,9 +177,10 @@
 	if (atomic_read(&n_errs) || atomic_read(&n_mb_in_errs) ||
 	    atomic_read(&n_mb_out_errs) || atomic_read(&n_alloc_errs))
 		bangstr = "!!! ";
-	pr_alert("%s %sscf_invoked_count %s: %lld resched: %lld single: %lld/%lld single_ofl: %lld/%lld many: %lld/%lld all: %lld/%lld ",
+	pr_alert("%s %sscf_invoked_count %s: %lld resched: %lld single: %lld/%lld single_ofl: %lld/%lld single_rpc: %lld single_rpc_ofl: %lld many: %lld/%lld all: %lld/%lld ",
 		 SCFTORT_FLAG, bangstr, isdone ? "VER" : "ver", invoked_count, scfs.n_resched,
 		 scfs.n_single, scfs.n_single_wait, scfs.n_single_ofl, scfs.n_single_wait_ofl,
+		 scfs.n_single_rpc, scfs.n_single_rpc_ofl,
 		 scfs.n_many, scfs.n_many_wait, scfs.n_all, scfs.n_all_wait);
 	torture_onoff_stats();
 	pr_cont("ste: %d stnmie: %d stnmoe: %d staf: %d\n", atomic_read(&n_errs),
@@ -282,10 +292,13 @@
 out:
 	if (unlikely(!scfcp))
 		return;
-	if (scfcp->scfc_wait)
+	if (scfcp->scfc_wait) {
 		WRITE_ONCE(scfcp->scfc_out, true);
-	else
+		if (scfcp->scfc_rpc)
+			complete(&scfcp->scfc_completion);
+	} else {
 		kfree(scfcp);
+	}
 }
 
 // As above, but check for correct CPU.
@@ -319,6 +332,7 @@
 			scfcp->scfc_cpu = -1;
 			scfcp->scfc_wait = scfsp->scfs_wait;
 			scfcp->scfc_out = false;
+			scfcp->scfc_rpc = false;
 		}
 	}
 	switch (scfsp->scfs_prim) {
@@ -350,6 +364,34 @@
 			scfcp = NULL;
 		}
 		break;
+	case SCF_PRIM_SINGLE_RPC:
+		if (!scfcp)
+			break;
+		cpu = torture_random(trsp) % nr_cpu_ids;
+		scfp->n_single_rpc++;
+		scfcp->scfc_cpu = cpu;
+		scfcp->scfc_wait = true;
+		init_completion(&scfcp->scfc_completion);
+		scfcp->scfc_rpc = true;
+		barrier(); // Prevent race-reduction compiler optimizations.
+		scfcp->scfc_in = true;
+		ret = smp_call_function_single(cpu, scf_handler_1, (void *)scfcp, 0);
+		if (!ret) {
+			if (use_cpus_read_lock)
+				cpus_read_unlock();
+			else
+				preempt_enable();
+			wait_for_completion(&scfcp->scfc_completion);
+			if (use_cpus_read_lock)
+				cpus_read_lock();
+			else
+				preempt_disable();
+		} else {
+			scfp->n_single_rpc_ofl++;
+			kfree(scfcp);
+			scfcp = NULL;
+		}
+		break;
 	case SCF_PRIM_MANY:
 		if (scfsp->scfs_wait)
 			scfp->n_many_wait++;
@@ -379,10 +421,12 @@
 	}
 	if (scfcp && scfsp->scfs_wait) {
 		if (WARN_ON_ONCE((num_online_cpus() > 1 || scfsp->scfs_prim == SCF_PRIM_SINGLE) &&
-				 !scfcp->scfc_out))
+				 !scfcp->scfc_out)) {
+			pr_warn("%s: Memory-ordering failure, scfs_prim: %d.\n", __func__, scfsp->scfs_prim);
 			atomic_inc(&n_mb_out_errs); // Leak rather than trash!
-		else
+		} else {
 			kfree(scfcp);
+		}
 		barrier(); // Prevent race-reduction compiler optimizations.
 	}
 	if (use_cpus_read_lock)
@@ -453,8 +497,8 @@
 scftorture_print_module_parms(const char *tag)
 {
 	pr_alert(SCFTORT_FLAG
-		 "--- %s:  verbose=%d holdoff=%d longwait=%d nthreads=%d onoff_holdoff=%d onoff_interval=%d shutdown_secs=%d stat_interval=%d stutter=%d use_cpus_read_lock=%d, weight_resched=%d, weight_single=%d, weight_single_wait=%d, weight_many=%d, weight_many_wait=%d, weight_all=%d, weight_all_wait=%d\n", tag,
-		 verbose, holdoff, longwait, nthreads, onoff_holdoff, onoff_interval, shutdown, stat_interval, stutter, use_cpus_read_lock, weight_resched, weight_single, weight_single_wait, weight_many, weight_many_wait, weight_all, weight_all_wait);
+		 "--- %s:  verbose=%d holdoff=%d longwait=%d nthreads=%d onoff_holdoff=%d onoff_interval=%d shutdown_secs=%d stat_interval=%d stutter=%d use_cpus_read_lock=%d, weight_resched=%d, weight_single=%d, weight_single_rpc=%d, weight_single_wait=%d, weight_many=%d, weight_many_wait=%d, weight_all=%d, weight_all_wait=%d\n", tag,
+		 verbose, holdoff, longwait, nthreads, onoff_holdoff, onoff_interval, shutdown, stat_interval, stutter, use_cpus_read_lock, weight_resched, weight_single, weight_single_rpc, weight_single_wait, weight_many, weight_many_wait, weight_all, weight_all_wait);
 }
 
 static void scf_cleanup_handler(void *unused)
@@ -469,7 +513,7 @@
 		return;
 
 	WRITE_ONCE(scfdone, true);
-	if (nthreads)
+	if (nthreads && scf_stats_p)
 		for (i = 0; i < nthreads; i++)
 			torture_stop_kthread("scftorture_invoker", scf_stats_p[i].task);
 	else
@@ -497,6 +541,7 @@
 	int firsterr = 0;
 	unsigned long weight_resched1 = weight_resched;
 	unsigned long weight_single1 = weight_single;
+	unsigned long weight_single_rpc1 = weight_single_rpc;
 	unsigned long weight_single_wait1 = weight_single_wait;
 	unsigned long weight_many1 = weight_many;
 	unsigned long weight_many_wait1 = weight_many_wait;
@@ -508,11 +553,13 @@
 
 	scftorture_print_module_parms("Start of test");
 
-	if (weight_resched == -1 && weight_single == -1 && weight_single_wait == -1 &&
+	if (weight_resched == -1 &&
+	    weight_single == -1 && weight_single_rpc == -1 && weight_single_wait == -1 &&
 	    weight_many == -1 && weight_many_wait == -1 &&
 	    weight_all == -1 && weight_all_wait == -1) {
 		weight_resched1 = 2 * nr_cpu_ids;
 		weight_single1 = 2 * nr_cpu_ids;
+		weight_single_rpc1 = 2 * nr_cpu_ids;
 		weight_single_wait1 = 2 * nr_cpu_ids;
 		weight_many1 = 2;
 		weight_many_wait1 = 2;
@@ -523,6 +570,8 @@
 			weight_resched1 = 0;
 		if (weight_single == -1)
 			weight_single1 = 0;
+		if (weight_single_rpc == -1)
+			weight_single_rpc1 = 0;
 		if (weight_single_wait == -1)
 			weight_single_wait1 = 0;
 		if (weight_many == -1)
@@ -534,7 +583,7 @@
 		if (weight_all_wait == -1)
 			weight_all_wait1 = 0;
 	}
-	if (weight_single1 == 0 && weight_single_wait1 == 0 &&
+	if (weight_single1 == 0 && weight_single_rpc1 == 0 && weight_single_wait1 == 0 &&
 	    weight_many1 == 0 && weight_many_wait1 == 0 &&
 	    weight_all1 == 0 && weight_all_wait1 == 0) {
 		VERBOSE_SCFTORTOUT_ERRSTRING("all zero weights makes no sense");
@@ -546,6 +595,7 @@
 	else if (weight_resched1)
 		VERBOSE_SCFTORTOUT_ERRSTRING("built as module, weight_resched ignored");
 	scf_sel_add(weight_single1, SCF_PRIM_SINGLE, false);
+	scf_sel_add(weight_single_rpc1, SCF_PRIM_SINGLE_RPC, true);
 	scf_sel_add(weight_single_wait1, SCF_PRIM_SINGLE, true);
 	scf_sel_add(weight_many1, SCF_PRIM_MANY, false);
 	scf_sel_add(weight_many_wait1, SCF_PRIM_MANY, true);
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 2d9ff40f4..6a03c3f 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -7781,6 +7781,17 @@
 		preempt_schedule_common();
 		return 1;
 	}
+	/*
+	 * In preemptible kernels, ->rcu_read_lock_nesting tells the tick
+	 * whether the current CPU is in an RCU read-side critical section,
+	 * so the tick can report quiescent states even for CPUs looping
+	 * in kernel context.  In contrast, in non-preemptible kernels,
+	 * RCU readers leave no in-memory hints, which means that CPU-bound
+	 * processes executing in kernel context might never report an
+	 * RCU quiescent state.  Therefore, the following code causes
+	 * cond_resched() to report a quiescent state, but only when RCU
+	 * is in urgent need of one.
+	 */
 #ifndef CONFIG_PREEMPT_RCU
 	rcu_all_qs();
 #endif
diff --git a/kernel/torture.c b/kernel/torture.c
index 0a315c3..bb8f411 100644
--- a/kernel/torture.c
+++ b/kernel/torture.c
@@ -521,11 +521,11 @@
 	struct shuffle_task *stp;
 
 	cpumask_setall(shuffle_tmp_mask);
-	get_online_cpus();
+	cpus_read_lock();
 
 	/* No point in shuffling if there is only one online CPU (ex: UP) */
 	if (num_online_cpus() == 1) {
-		put_online_cpus();
+		cpus_read_unlock();
 		return;
 	}
 
@@ -541,7 +541,7 @@
 		set_cpus_allowed_ptr(stp->st_t, shuffle_tmp_mask);
 	mutex_unlock(&shuffle_task_mutex);
 
-	put_online_cpus();
+	cpus_read_unlock();
 }
 
 /* Shuffle tasks across CPUs, with the intent of allowing each CPU in the
diff --git a/tools/include/nolibc/nolibc.h b/tools/include/nolibc/nolibc.h
index 8b7a983..3430667 100644
--- a/tools/include/nolibc/nolibc.h
+++ b/tools/include/nolibc/nolibc.h
@@ -1031,7 +1031,7 @@
  *     scall32-o32.S in the kernel sources.
  *   - the system call is performed by calling "syscall"
  *   - syscall return comes in v0, and register a3 needs to be checked to know
- *     if an error occured, in which case errno is in v0.
+ *     if an error occurred, in which case errno is in v0.
  *   - the arguments are cast to long and assigned into the target registers
  *     which are then simply passed as registers to the asm code, so that we
  *     don't have to experience issues with register constraints.
@@ -2244,6 +2244,19 @@
 }
 
 static __attribute__((unused))
+int msleep(unsigned int msecs)
+{
+	struct timeval my_timeval = { msecs / 1000, (msecs % 1000) * 1000 };
+
+	if (sys_select(0, 0, 0, 0, &my_timeval) < 0)
+		return (my_timeval.tv_sec * 1000) +
+			(my_timeval.tv_usec / 1000) +
+			!!(my_timeval.tv_usec % 1000);
+	else
+		return 0;
+}
+
+static __attribute__((unused))
 int stat(const char *path, struct stat *buf)
 {
 	int ret = sys_stat(path, buf);
diff --git a/tools/testing/selftests/rcutorture/bin/jitter.sh b/tools/testing/selftests/rcutorture/bin/jitter.sh
index 15d937b..fd1ffaa 100755
--- a/tools/testing/selftests/rcutorture/bin/jitter.sh
+++ b/tools/testing/selftests/rcutorture/bin/jitter.sh
@@ -68,16 +68,12 @@
 	cpumask=`awk -v cpus="$cpus" -v me=$me -v n=$n 'BEGIN {
 		srand(n + me + systime());
 		ncpus = split(cpus, ca);
-		curcpu = ca[int(rand() * ncpus + 1)];
-		z = "";
-		for (i = 1; 4 * i <= curcpu; i++)
-			z = z "0";
-		print "0x" 2 ^ (curcpu % 4) z;
+		print ca[int(rand() * ncpus + 1)];
 	}' < /dev/null`
 	n=$(($n+1))
-	if ! taskset -p $cpumask $$ > /dev/null 2>&1
+	if ! taskset -c -p $cpumask $$ > /dev/null 2>&1
 	then
-		echo taskset failure: '"taskset -p ' $cpumask $$ '"'
+		echo taskset failure: '"taskset -c -p ' $cpumask $$ '"'
 		exit 1
 	fi
 
diff --git a/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh b/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh
index e5cc6b2..1af5d6b 100755
--- a/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh
+++ b/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh
@@ -14,7 +14,7 @@
 then
 	exit 0
 fi
-cat $1/*/console.log |
+find $1 -name console.log -exec cat {} \; |
 	grep "BUG: KCSAN: " |
 	sed -e 's/^\[[^]]*] //' |
 	sort |
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-again.sh b/tools/testing/selftests/rcutorture/bin/kvm-again.sh
index d8c8483..5a0023d 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-again.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-again.sh
@@ -142,7 +142,7 @@
 	echo "Cannot copy from $oldrun to $rundir."
 	usage
 fi
-rm -f "$rundir"/*/{console.log,console.log.diags,qemu_pid,qemu-retval,Warnings,kvm-test-1-run.sh.out,kvm-test-1-run-qemu.sh.out,vmlinux} "$rundir"/log
+rm -f "$rundir"/*/{console.log,console.log.diags,qemu_pid,qemu-pid,qemu-retval,Warnings,kvm-test-1-run.sh.out,kvm-test-1-run-qemu.sh.out,vmlinux} "$rundir"/log
 touch "$rundir/log"
 echo $scriptname $args | tee -a "$rundir/log"
 echo $oldrun > "$rundir/re-run"
@@ -179,6 +179,6 @@
 then
 	echo ---- Dryrun complete, directory: $rundir | tee -a "$rundir/log"
 else
-	( cd "$rundir"; sh $T/runbatches.sh )
+	( cd "$rundir"; sh $T/runbatches.sh ) | tee -a "$rundir/log"
 	kvm-end-run-stats.sh "$rundir" "$starttime"
 fi
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh b/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh
new file mode 100755
index 0000000..f99b2c1
--- /dev/null
+++ b/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh
@@ -0,0 +1,106 @@
+#!/bin/sh
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Produce awk statements roughly depicting the system's CPU and cache
+# layout.  If the required information is not available, produce
+# error messages as awk comments.  Successful exit regardless.
+#
+# Usage: kvm-assign-cpus.sh /path/to/sysfs
+
+T=/tmp/kvm-assign-cpus.sh.$$
+trap 'rm -rf $T' 0 2
+mkdir $T
+
+sysfsdir=${1-/sys/devices/system/node}
+if ! cd "$sysfsdir" > $T/msg 2>&1
+then
+	sed -e 's/^/# /' < $T/msg
+	exit 0
+fi
+nodelist="`ls -d node*`"
+for i in node*
+do
+	if ! test -d $i/
+	then
+		echo "# Not a directory: $sysfsdir/node*"
+		exit 0
+	fi
+	for j in $i/cpu*/cache/index*
+	do
+		if ! test -d $j/
+		then
+			echo "# Not a directory: $sysfsdir/$j"
+			exit 0
+		else
+			break
+		fi
+	done
+	indexlist="`ls -d $i/cpu* | grep 'cpu[0-9][0-9]*' | head -1 | sed -e 's,^.*$,ls -d &/cache/index*,' | sh | sed -e 's,^.*/,,'`"
+	break
+done
+for i in node*/cpu*/cache/index*/shared_cpu_list
+do
+	if ! test -f $i
+	then
+		echo "# Not a file: $sysfsdir/$i"
+		exit 0
+	else
+		break
+	fi
+done
+firstshared=
+for i in $indexlist
+do
+	rm -f $T/cpulist
+	for n in node*
+	do
+		f="$n/cpu*/cache/$i/shared_cpu_list"
+		if ! cat $f > $T/msg 2>&1
+		then
+			sed -e 's/^/# /' < $T/msg
+			exit 0
+		fi
+		cat $f >> $T/cpulist
+	done
+	if grep -q '[-,]' $T/cpulist
+	then
+		if test -z "$firstshared"
+		then
+			firstshared="$i"
+		fi
+	fi
+done
+if test -z "$firstshared"
+then
+	splitindex="`echo $indexlist | sed -e 's/ .*$//'`"
+else
+	splitindex="$firstshared"
+fi
+nodenum=0
+for n in node*
+do
+	cat $n/cpu*/cache/$splitindex/shared_cpu_list | sort -u -k1n |
+	awk -v nodenum="$nodenum" '
+	BEGIN {
+		idx = 0;
+	}
+
+	{
+		nlists = split($0, cpulists, ",");
+		for (i = 1; i <= nlists; i++) {
+			listsize = split(cpulists[i], cpus, "-");
+			if (listsize == 1)
+				cpus[2] = cpus[1];
+			for (j = cpus[1]; j <= cpus[2]; j++) {
+				print "cpu[" nodenum "][" idx "] = " j ";";
+				idx++;
+			}
+		}
+	}
+
+	END {
+		print "nodecpus[" nodenum "] = " idx ";";
+	}'
+	nodenum=`expr $nodenum + 1`
+done
+echo "numnodes = $nodenum;"
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh b/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh
new file mode 100755
index 0000000..20c7c53
--- /dev/null
+++ b/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh
@@ -0,0 +1,88 @@
+#!/bin/sh
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Create an awk script that takes as input numbers of CPUs and outputs
+# lists of CPUs, one per line in both cases.
+#
+# Usage: kvm-get-cpus-script.sh /path/to/cpu/arrays /path/to/put/script [ /path/to/state ]
+#
+# The CPU arrays are output by kvm-assign-cpus.sh, and are valid awk
+# statements initializing the variables describing the system's topology.
+#
+# The optional state is input by this script (if the file exists and is
+# non-empty), and can also be output by this script.
+
+cpuarrays="${1-/sys/devices/system/node}"
+scriptfile="${2}"
+statefile="${3}"
+
+if ! test -f "$cpuarrays"
+then
+	echo "File not found: $cpuarrays" 1>&2
+	exit 1
+fi
+scriptdir="`dirname "$scriptfile"`"
+if ! test -d "$scriptdir" || ! test -x "$scriptdir" || ! test -w "$scriptdir"
+then
+	echo "Directory not usable for script output: $scriptdir"
+	exit 1
+fi
+
+cat << '___EOF___' > "$scriptfile"
+BEGIN {
+___EOF___
+cat "$cpuarrays" >> "$scriptfile"
+if test -r "$statefile"
+then
+	cat "$statefile" >> "$scriptfile"
+fi
+cat << '___EOF___' >> "$scriptfile"
+}
+
+# Do we have the system architecture to guide CPU affinity?
+function gotcpus()
+{
+	return numnodes != "";
+}
+
+# Return a comma-separated list of the next n CPUs.
+function nextcpus(n,  i, s)
+{
+	for (i = 0; i < n; i++) {
+		if (nodecpus[curnode] == "")
+			curnode = 0;
+		if (cpu[curnode][curcpu[curnode]] == "")
+			curcpu[curnode] = 0;
+		if (s != "")
+			s = s ",";
+		s = s cpu[curnode][curcpu[curnode]];
+		curcpu[curnode]++;
+		curnode++
+	}
+	return s;
+}
+
+# Dump out the current node/CPU state so that a later invocation of this
+# script can continue where this one left off.  Of course, this only works
+# when a state file was specified and where there was valid sysfs state.
+# Returns 1 if the state was dumped, 0 otherwise.
+#
+# Dumping the state for one system configuration and loading it into
+# another isn't likely to do what you want, whatever that might be.
+function dumpcpustate(  i, fn)
+{
+___EOF___
+echo '	fn = "'"$statefile"'";' >> $scriptfile
+cat << '___EOF___' >> "$scriptfile"
+	if (fn != "" && gotcpus()) {
+		print "curnode = " curnode ";" > fn;
+		for (i = 0; i < numnodes; i++)
+			if (curcpu[i] != "")
+				print "curcpu[" i "] = " curcpu[i] ";" >> fn;
+		return 1;
+	}
+	if (fn != "")
+		print "# No CPU state to dump." > fn;
+	return 0;
+}
+___EOF___
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh b/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh
index f3a7a5e..db2c0e2 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh
@@ -25,7 +25,7 @@
 	echo "$configfile -------"
 else
 	title="$configfile ------- $ncs acquisitions/releases"
-	dur=`sed -e 's/^.* locktorture.shutdown_secs=//' -e 's/ .*$//' < $i/qemu-cmd 2> /dev/null`
+	dur=`grep -v '^#' $i/qemu-cmd | sed -e 's/^.* locktorture.shutdown_secs=//' -e 's/ .*$//' 2> /dev/null`
 	if test -z "$dur"
 	then
 		:
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh b/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh
index 671bfee..3afa5c6 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh
@@ -25,7 +25,7 @@
 then
 	echo "$configfile ------- "
 else
-	dur="`sed -e 's/^.* scftorture.shutdown_secs=//' -e 's/ .*$//' < $i/qemu-cmd 2> /dev/null`"
+	dur="`grep -v '^#' $i/qemu-cmd | sed -e 's/^.* scftorture.shutdown_secs=//' -e 's/ .*$//' 2> /dev/null`"
 	if test -z "$dur"
 	then
 		rate=""
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh b/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh
index e01b31b..0a54199 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh
@@ -74,7 +74,10 @@
 	done
 	if test -f "$rd/kcsan.sum"
 	then
-		if grep -q CONFIG_KCSAN=y $T
+		if ! test -f $T
+		then
+			:
+		elif grep -q CONFIG_KCSAN=y $T
 		then
 			echo "Compiler or architecture does not support KCSAN!"
 			echo Did you forget to switch your compiler with '--kmake-arg CC=<cc-that-supports-kcsan>'?
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh b/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh
new file mode 100755
index 0000000..014ce68
--- /dev/null
+++ b/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Periodically scan a directory tree to prevent files from being reaped
+# by systemd and friends on long runs.
+#
+# Usage: kvm-remote-noreap.sh pathname
+#
+# Copyright (C) 2021 Facebook, Inc.
+#
+# Authors: Paul E. McKenney <paulmck@kernel.org>
+
+pathname="$1"
+if test "$pathname" = ""
+then
+	echo Usage: kvm-remote-noreap.sh pathname
+	exit 1
+fi
+if ! test -d "$pathname"
+then
+	echo  Usage: kvm-remote-noreap.sh pathname
+	echo "       pathname must be a directory."
+	exit 2
+fi
+
+while test -d "$pathname"
+do
+	find "$pathname" -type f -exec touch -c {} \; > /dev/null 2>&1
+	sleep 30
+done
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-remote.sh b/tools/testing/selftests/rcutorture/bin/kvm-remote.sh
index 79e680e..03126eb 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-remote.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-remote.sh
@@ -124,10 +124,12 @@
 	n = $1;
 	sub(/\./, "", n);
 	fn = dest "/kvm-remote-" n ".sh"
+	print "kvm-remote-noreap.sh " rundir " &" > fn;
 	scenarios = "";
 	for (i = 2; i <= NF; i++)
 		scenarios = scenarios " " $i;
-	print "kvm-test-1-run-batch.sh" scenarios > fn;
+	print "kvm-test-1-run-batch.sh" scenarios >> fn;
+	print "sync" >> fn;
 	print "rm " rundir "/remote.run" >> fn;
 }'
 chmod +x $T/bin/kvm-remote-*.sh
@@ -172,11 +174,20 @@
 	do
 		ssh $1 "test -f \"$2\""
 		ret=$?
-		if test "$ret" -ne 255
+		if test "$ret" -eq 255
 		then
+			echo " ---" ssh failure to $1 checking for file $2, retry after $sleeptime seconds. `date`
+		elif test "$ret" -eq 0
+		then
+			return 0
+		elif test "$ret" -eq 1
+		then
+			echo " ---" File \"$2\" not found: ssh $1 test -f \"$2\"
+			return 1
+		else
+			echo " ---" Exit code $ret: ssh $1 test -f \"$2\", retry after $sleeptime seconds. `date`
 			return $ret
 		fi
-		echo " ---" ssh failure to $1 checking for file $2, retry after $sleeptime seconds. `date`
 		sleep $sleeptime
 	done
 }
@@ -242,7 +253,8 @@
 	do
 		sleep 30
 	done
-	( cd "$oldrun"; ssh $i "cd $rundir; tar -czf - kvm-remote-*.sh.out */console.log */kvm-test-1-run*.sh.out */qemu_pid */qemu-retval; rm -rf $T > /dev/null 2>&1" | tar -xzf - )
+	echo " ---" Collecting results from $i `date`
+	( cd "$oldrun"; ssh $i "cd $rundir; tar -czf - kvm-remote-*.sh.out */console.log */kvm-test-1-run*.sh.out */qemu[_-]pid */qemu-retval */qemu-affinity; rm -rf $T > /dev/null 2>&1" | tar -xzf - )
 done
 
 ( kvm-end-run-stats.sh "$oldrun" "$starttime"; echo $? > $T/exitcode ) | tee -a "$oldrun/remote-log"
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh
index 7ea0809e..1e29d65 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh
@@ -50,10 +50,34 @@
 echo ---- System running test: `uname -a`
 echo ---- Starting kernels. `date` | tee -a log
 $TORTURE_JITTER_START
+kvm-assign-cpus.sh /sys/devices/system/node > $T/cpuarray.awk
 for i in "$@"
 do
 	echo ---- System running test: `uname -a` > $i/kvm-test-1-run-qemu.sh.out
 	echo > $i/kvm-test-1-run-qemu.sh.out
+	export TORTURE_AFFINITY=
+	kvm-get-cpus-script.sh $T/cpuarray.awk $T/cpubatches.awk $T/cpustate
+	cat << '	___EOF___' >> $T/cpubatches.awk
+	END {
+		affinitylist = "";
+		if (!gotcpus()) {
+			print "echo No CPU-affinity information, so no taskset command.";
+		} else if (cpu_count !~ /^[0-9][0-9]*$/) {
+			print "echo " scenario ": Bogus number of CPUs (old qemu-cmd?), so no taskset command.";
+		} else {
+			affinitylist = nextcpus(cpu_count);
+			if (!(affinitylist ~ /^[0-9,-][0-9,-]*$/))
+				print "echo " scenario ": Bogus CPU-affinity information, so no taskset command.";
+			else if (!dumpcpustate())
+				print "echo " scenario ": Could not dump state, so no taskset command.";
+			else
+				print "export TORTURE_AFFINITY=" affinitylist;
+		}
+	}
+	___EOF___
+	cpu_count="`grep '# TORTURE_CPU_COUNT=' $i/qemu-cmd | sed -e 's/^.*=//'`"
+	affinity_export="`awk -f $T/cpubatches.awk -v cpu_count="$cpu_count" -v scenario=$i < /dev/null`"
+	$affinity_export
 	kvm-test-1-run-qemu.sh $i >> $i/kvm-test-1-run-qemu.sh.out 2>&1 &
 done
 for i in $runfiles
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh
index 5b1aa2a..4428058 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh
@@ -39,27 +39,34 @@
 grep '^#' $resdir/qemu-cmd | sed -e 's/^# //' > $T/qemu-cmd-settings
 . $T/qemu-cmd-settings
 
-# Decorate qemu-cmd with redirection, backgrounding, and PID capture
-sed -e 's/$/ 2>\&1 \&/' < $resdir/qemu-cmd > $T/qemu-cmd
-echo 'echo $! > $resdir/qemu_pid' >> $T/qemu-cmd
+# Decorate qemu-cmd with affinity, redirection, backgrounding, and PID capture
+taskset_command=
+if test -n "$TORTURE_AFFINITY"
+then
+	taskset_command="taskset -c $TORTURE_AFFINITY "
+fi
+sed -e 's/^[^#].*$/'"$taskset_command"'& 2>\&1 \&/' < $resdir/qemu-cmd > $T/qemu-cmd
+echo 'qemu_pid=$!' >> $T/qemu-cmd
+echo 'echo $qemu_pid > $resdir/qemu-pid' >> $T/qemu-cmd
+echo 'taskset -c -p $qemu_pid > $resdir/qemu-affinity' >> $T/qemu-cmd
 
 # In case qemu refuses to run...
 echo "NOTE: $QEMU either did not run or was interactive" > $resdir/console.log
 
 # Attempt to run qemu
 kstarttime=`gawk 'BEGIN { print systime() }' < /dev/null`
-( . $T/qemu-cmd; wait `cat  $resdir/qemu_pid`; echo $? > $resdir/qemu-retval ) &
+( . $T/qemu-cmd; wait `cat  $resdir/qemu-pid`; echo $? > $resdir/qemu-retval ) &
 commandcompleted=0
 if test -z "$TORTURE_KCONFIG_GDB_ARG"
 then
 	sleep 10 # Give qemu's pid a chance to reach the file
-	if test -s "$resdir/qemu_pid"
+	if test -s "$resdir/qemu-pid"
 	then
-		qemu_pid=`cat "$resdir/qemu_pid"`
-		echo Monitoring qemu job at pid $qemu_pid
+		qemu_pid=`cat "$resdir/qemu-pid"`
+		echo Monitoring qemu job at pid $qemu_pid `date`
 	else
 		qemu_pid=""
-		echo Monitoring qemu job at yet-as-unknown pid
+		echo Monitoring qemu job at yet-as-unknown pid `date`
 	fi
 fi
 if test -n "$TORTURE_KCONFIG_GDB_ARG"
@@ -82,9 +89,9 @@
 fi
 while :
 do
-	if test -z "$qemu_pid" -a -s "$resdir/qemu_pid"
+	if test -z "$qemu_pid" && test -s "$resdir/qemu-pid"
 	then
-		qemu_pid=`cat "$resdir/qemu_pid"`
+		qemu_pid=`cat "$resdir/qemu-pid"`
 	fi
 	kruntime=`gawk 'BEGIN { print systime() - '"$kstarttime"' }' < /dev/null`
 	if test -z "$qemu_pid" || kill -0 "$qemu_pid" > /dev/null 2>&1
@@ -115,22 +122,22 @@
 		break
 	fi
 done
-if test -z "$qemu_pid" -a -s "$resdir/qemu_pid"
+if test -z "$qemu_pid" && test -s "$resdir/qemu-pid"
 then
-	qemu_pid=`cat "$resdir/qemu_pid"`
+	qemu_pid=`cat "$resdir/qemu-pid"`
 fi
-if test $commandcompleted -eq 0 -a -n "$qemu_pid"
+if test $commandcompleted -eq 0 && test -n "$qemu_pid"
 then
 	if ! test -f "$resdir/../STOP.1"
 	then
-		echo Grace period for qemu job at pid $qemu_pid
+		echo Grace period for qemu job at pid $qemu_pid `date`
 	fi
 	oldline="`tail $resdir/console.log`"
 	while :
 	do
 		if test -f "$resdir/../STOP.1"
 		then
-			echo "PID $qemu_pid killed due to run STOP.1 request" >> $resdir/Warnings 2>&1
+			echo "PID $qemu_pid killed due to run STOP.1 request `date`" >> $resdir/Warnings 2>&1
 			kill -KILL $qemu_pid
 			break
 		fi
@@ -152,13 +159,17 @@
 		then
 			last_ts=0
 		fi
-		if test "$newline" != "$oldline" -a "$last_ts" -lt $((seconds + $TORTURE_SHUTDOWN_GRACE))
+		if test "$newline" != "$oldline" && test "$last_ts" -lt $((seconds + $TORTURE_SHUTDOWN_GRACE)) && test "$last_ts" -gt "$TORTURE_SHUTDOWN_GRACE"
 		then
 			must_continue=yes
+			if test $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
+			then
+				echo Continuing at console.log time $last_ts \"`tail -n 1 $resdir/console.log`\" `date`
+			fi
 		fi
-		if test $must_continue = no -a $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
+		if test $must_continue = no && test $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
 		then
-			echo "!!! PID $qemu_pid hung at $kruntime vs. $seconds seconds" >> $resdir/Warnings 2>&1
+			echo "!!! PID $qemu_pid hung at $kruntime vs. $seconds seconds `date`" >> $resdir/Warnings 2>&1
 			kill -KILL $qemu_pid
 			break
 		fi
@@ -172,5 +183,3 @@
 
 # Tell the script that this run is done.
 rm -f $resdir/build.run
-
-parse-console.sh $resdir/console.log $title
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh
index 420ed5c..f4c8055 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh
@@ -205,6 +205,7 @@
 echo "# TORTURE_JITTER_START=\"$TORTURE_JITTER_START\"" >> $resdir/qemu-cmd
 echo "# TORTURE_JITTER_STOP=\"$TORTURE_JITTER_STOP\"" >> $resdir/qemu-cmd
 echo "# TORTURE_TRUST_MAKE=\"$TORTURE_TRUST_MAKE\"; export TORTURE_TRUST_MAKE" >> $resdir/qemu-cmd
+echo "# TORTURE_CPU_COUNT=$cpu_count" >> $resdir/qemu-cmd
 
 if test -n "$TORTURE_BUILDONLY"
 then
@@ -214,3 +215,4 @@
 fi
 
 kvm-test-1-run-qemu.sh $resdir
+parse-console.sh $resdir/console.log $title
diff --git a/tools/testing/selftests/rcutorture/bin/kvm.sh b/tools/testing/selftests/rcutorture/bin/kvm.sh
index b4ac4ee3..f442d84 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm.sh
@@ -430,17 +430,10 @@
 	git diff HEAD >> $resdir/$ds/testid.txt
 fi
 ___EOF___
-awk < $T/cfgcpu.pack \
-	-v TORTURE_BUILDONLY="$TORTURE_BUILDONLY" \
-	-v CONFIGDIR="$CONFIGFRAG/" \
-	-v KVM="$KVM" \
-	-v ncpus=$cpus \
-	-v jitter="$jitter" \
-	-v rd=$resdir/$ds/ \
-	-v dur=$dur \
-	-v TORTURE_QEMU_ARG="$TORTURE_QEMU_ARG" \
-	-v TORTURE_BOOTARGS="$TORTURE_BOOTARGS" \
-'BEGIN {
+kvm-assign-cpus.sh /sys/devices/system/node > $T/cpuarray.awk
+kvm-get-cpus-script.sh $T/cpuarray.awk $T/dumpbatches.awk
+cat << '___EOF___' >> $T/dumpbatches.awk
+BEGIN {
 	i = 0;
 }
 
@@ -451,7 +444,7 @@
 }
 
 # Dump out the scripting required to run one test batch.
-function dump(first, pastlast, batchnum)
+function dump(first, pastlast, batchnum,  affinitylist)
 {
 	print "echo ----Start batch " batchnum ": `date` | tee -a " rd "log";
 	print "needqemurun="
@@ -483,6 +476,14 @@
 		print "echo ", cfr[jn], cpusr[jn] ovf ": Starting build. `date` | tee -a " rd "log";
 		print "mkdir " rd cfr[jn] " || :";
 		print "touch " builddir ".wait";
+		affinitylist = "";
+		if (gotcpus()) {
+			affinitylist = nextcpus(cpusr[jn]);
+		}
+		if (affinitylist ~ /^[0-9,-][0-9,-]*$/)
+			print "export TORTURE_AFFINITY=" affinitylist;
+		else
+			print "export TORTURE_AFFINITY=";
 		print "kvm-test-1-run.sh " CONFIGDIR cf[j], rd cfr[jn], dur " \"" TORTURE_QEMU_ARG "\" \"" TORTURE_BOOTARGS "\" > " rd cfr[jn]  "/kvm-test-1-run.sh.out 2>&1 &"
 		print "echo ", cfr[jn], cpusr[jn] ovf ": Waiting for build to complete. `date` | tee -a " rd "log";
 		print "while test -f " builddir ".wait"
@@ -560,7 +561,19 @@
 	# Dump the last batch.
 	if (ncpus != 0)
 		dump(first, i, batchnum);
-}' >> $T/script
+}
+___EOF___
+awk < $T/cfgcpu.pack \
+	-v TORTURE_BUILDONLY="$TORTURE_BUILDONLY" \
+	-v CONFIGDIR="$CONFIGFRAG/" \
+	-v KVM="$KVM" \
+	-v ncpus=$cpus \
+	-v jitter="$jitter" \
+	-v rd=$resdir/$ds/ \
+	-v dur=$dur \
+	-v TORTURE_QEMU_ARG="$TORTURE_QEMU_ARG" \
+	-v TORTURE_BOOTARGS="$TORTURE_BOOTARGS" \
+	-f $T/dumpbatches.awk >> $T/script
 echo kvm-end-run-stats.sh "$resdir/$ds" "$starttime" >> $T/script
 
 # Extract the tests and their batches from the script.
diff --git a/tools/testing/selftests/rcutorture/bin/torture.sh b/tools/testing/selftests/rcutorture/bin/torture.sh
index 53ec7c0..363f560 100755
--- a/tools/testing/selftests/rcutorture/bin/torture.sh
+++ b/tools/testing/selftests/rcutorture/bin/torture.sh
@@ -53,6 +53,7 @@
 do_kvfree=yes
 do_kasan=yes
 do_kcsan=no
+do_clocksourcewd=yes
 
 # doyesno - Helper function for yes/no arguments
 function doyesno () {
@@ -72,6 +73,7 @@
 	echo "       --configs-scftorture \"config-file list w/ repeat factor (2*CFLIST)\""
 	echo "       --doall"
 	echo "       --doallmodconfig / --do-no-allmodconfig"
+	echo "       --do-clocksourcewd / --do-no-clocksourcewd"
 	echo "       --do-kasan / --do-no-kasan"
 	echo "       --do-kcsan / --do-no-kcsan"
 	echo "       --do-kvfree / --do-no-kvfree"
@@ -109,7 +111,7 @@
 		configs_scftorture="$configs_scftorture $2"
 		shift
 		;;
-	--doall)
+	--do-all|--doall)
 		do_allmodconfig=yes
 		do_rcutorture=yes
 		do_locktorture=yes
@@ -119,10 +121,14 @@
 		do_kvfree=yes
 		do_kasan=yes
 		do_kcsan=yes
+		do_clocksourcewd=yes
 		;;
 	--do-allmodconfig|--do-no-allmodconfig)
 		do_allmodconfig=`doyesno "$1" --do-allmodconfig`
 		;;
+	--do-clocksourcewd|--do-no-clocksourcewd)
+		do_clocksourcewd=`doyesno "$1" --do-clocksourcewd`
+		;;
 	--do-kasan|--do-no-kasan)
 		do_kasan=`doyesno "$1" --do-kasan`
 		;;
@@ -135,7 +141,7 @@
 	--do-locktorture|--do-no-locktorture)
 		do_locktorture=`doyesno "$1" --do-locktorture`
 		;;
-	--do-none)
+	--do-none|--donone)
 		do_allmodconfig=no
 		do_rcutorture=no
 		do_locktorture=no
@@ -145,6 +151,7 @@
 		do_kvfree=no
 		do_kasan=no
 		do_kcsan=no
+		do_clocksourcewd=no
 		;;
 	--do-rcuscale|--do-no-rcuscale)
 		do_rcuscale=`doyesno "$1" --do-rcuscale`
@@ -279,9 +286,9 @@
 #	torture_bootargs="[ kernel boot arguments ]"
 #	torture_set flavor [ kvm.sh arguments ]
 #
-# Note that "flavor" is an arbitrary string.  Supply --torture if needed.
-# Note that quoting is problematic.  So on the command line, pass multiple
-# values with multiple kvm.sh argument instances.
+# Note that "flavor" is an arbitrary string that does not affect kvm.sh
+# in any way.  So also supply --torture if you need something other than
+# the default.
 function torture_set {
 	local cur_kcsan_kmake_args=
 	local kcsan_kmake_tag=
@@ -377,6 +384,22 @@
 	torture_set "rcuscale-kvfree" tools/testing/selftests/rcutorture/bin/kvm.sh --torture rcuscale --allcpus --duration 10 --kconfig "CONFIG_NR_CPUS=$HALF_ALLOTED_CPUS" --memory 1G --trust-make
 fi
 
+if test "$do_clocksourcewd" = "yes"
+then
+	torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000"
+	torture_set "clocksourcewd-1" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --kconfig "CONFIG_TEST_CLOCKSOURCE_WATCHDOG=y" --trust-make
+
+	torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000 clocksource.max_cswd_read_retries=1"
+	torture_set "clocksourcewd-2" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --kconfig "CONFIG_TEST_CLOCKSOURCE_WATCHDOG=y" --trust-make
+
+	# In case our work is already done...
+	if test "$do_rcutorture" != "yes"
+	then
+		torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000"
+		torture_set "clocksourcewd-3" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --trust-make
+	fi
+fi
+
 echo " --- " $scriptname $args
 echo " --- " Done `date` | tee -a $T/log
 ret=0
@@ -395,6 +418,10 @@
 	nfailures="`wc -l "$T/failures" | awk '{ print $1 }'`"
 	ret=2
 fi
+if test "$do_kcsan" = "yes"
+then
+	TORTURE_KCONFIG_KCSAN_ARG=1 tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh tools/testing/selftests/rcutorture/res/$ds > tools/testing/selftests/rcutorture/res/$ds/kcsan.sum
+fi
 echo Started at $startdate, ended at `date`, duration `get_starttime_duration $starttime`. | tee -a $T/log
 echo Summary: Successes: $nsuccesses Failures: $nfailures. | tee -a $T/log
 tdir="`cat $T/successes $T/failures | head -1 | awk '{ print $NF }' | sed -e 's,/[^/]\+/*$,,'`"
diff --git a/tools/testing/selftests/rcutorture/configs/rcu/RUDE01 b/tools/testing/selftests/rcutorture/configs/rcu/RUDE01
index bafe94c..3ca1124 100644
--- a/tools/testing/selftests/rcutorture/configs/rcu/RUDE01
+++ b/tools/testing/selftests/rcutorture/configs/rcu/RUDE01
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_HOTPLUG_CPU=y
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
diff --git a/tools/testing/selftests/rcutorture/configs/rcu/TASKS01 b/tools/testing/selftests/rcutorture/configs/rcu/TASKS01
index bafe94c..3ca1124 100644
--- a/tools/testing/selftests/rcutorture/configs/rcu/TASKS01
+++ b/tools/testing/selftests/rcutorture/configs/rcu/TASKS01
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_HOTPLUG_CPU=y
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
diff --git a/tools/testing/selftests/rcutorture/configs/rcu/TASKS03 b/tools/testing/selftests/rcutorture/configs/rcu/TASKS03
index ea43990..dc02083 100644
--- a/tools/testing/selftests/rcutorture/configs/rcu/TASKS03
+++ b/tools/testing/selftests/rcutorture/configs/rcu/TASKS03
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
 CONFIG_PREEMPT=y