Merge branches 'doc.2021.07.20c', 'fixes.2021.08.06a', 'nocb.2021.07.20c', 'nolibc.2021.07.20c', 'tasks.2021.07.20c', 'torture.2021.07.27a' and 'torturescript.2021.07.27a' into HEAD

doc.2021.07.20c: Documentation updates.
fixes.2021.08.06a: Miscellaneous fixes.
nocb.2021.07.20c: Callback-offloading (NOCB CPU) updates.
nolibc.2021.07.20c: Tiny userspace library updates.
tasks.2021.07.20c: Tasks RCU updates.
torture.2021.07.27a: In-kernel torture-test updates.
torturescript.2021.07.27a: Torture-test scripting updates.
diff --git a/Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst b/Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst
index 11cdab0..eeb3512 100644
--- a/Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst
+++ b/Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst
@@ -112,6 +112,35 @@
 The ``smp_mb__after_unlock_lock()`` invocations prevent this
 ``WARN_ON()`` from triggering.
 
++-----------------------------------------------------------------------+
+| **Quick Quiz**:                                                       |
++-----------------------------------------------------------------------+
+| But the chain of rcu_node-structure lock acquisitions guarantees      |
+| that new readers will see all of the updater's pre-grace-period       |
+| accesses and also guarantees that the updater's post-grace-period     |
+| accesses will see all of the old reader's accesses.  So why do we     |
+| need all of those calls to smp_mb__after_unlock_lock()?               |
++-----------------------------------------------------------------------+
+| **Answer**:                                                           |
++-----------------------------------------------------------------------+
+| Because we must provide ordering for RCU's polling grace-period       |
+| primitives, for example, get_state_synchronize_rcu() and              |
+| poll_state_synchronize_rcu().  Consider this code::                   |
+|                                                                       |
+|  CPU 0                                     CPU 1                      |
+|  ----                                      ----                       |
+|  WRITE_ONCE(X, 1)                          WRITE_ONCE(Y, 1)           |
+|  g = get_state_synchronize_rcu()           smp_mb()                   |
+|  while (!poll_state_synchronize_rcu(g))    r1 = READ_ONCE(X)          |
+|          continue;                                                    |
+|  r0 = READ_ONCE(Y)                                                    |
+|                                                                       |
+| RCU guarantees that the outcome r0 == 0 && r1 == 0 will not           |
+| happen, even if CPU 1 is in an RCU extended quiescent state           |
+| (idle or offline) and thus won't interact directly with the RCU       |
+| core processing at all.                                               |
++-----------------------------------------------------------------------+
+
 This approach must be extended to include idle CPUs, which need
 RCU's grace-period memory ordering guarantee to extend to any
 RCU read-side critical sections preceding and following the current
diff --git a/Documentation/RCU/Design/Requirements/Requirements.rst b/Documentation/RCU/Design/Requirements/Requirements.rst
index 38a3947..45278e2 100644
--- a/Documentation/RCU/Design/Requirements/Requirements.rst
+++ b/Documentation/RCU/Design/Requirements/Requirements.rst
@@ -362,9 +362,8 @@
       12 }
 
 The rcu_dereference() uses volatile casts and (for DEC Alpha) memory
-barriers in the Linux kernel. Should a `high-quality implementation of
-C11 ``memory_order_consume``
-[PDF] <http://www.rdrop.com/users/paulmck/RCU/consume.2015.07.13a.pdf>`__
+barriers in the Linux kernel. Should a |high-quality implementation of
+C11 memory_order_consume [PDF]|_
 ever appear, then rcu_dereference() could be implemented as a
 ``memory_order_consume`` load. Regardless of the exact implementation, a
 pointer fetched by rcu_dereference() may not be used outside of the
@@ -374,6 +373,9 @@
 mechanism, most commonly locking or `reference
 counting <https://www.kernel.org/doc/Documentation/RCU/rcuref.txt>`__.
 
+.. |high-quality implementation of C11 memory_order_consume [PDF]| replace:: high-quality implementation of C11 ``memory_order_consume`` [PDF]
+.. _high-quality implementation of C11 memory_order_consume [PDF]: http://www.rdrop.com/users/paulmck/RCU/consume.2015.07.13a.pdf
+
 In short, updaters use rcu_assign_pointer() and readers use
 rcu_dereference(), and these two RCU API elements work together to
 ensure that readers have a consistent view of newly added data elements.
diff --git a/Documentation/RCU/checklist.rst b/Documentation/RCU/checklist.rst
index 01cc21f..f4545b7c 100644
--- a/Documentation/RCU/checklist.rst
+++ b/Documentation/RCU/checklist.rst
@@ -37,7 +37,7 @@
 
 1.	Does the update code have proper mutual exclusion?
 
-	RCU does allow -readers- to run (almost) naked, but -writers- must
+	RCU does allow *readers* to run (almost) naked, but *writers* must
 	still use some sort of mutual exclusion, such as:
 
 	a.	locking,
@@ -73,7 +73,7 @@
 	critical section is every bit as bad as letting them leak out
 	from under a lock.  Unless, of course, you have arranged some
 	other means of protection, such as a lock or a reference count
-	-before- letting them out of the RCU read-side critical section.
+	*before* letting them out of the RCU read-side critical section.
 
 3.	Does the update code tolerate concurrent accesses?
 
@@ -101,7 +101,7 @@
 	c.	Make updates appear atomic to readers.	For example,
 		pointer updates to properly aligned fields will
 		appear atomic, as will individual atomic primitives.
-		Sequences of operations performed under a lock will -not-
+		Sequences of operations performed under a lock will *not*
 		appear to be atomic to RCU readers, nor will sequences
 		of multiple atomic primitives.
 
@@ -333,7 +333,7 @@
 	for example) may be omitted.
 
 10.	Conversely, if you are in an RCU read-side critical section,
-	and you don't hold the appropriate update-side lock, you -must-
+	and you don't hold the appropriate update-side lock, you *must*
 	use the "_rcu()" variants of the list macros.  Failing to do so
 	will break Alpha, cause aggressive compilers to generate bad code,
 	and confuse people trying to read your code.
@@ -359,12 +359,12 @@
 	callback pending, then that RCU callback will execute on some
 	surviving CPU.	(If this was not the case, a self-spawning RCU
 	callback would prevent the victim CPU from ever going offline.)
-	Furthermore, CPUs designated by rcu_nocbs= might well -always-
+	Furthermore, CPUs designated by rcu_nocbs= might well *always*
 	have their RCU callbacks executed on some other CPUs, in fact,
 	for some  real-time workloads, this is the whole point of using
 	the rcu_nocbs= kernel boot parameter.
 
-13.	Unlike other forms of RCU, it -is- permissible to block in an
+13.	Unlike other forms of RCU, it *is* permissible to block in an
 	SRCU read-side critical section (demarked by srcu_read_lock()
 	and srcu_read_unlock()), hence the "SRCU": "sleepable RCU".
 	Please note that if you don't need to sleep in read-side critical
@@ -411,16 +411,16 @@
 14.	The whole point of call_rcu(), synchronize_rcu(), and friends
 	is to wait until all pre-existing readers have finished before
 	carrying out some otherwise-destructive operation.  It is
-	therefore critically important to -first- remove any path
+	therefore critically important to *first* remove any path
 	that readers can follow that could be affected by the
-	destructive operation, and -only- -then- invoke call_rcu(),
+	destructive operation, and *only then* invoke call_rcu(),
 	synchronize_rcu(), or friends.
 
 	Because these primitives only wait for pre-existing readers, it
 	is the caller's responsibility to guarantee that any subsequent
 	readers will execute safely.
 
-15.	The various RCU read-side primitives do -not- necessarily contain
+15.	The various RCU read-side primitives do *not* necessarily contain
 	memory barriers.  You should therefore plan for the CPU
 	and the compiler to freely reorder code into and out of RCU
 	read-side critical sections.  It is the responsibility of the
@@ -459,8 +459,8 @@
 	pass in a function defined within a loadable module, then it in
 	necessary to wait for all pending callbacks to be invoked after
 	the last invocation and before unloading that module.  Note that
-	it is absolutely -not- sufficient to wait for a grace period!
-	The current (say) synchronize_rcu() implementation is -not-
+	it is absolutely *not* sufficient to wait for a grace period!
+	The current (say) synchronize_rcu() implementation is *not*
 	guaranteed to wait for callbacks registered on other CPUs.
 	Or even on the current CPU if that CPU recently went offline
 	and came back online.
@@ -470,7 +470,7 @@
 	-	call_rcu() -> rcu_barrier()
 	-	call_srcu() -> srcu_barrier()
 
-	However, these barrier functions are absolutely -not- guaranteed
+	However, these barrier functions are absolutely *not* guaranteed
 	to wait for a grace period.  In fact, if there are no call_rcu()
 	callbacks waiting anywhere in the system, rcu_barrier() is within
 	its rights to return immediately.
diff --git a/Documentation/RCU/rcu_dereference.rst b/Documentation/RCU/rcu_dereference.rst
index f3e587a..0b418a5 100644
--- a/Documentation/RCU/rcu_dereference.rst
+++ b/Documentation/RCU/rcu_dereference.rst
@@ -43,7 +43,7 @@
 	-	Set bits and clear bits down in the must-be-zero low-order
 		bits of that pointer.  This clearly means that the pointer
 		must have alignment constraints, for example, this does
-		-not- work in general for char* pointers.
+		*not* work in general for char* pointers.
 
 	-	XOR bits to translate pointers, as is done in some
 		classic buddy-allocator algorithms.
@@ -174,7 +174,7 @@
 		Please see the "CONTROL DEPENDENCIES" section of
 		Documentation/memory-barriers.txt for more details.
 
-	-	The pointers are not equal -and- the compiler does
+	-	The pointers are not equal *and* the compiler does
 		not have enough information to deduce the value of the
 		pointer.  Note that the volatile cast in rcu_dereference()
 		will normally prevent the compiler from knowing too much.
@@ -360,7 +360,7 @@
 return values.  This can result in "p->b" returning pre-initialization
 garbage values.
 
-In short, rcu_dereference() is -not- optional when you are going to
+In short, rcu_dereference() is *not* optional when you are going to
 dereference the resulting pointer.
 
 
diff --git a/Documentation/RCU/stallwarn.rst b/Documentation/RCU/stallwarn.rst
index 7148e9b..5036df2 100644
--- a/Documentation/RCU/stallwarn.rst
+++ b/Documentation/RCU/stallwarn.rst
@@ -32,7 +32,7 @@
 
 -	Booting Linux using a console connection that is too slow to
 	keep up with the boot-time console-message rate.  For example,
-	a 115Kbaud serial console can be -way- too slow to keep up
+	a 115Kbaud serial console can be *way* too slow to keep up
 	with boot-time message rates, and will frequently result in
 	RCU CPU stall warning messages.  Especially if you have added
 	debug printk()s.
@@ -105,7 +105,7 @@
 	leading the realization that the CPU had failed.
 
 The RCU, RCU-sched, and RCU-tasks implementations have CPU stall warning.
-Note that SRCU does -not- have CPU stall warnings.  Please note that
+Note that SRCU does *not* have CPU stall warnings.  Please note that
 RCU only detects CPU stalls when there is a grace period in progress.
 No grace period, no CPU stall warnings.
 
@@ -145,7 +145,7 @@
 	this parameter is checked only at the beginning of a cycle.
 	So if you are 10 seconds into a 40-second stall, setting this
 	sysfs parameter to (say) five will shorten the timeout for the
-	-next- stall, or the following warning for the current stall
+	*next* stall, or the following warning for the current stall
 	(assuming the stall lasts long enough).  It will not affect the
 	timing of the next warning for the current stall.
 
@@ -189,8 +189,8 @@
 Interpreting RCU's CPU Stall-Detector "Splats"
 ==============================================
 
-For non-RCU-tasks flavors of RCU, when a CPU detects that it is stalling,
-it will print a message similar to the following::
+For non-RCU-tasks flavors of RCU, when a CPU detects that some other
+CPU is stalling, it will print a message similar to the following::
 
 	INFO: rcu_sched detected stalls on CPUs/tasks:
 	2-...: (3 GPs behind) idle=06c/0/0 softirq=1453/1455 fqs=0
@@ -202,8 +202,10 @@
 will normally be followed by stack dumps for each CPU.  Please note that
 PREEMPT_RCU builds can be stalled by tasks as well as by CPUs, and that
 the tasks will be indicated by PID, for example, "P3421".  It is even
-possible for an rcu_state stall to be caused by both CPUs -and- tasks,
+possible for an rcu_state stall to be caused by both CPUs *and* tasks,
 in which case the offending CPUs and tasks will all be called out in the list.
+In some cases, CPUs will detect themselves stalling, which will result
+in a self-detected stall.
 
 CPU 2's "(3 GPs behind)" indicates that this CPU has not interacted with
 the RCU core for the past three grace periods.  In contrast, CPU 16's "(0
@@ -224,7 +226,7 @@
 last noted the beginning of a grace period, which might be the current
 (stalled) grace period, or it might be some earlier grace period (for
 example, if the CPU might have been in dyntick-idle mode for an extended
-time period.  The number after the "/" is the number that have executed
+time period).  The number after the "/" is the number that have executed
 since boot until the current time.  If this latter number stays constant
 across repeated stall-warning messages, it is possible that RCU's softirq
 handlers are no longer able to execute on this CPU.  This can happen if
@@ -283,7 +285,8 @@
 the stall warning, as was the case in the "All QSes seen" line above,
 the following additional line is printed::
 
-	kthread starved for 23807 jiffies! g7075 f0x0 RCU_GP_WAIT_FQS(3) ->state=0x1 ->cpu=5
+	rcu_sched kthread starved for 23807 jiffies! g7075 f0x0 RCU_GP_WAIT_FQS(3) ->state=0x1 ->cpu=5
+	Unless rcu_sched kthread gets sufficient CPU time, OOM is now expected behavior.
 
 Starving the grace-period kthreads of CPU time can of course result
 in RCU CPU stall warnings even when all CPUs and tasks have passed
@@ -313,15 +316,21 @@
 change on successive RCU CPU stall warnings, there is further reason to
 suspect a timer problem.
 
+These messages are usually followed by stack dumps of the CPUs and tasks
+involved in the stall.  These stack traces can help you locate the cause
+of the stall, keeping in mind that the CPU detecting the stall will have
+an interrupt frame that is mainly devoted to detecting the stall.
+
 
 Multiple Warnings From One Stall
 ================================
 
-If a stall lasts long enough, multiple stall-warning messages will be
-printed for it.  The second and subsequent messages are printed at
+If a stall lasts long enough, multiple stall-warning messages will
+be printed for it.  The second and subsequent messages are printed at
 longer intervals, so that the time between (say) the first and second
 message will be about three times the interval between the beginning
-of the stall and the first message.
+of the stall and the first message.  It can be helpful to compare the
+stack dumps for the different messages for the same stalled grace period.
 
 
 Stall Warnings for Expedited Grace Periods
diff --git a/include/linux/rculist.h b/include/linux/rculist.h
index f8633d3..d29740b 100644
--- a/include/linux/rculist.h
+++ b/include/linux/rculist.h
@@ -11,15 +11,6 @@
 #include <linux/rcupdate.h>
 
 /*
- * Why is there no list_empty_rcu()?  Because list_empty() serves this
- * purpose.  The list_empty() function fetches the RCU-protected pointer
- * and compares it to the address of the list head, but neither dereferences
- * this pointer itself nor provides this pointer to the caller.  Therefore,
- * it is not necessary to use rcu_dereference(), so that list_empty() can
- * be used anywhere you would want to use a list_empty_rcu().
- */
-
-/*
  * INIT_LIST_HEAD_RCU - Initialize a list_head visible to RCU readers
  * @list: list to be initialized
  *
@@ -318,21 +309,29 @@
 /*
  * Where are list_empty_rcu() and list_first_entry_rcu()?
  *
- * Implementing those functions following their counterparts list_empty() and
- * list_first_entry() is not advisable because they lead to subtle race
- * conditions as the following snippet shows:
+ * They do not exist because they would lead to subtle race conditions:
  *
  * if (!list_empty_rcu(mylist)) {
  *	struct foo *bar = list_first_entry_rcu(mylist, struct foo, list_member);
  *	do_something(bar);
  * }
  *
- * The list may not be empty when list_empty_rcu checks it, but it may be when
- * list_first_entry_rcu rereads the ->next pointer.
+ * The list might be non-empty when list_empty_rcu() checks it, but it
+ * might have become empty by the time that list_first_entry_rcu() rereads
+ * the ->next pointer, which would result in a SEGV.
  *
- * Rereading the ->next pointer is not a problem for list_empty() and
- * list_first_entry() because they would be protected by a lock that blocks
- * writers.
+ * When not using RCU, it is OK for list_first_entry() to re-read that
+ * pointer because both functions should be protected by some lock that
+ * blocks writers.
+ *
+ * When using RCU, list_empty() uses READ_ONCE() to fetch the
+ * RCU-protected ->next pointer and then compares it to the address of the
+ * list head.  However, it neither dereferences this pointer nor provides
+ * this pointer to its caller.  Thus, READ_ONCE() suffices (that is,
+ * rcu_dereference() is not needed), which means that list_empty() can be
+ * used anywhere you would want to use list_empty_rcu().  Just don't
+ * expect anything useful to happen if you do a subsequent lockless
+ * call to list_first_entry_rcu()!!!
  *
  * See list_first_or_null_rcu for an alternative.
  */
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index d9680b7..434d12f 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -53,7 +53,7 @@
  * nesting depth, but makes sense only if CONFIG_PREEMPT_RCU -- in other
  * types of kernel builds, the rcu_read_lock() nesting depth is unknowable.
  */
-#define rcu_preempt_depth() (current->rcu_read_lock_nesting)
+#define rcu_preempt_depth() READ_ONCE(current->rcu_read_lock_nesting)
 
 #else /* #ifdef CONFIG_PREEMPT_RCU */
 
@@ -167,7 +167,7 @@
 # define synchronize_rcu_tasks synchronize_rcu
 # endif
 
-# ifdef CONFIG_TASKS_RCU_TRACE
+# ifdef CONFIG_TASKS_TRACE_RCU
 # define rcu_tasks_trace_qs(t)						\
 	do {								\
 		if (!likely(READ_ONCE((t)->trc_reader_checked)) &&	\
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 953e70f..9be0153 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -14,9 +14,6 @@
 
 #include <asm/param.h> /* for HZ */
 
-/* Never flag non-existent other CPUs! */
-static inline bool rcu_eqs_special_set(int cpu) { return false; }
-
 unsigned long get_state_synchronize_rcu(void);
 unsigned long start_poll_synchronize_rcu(void);
 bool poll_state_synchronize_rcu(unsigned long oldstate);
diff --git a/include/linux/srcutiny.h b/include/linux/srcutiny.h
index 0e0cf4d..6cfaa0a 100644
--- a/include/linux/srcutiny.h
+++ b/include/linux/srcutiny.h
@@ -61,7 +61,7 @@
 	int idx;
 
 	idx = ((READ_ONCE(ssp->srcu_idx) + 1) & 0x2) >> 1;
-	WRITE_ONCE(ssp->srcu_lock_nesting[idx], ssp->srcu_lock_nesting[idx] + 1);
+	WRITE_ONCE(ssp->srcu_lock_nesting[idx], READ_ONCE(ssp->srcu_lock_nesting[idx]) + 1);
 	return idx;
 }
 
@@ -81,11 +81,11 @@
 {
 	int idx;
 
-	idx = ((READ_ONCE(ssp->srcu_idx) + 1) & 0x2) >> 1;
+	idx = ((data_race(READ_ONCE(ssp->srcu_idx)) + 1) & 0x2) >> 1;
 	pr_alert("%s%s Tiny SRCU per-CPU(idx=%d): (%hd,%hd)\n",
 		 tt, tf, idx,
-		 READ_ONCE(ssp->srcu_lock_nesting[!idx]),
-		 READ_ONCE(ssp->srcu_lock_nesting[idx]));
+		 data_race(READ_ONCE(ssp->srcu_lock_nesting[!idx])),
+		 data_race(READ_ONCE(ssp->srcu_lock_nesting[idx])));
 }
 
 #endif
diff --git a/kernel/rcu/srcutiny.c b/kernel/rcu/srcutiny.c
index 26344dc..a0ba2ed49 100644
--- a/kernel/rcu/srcutiny.c
+++ b/kernel/rcu/srcutiny.c
@@ -96,7 +96,7 @@
  */
 void __srcu_read_unlock(struct srcu_struct *ssp, int idx)
 {
-	int newval = ssp->srcu_lock_nesting[idx] - 1;
+	int newval = READ_ONCE(ssp->srcu_lock_nesting[idx]) - 1;
 
 	WRITE_ONCE(ssp->srcu_lock_nesting[idx], newval);
 	if (!newval && READ_ONCE(ssp->srcu_gp_waiting))
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index 8536c55..806160c 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -643,8 +643,8 @@
 //
 // "Rude" variant of Tasks RCU, inspired by Steve Rostedt's trick of
 // passing an empty function to schedule_on_each_cpu().  This approach
-// provides an asynchronous call_rcu_tasks_rude() API and batching
-// of concurrent calls to the synchronous synchronize_rcu_rude() API.
+// provides an asynchronous call_rcu_tasks_rude() API and batching of
+// concurrent calls to the synchronous synchronize_rcu_tasks_rude() API.
 // This invokes schedule_on_each_cpu() in order to send IPIs far and wide
 // and induces otherwise unnecessary context switches on all online CPUs,
 // whether idle or not.
@@ -785,7 +785,10 @@
 //	set that task's .need_qs flag so that task's next outermost
 //	rcu_read_unlock_trace() will report the quiescent state (in which
 //	case the count of readers is incremented).  If both attempts fail,
-//	the task is added to a "holdout" list.
+//	the task is added to a "holdout" list.  Note that IPIs are used
+//	to invoke trc_read_check_handler() in the context of running tasks
+//	in order to avoid ordering overhead on common-case shared-variable
+//	accessses.
 // rcu_tasks_trace_postscan():
 //	Initialize state and attempt to identify an immediate quiescent
 //	state as above (but only for idle tasks), unblock CPU-hotplug
@@ -847,7 +850,7 @@
 /* If we are the last reader, wake up the grace-period kthread. */
 void rcu_read_unlock_trace_special(struct task_struct *t, int nesting)
 {
-	int nq = t->trc_reader_special.b.need_qs;
+	int nq = READ_ONCE(t->trc_reader_special.b.need_qs);
 
 	if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB) &&
 	    t->trc_reader_special.b.need_mb)
@@ -894,7 +897,7 @@
 
 	// If the task is not in a read-side critical section, and
 	// if this is the last reader, awaken the grace-period kthread.
-	if (likely(!t->trc_reader_nesting)) {
+	if (likely(!READ_ONCE(t->trc_reader_nesting))) {
 		if (WARN_ON_ONCE(atomic_dec_and_test(&trc_n_readers_need_end)))
 			wake_up(&trc_wait);
 		// Mark as checked after decrement to avoid false
@@ -903,7 +906,7 @@
 		goto reset_ipi;
 	}
 	// If we are racing with an rcu_read_unlock_trace(), try again later.
-	if (unlikely(t->trc_reader_nesting < 0)) {
+	if (unlikely(READ_ONCE(t->trc_reader_nesting) < 0)) {
 		if (WARN_ON_ONCE(atomic_dec_and_test(&trc_n_readers_need_end)))
 			wake_up(&trc_wait);
 		goto reset_ipi;
@@ -913,14 +916,14 @@
 	// Get here if the task is in a read-side critical section.  Set
 	// its state so that it will awaken the grace-period kthread upon
 	// exit from that critical section.
-	WARN_ON_ONCE(t->trc_reader_special.b.need_qs);
+	WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs));
 	WRITE_ONCE(t->trc_reader_special.b.need_qs, true);
 
 reset_ipi:
 	// Allow future IPIs to be sent on CPU and for task.
 	// Also order this IPI handler against any later manipulations of
 	// the intended task.
-	smp_store_release(&per_cpu(trc_ipi_to_cpu, smp_processor_id()), false); // ^^^
+	smp_store_release(per_cpu_ptr(&trc_ipi_to_cpu, smp_processor_id()), false); // ^^^
 	smp_store_release(&texp->trc_ipi_to_cpu, -1); // ^^^
 }
 
@@ -950,6 +953,7 @@
 			n_heavy_reader_ofl_updates++;
 		in_qs = true;
 	} else {
+		// The task is not running, so C-language access is safe.
 		in_qs = likely(!t->trc_reader_nesting);
 	}
 
@@ -964,7 +968,7 @@
 	// state so that it will awaken the grace-period kthread upon exit
 	// from that critical section.
 	atomic_inc(&trc_n_readers_need_end); // One more to wait on.
-	WARN_ON_ONCE(t->trc_reader_special.b.need_qs);
+	WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs));
 	WRITE_ONCE(t->trc_reader_special.b.need_qs, true);
 	return true;
 }
@@ -982,7 +986,7 @@
 	// The current task had better be in a quiescent state.
 	if (t == current) {
 		t->trc_reader_checked = true;
-		WARN_ON_ONCE(t->trc_reader_nesting);
+		WARN_ON_ONCE(READ_ONCE(t->trc_reader_nesting));
 		return;
 	}
 
@@ -994,6 +998,12 @@
 	}
 	put_task_struct(t);
 
+	// If this task is not yet on the holdout list, then we are in
+	// an RCU read-side critical section.  Otherwise, the invocation of
+	// rcu_add_holdout() that added it to the list did the necessary
+	// get_task_struct().  Either way, the task cannot be freed out
+	// from under this code.
+
 	// If currently running, send an IPI, either way, add to list.
 	trc_add_holdout(t, bhp);
 	if (task_curr(t) &&
@@ -1092,8 +1102,8 @@
 		 ".I"[READ_ONCE(t->trc_ipi_to_cpu) > 0],
 		 ".i"[is_idle_task(t)],
 		 ".N"[cpu > 0 && tick_nohz_full_cpu(cpu)],
-		 t->trc_reader_nesting,
-		 " N"[!!t->trc_reader_special.b.need_qs],
+		 READ_ONCE(t->trc_reader_nesting),
+		 " N"[!!READ_ONCE(t->trc_reader_special.b.need_qs)],
 		 cpu);
 	sched_show_task(t);
 }
@@ -1187,7 +1197,7 @@
 static void exit_tasks_rcu_finish_trace(struct task_struct *t)
 {
 	WRITE_ONCE(t->trc_reader_checked, true);
-	WARN_ON_ONCE(t->trc_reader_nesting);
+	WARN_ON_ONCE(READ_ONCE(t->trc_reader_nesting));
 	WRITE_ONCE(t->trc_reader_nesting, 0);
 	if (WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs)))
 		rcu_read_unlock_trace_special(t, 0);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 51f24ec..bce848e 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -74,17 +74,10 @@
 
 /* Data structures. */
 
-/*
- * Steal a bit from the bottom of ->dynticks for idle entry/exit
- * control.  Initially this is for TLB flushing.
- */
-#define RCU_DYNTICK_CTRL_MASK 0x1
-#define RCU_DYNTICK_CTRL_CTR  (RCU_DYNTICK_CTRL_MASK + 1)
-
 static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = {
 	.dynticks_nesting = 1,
 	.dynticks_nmi_nesting = DYNTICK_IRQ_NONIDLE,
-	.dynticks = ATOMIC_INIT(RCU_DYNTICK_CTRL_CTR),
+	.dynticks = ATOMIC_INIT(1),
 #ifdef CONFIG_RCU_NOCB_CPU
 	.cblist.flags = SEGCBLIST_SOFTIRQ_ONLY,
 #endif
@@ -259,6 +252,15 @@
 }
 
 /*
+ * Increment the current CPU's rcu_data structure's ->dynticks field
+ * with ordering.  Return the new value.
+ */
+static noinline noinstr unsigned long rcu_dynticks_inc(int incby)
+{
+	return arch_atomic_add_return(incby, this_cpu_ptr(&rcu_data.dynticks));
+}
+
+/*
  * Record entry into an extended quiescent state.  This is only to be
  * called when not already in an extended quiescent state, that is,
  * RCU is watching prior to the call to this function and is no longer
@@ -266,7 +268,6 @@
  */
 static noinstr void rcu_dynticks_eqs_enter(void)
 {
-	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	int seq;
 
 	/*
@@ -275,13 +276,9 @@
 	 * next idle sojourn.
 	 */
 	rcu_dynticks_task_trace_enter();  // Before ->dynticks update!
-	seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	seq = rcu_dynticks_inc(1);
 	// RCU is no longer watching.  Better be in extended quiescent state!
-	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-		     (seq & RCU_DYNTICK_CTRL_CTR));
-	/* Better not have special action (TLB flush) pending! */
-	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-		     (seq & RCU_DYNTICK_CTRL_MASK));
+	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && (seq & 0x1));
 }
 
 /*
@@ -291,7 +288,6 @@
  */
 static noinstr void rcu_dynticks_eqs_exit(void)
 {
-	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	int seq;
 
 	/*
@@ -299,15 +295,10 @@
 	 * and we also must force ordering with the next RCU read-side
 	 * critical section.
 	 */
-	seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	seq = rcu_dynticks_inc(1);
 	// RCU is now watching.  Better not be in an extended quiescent state!
 	rcu_dynticks_task_trace_exit();  // After ->dynticks update!
-	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-		     !(seq & RCU_DYNTICK_CTRL_CTR));
-	if (seq & RCU_DYNTICK_CTRL_MASK) {
-		arch_atomic_andnot(RCU_DYNTICK_CTRL_MASK, &rdp->dynticks);
-		smp_mb__after_atomic(); /* _exit after clearing mask. */
-	}
+	WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && !(seq & 0x1));
 }
 
 /*
@@ -324,9 +315,9 @@
 {
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
-	if (atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR)
+	if (atomic_read(&rdp->dynticks) & 0x1)
 		return;
-	atomic_add(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+	rcu_dynticks_inc(1);
 }
 
 /*
@@ -336,9 +327,7 @@
  */
 static __always_inline bool rcu_dynticks_curr_cpu_in_eqs(void)
 {
-	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
-
-	return !(arch_atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR);
+	return !(atomic_read(this_cpu_ptr(&rcu_data.dynticks)) & 0x1);
 }
 
 /*
@@ -347,9 +336,8 @@
  */
 static int rcu_dynticks_snap(struct rcu_data *rdp)
 {
-	int snap = atomic_add_return(0, &rdp->dynticks);
-
-	return snap & ~RCU_DYNTICK_CTRL_MASK;
+	smp_mb();  // Fundamental RCU ordering guarantee.
+	return atomic_read_acquire(&rdp->dynticks);
 }
 
 /*
@@ -358,7 +346,7 @@
  */
 static bool rcu_dynticks_in_eqs(int snap)
 {
-	return !(snap & RCU_DYNTICK_CTRL_CTR);
+	return !(snap & 0x1);
 }
 
 /* Return true if the specified CPU is currently idle from an RCU viewpoint.  */
@@ -389,8 +377,7 @@
 	int snap;
 
 	// If not quiescent, force back to earlier extended quiescent state.
-	snap = atomic_read(&rdp->dynticks) & ~(RCU_DYNTICK_CTRL_MASK |
-					       RCU_DYNTICK_CTRL_CTR);
+	snap = atomic_read(&rdp->dynticks) & ~0x1;
 
 	smp_rmb(); // Order ->dynticks and *vp reads.
 	if (READ_ONCE(*vp))
@@ -398,32 +385,7 @@
 	smp_rmb(); // Order *vp read and ->dynticks re-read.
 
 	// If still in the same extended quiescent state, we are good!
-	return snap == (atomic_read(&rdp->dynticks) & ~RCU_DYNTICK_CTRL_MASK);
-}
-
-/*
- * Set the special (bottom) bit of the specified CPU so that it
- * will take special action (such as flushing its TLB) on the
- * next exit from an extended quiescent state.  Returns true if
- * the bit was successfully set, or false if the CPU was not in
- * an extended quiescent state.
- */
-bool rcu_eqs_special_set(int cpu)
-{
-	int old;
-	int new;
-	int new_old;
-	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
-
-	new_old = atomic_read(&rdp->dynticks);
-	do {
-		old = new_old;
-		if (old & RCU_DYNTICK_CTRL_CTR)
-			return false;
-		new = old | RCU_DYNTICK_CTRL_MASK;
-		new_old = atomic_cmpxchg(&rdp->dynticks, old, new);
-	} while (new_old != old);
-	return true;
+	return snap == atomic_read(&rdp->dynticks);
 }
 
 /*
@@ -439,13 +401,12 @@
  */
 notrace void rcu_momentary_dyntick_idle(void)
 {
-	int special;
+	int seq;
 
 	raw_cpu_write(rcu_data.rcu_need_heavy_qs, false);
-	special = atomic_add_return(2 * RCU_DYNTICK_CTRL_CTR,
-				    &this_cpu_ptr(&rcu_data)->dynticks);
+	seq = rcu_dynticks_inc(2);
 	/* It is illegal to call this from idle state. */
-	WARN_ON_ONCE(!(special & RCU_DYNTICK_CTRL_CTR));
+	WARN_ON_ONCE(!(seq & 0x1));
 	rcu_preempt_deferred_qs(current);
 }
 EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle);
@@ -1325,7 +1286,7 @@
 	 */
 	jtsq = READ_ONCE(jiffies_to_sched_qs);
 	ruqp = per_cpu_ptr(&rcu_data.rcu_urgent_qs, rdp->cpu);
-	rnhqp = &per_cpu(rcu_data.rcu_need_heavy_qs, rdp->cpu);
+	rnhqp = per_cpu_ptr(&rcu_data.rcu_need_heavy_qs, rdp->cpu);
 	if (!READ_ONCE(*rnhqp) &&
 	    (time_after(jiffies, rcu_state.gp_start + jtsq * 2) ||
 	     time_after(jiffies, rcu_state.jiffies_resched) ||
@@ -1772,7 +1733,7 @@
 /*
  * Initialize a new grace period.  Return false if no grace period required.
  */
-static bool rcu_gp_init(void)
+static noinline_for_stack bool rcu_gp_init(void)
 {
 	unsigned long firstseq;
 	unsigned long flags;
@@ -1966,7 +1927,7 @@
 /*
  * Loop doing repeated quiescent-state forcing until the grace period ends.
  */
-static void rcu_gp_fqs_loop(void)
+static noinline_for_stack void rcu_gp_fqs_loop(void)
 {
 	bool first_gp_fqs;
 	int gf = 0;
@@ -1993,8 +1954,8 @@
 		trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 				       TPS("fqswait"));
 		WRITE_ONCE(rcu_state.gp_state, RCU_GP_WAIT_FQS);
-		ret = swait_event_idle_timeout_exclusive(
-				rcu_state.gp_wq, rcu_gp_fqs_check_wake(&gf), j);
+		(void)swait_event_idle_timeout_exclusive(rcu_state.gp_wq,
+				 rcu_gp_fqs_check_wake(&gf), j);
 		rcu_gp_torture_wait();
 		WRITE_ONCE(rcu_state.gp_state, RCU_GP_DOING_FQS);
 		/* Locking provides needed memory barriers. */
@@ -2471,9 +2432,6 @@
 	WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus - 1);
 	/* Adjust any no-longer-needed kthreads. */
 	rcu_boost_kthread_setaffinity(rnp, -1);
-	/* Do any needed no-CB deferred wakeups from this CPU. */
-	do_nocb_deferred_wakeup(per_cpu_ptr(&rcu_data, cpu));
-
 	// Stop-machine done, so allow nohz_full to disable tick.
 	tick_dep_clear(TICK_DEP_BIT_RCU);
 	return 0;
@@ -4050,7 +4008,7 @@
 	 */
 	init_completion(&rcu_state.barrier_completion);
 	atomic_set(&rcu_state.barrier_cpu_count, 2);
-	get_online_cpus();
+	cpus_read_lock();
 
 	/*
 	 * Force each CPU with callbacks to register a new callback.
@@ -4081,7 +4039,7 @@
 					  rcu_state.barrier_sequence);
 		}
 	}
-	put_online_cpus();
+	cpus_read_unlock();
 
 	/*
 	 * Now that we have an rcu_barrier_callback() callback on each
@@ -4784,4 +4742,5 @@
 
 #include "tree_stall.h"
 #include "tree_exp.h"
+#include "tree_nocb.h"
 #include "tree_plugin.h"
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
new file mode 100644
index 0000000..8fdf44f
--- /dev/null
+++ b/kernel/rcu/tree_nocb.h
@@ -0,0 +1,1496 @@
+/* SPDX-License-Identifier: GPL-2.0+ */
+/*
+ * Read-Copy Update mechanism for mutual exclusion (tree-based version)
+ * Internal non-public definitions that provide either classic
+ * or preemptible semantics.
+ *
+ * Copyright Red Hat, 2009
+ * Copyright IBM Corporation, 2009
+ * Copyright SUSE, 2021
+ *
+ * Author: Ingo Molnar <mingo@elte.hu>
+ *	   Paul E. McKenney <paulmck@linux.ibm.com>
+ *	   Frederic Weisbecker <frederic@kernel.org>
+ */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool __read_mostly rcu_nocb_poll;    /* Offload kthread are to poll. */
+static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
+{
+	return lockdep_is_held(&rdp->nocb_lock);
+}
+
+static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
+{
+	/* Race on early boot between thread creation and assignment */
+	if (!rdp->nocb_cb_kthread || !rdp->nocb_gp_kthread)
+		return true;
+
+	if (current == rdp->nocb_cb_kthread || current == rdp->nocb_gp_kthread)
+		if (in_task())
+			return true;
+	return false;
+}
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads
+ * created that pull the callbacks from the corresponding CPU, wait for
+ * a grace period to elapse, and invoke the callbacks.  These kthreads
+ * are organized into GP kthreads, which manage incoming callbacks, wait for
+ * grace periods, and awaken CB kthreads, and the CB kthreads, which only
+ * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs
+ * do a wake_up() on their GP kthread when they insert a callback into any
+ * empty list, unless the rcu_nocb_poll boot parameter has been specified,
+ * in which case each kthread actively polls its CPU.  (Which isn't so great
+ * for energy efficiency, but which does reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callbacks can also be used as an energy-efficiency
+ * measure because CPUs with no RCU callbacks queued are more aggressive
+ * about entering dyntick-idle mode.
+ */
+
+
+/*
+ * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters.
+ * If the list is invalid, a warning is emitted and all CPUs are offloaded.
+ */
+static int __init rcu_nocb_setup(char *str)
+{
+	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+	if (cpulist_parse(str, rcu_nocb_mask)) {
+		pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
+		cpumask_setall(rcu_nocb_mask);
+	}
+	return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+static int __init parse_rcu_nocb_poll(char *arg)
+{
+	rcu_nocb_poll = true;
+	return 0;
+}
+early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
+
+/*
+ * Don't bother bypassing ->cblist if the call_rcu() rate is low.
+ * After all, the main point of bypassing is to avoid lock contention
+ * on ->nocb_lock, which only can happen at high call_rcu() rates.
+ */
+static int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
+module_param(nocb_nobypass_lim_per_jiffy, int, 0);
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the
+ * lock isn't immediately available, increment ->nocb_lock_contended to
+ * flag the contention.
+ */
+static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
+	__acquires(&rdp->nocb_bypass_lock)
+{
+	lockdep_assert_irqs_disabled();
+	if (raw_spin_trylock(&rdp->nocb_bypass_lock))
+		return;
+	atomic_inc(&rdp->nocb_lock_contended);
+	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+	smp_mb__after_atomic(); /* atomic_inc() before lock. */
+	raw_spin_lock(&rdp->nocb_bypass_lock);
+	smp_mb__before_atomic(); /* atomic_dec() after lock. */
+	atomic_dec(&rdp->nocb_lock_contended);
+}
+
+/*
+ * Spinwait until the specified rcu_data structure's ->nocb_lock is
+ * not contended.  Please note that this is extremely special-purpose,
+ * relying on the fact that at most two kthreads and one CPU contend for
+ * this lock, and also that the two kthreads are guaranteed to have frequent
+ * grace-period-duration time intervals between successive acquisitions
+ * of the lock.  This allows us to use an extremely simple throttling
+ * mechanism, and further to apply it only to the CPU doing floods of
+ * call_rcu() invocations.  Don't try this at home!
+ */
+static void rcu_nocb_wait_contended(struct rcu_data *rdp)
+{
+	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+	while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
+		cpu_relax();
+}
+
+/*
+ * Conditionally acquire the specified rcu_data structure's
+ * ->nocb_bypass_lock.
+ */
+static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+	return raw_spin_trylock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_bypass_lock.
+ */
+static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
+	__releases(&rdp->nocb_bypass_lock)
+{
+	lockdep_assert_irqs_disabled();
+	raw_spin_unlock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+	if (!rcu_rdp_is_offloaded(rdp))
+		return;
+	raw_spin_lock(&rdp->nocb_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
+{
+	if (rcu_rdp_is_offloaded(rdp)) {
+		lockdep_assert_irqs_disabled();
+		raw_spin_unlock(&rdp->nocb_lock);
+	}
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock and restore
+ * interrupts, but only if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+				       unsigned long flags)
+{
+	if (rcu_rdp_is_offloaded(rdp)) {
+		lockdep_assert_irqs_disabled();
+		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+	} else {
+		local_irq_restore(flags);
+	}
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+	if (rcu_rdp_is_offloaded(rdp))
+		lockdep_assert_held(&rdp->nocb_lock);
+}
+
+/*
+ * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
+ * grace period.
+ */
+static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
+{
+	swake_up_all(sq);
+}
+
+static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
+{
+	return &rnp->nocb_gp_wq[rcu_seq_ctr(rnp->gp_seq) & 0x1];
+}
+
+static void rcu_init_one_nocb(struct rcu_node *rnp)
+{
+	init_swait_queue_head(&rnp->nocb_gp_wq[0]);
+	init_swait_queue_head(&rnp->nocb_gp_wq[1]);
+}
+
+/* Is the specified CPU a no-CBs CPU? */
+bool rcu_is_nocb_cpu(int cpu)
+{
+	if (cpumask_available(rcu_nocb_mask))
+		return cpumask_test_cpu(cpu, rcu_nocb_mask);
+	return false;
+}
+
+static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
+			   struct rcu_data *rdp,
+			   bool force, unsigned long flags)
+	__releases(rdp_gp->nocb_gp_lock)
+{
+	bool needwake = false;
+
+	if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
+		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+				    TPS("AlreadyAwake"));
+		return false;
+	}
+
+	if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+		del_timer(&rdp_gp->nocb_timer);
+	}
+
+	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
+		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
+		needwake = true;
+	}
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+	if (needwake) {
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
+		wake_up_process(rdp_gp->nocb_gp_kthread);
+	}
+
+	return needwake;
+}
+
+/*
+ * Kick the GP kthread for this NOCB group.
+ */
+static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+{
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
+}
+
+/*
+ * Arrange to wake the GP kthread for this NOCB group at some future
+ * time when it is safe to do so.
+ */
+static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
+			       const char *reason)
+{
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+
+	/*
+	 * Bypass wakeup overrides previous deferments. In case
+	 * of callback storm, no need to wake up too early.
+	 */
+	if (waketype == RCU_NOCB_WAKE_BYPASS) {
+		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	} else {
+		if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
+			mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
+		if (rdp_gp->nocb_defer_wakeup < waketype)
+			WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	}
+
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
+}
+
+/*
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
+ */
+static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				     unsigned long j)
+{
+	struct rcu_cblist rcl;
+
+	WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
+	rcu_lockdep_assert_cblist_protected(rdp);
+	lockdep_assert_held(&rdp->nocb_bypass_lock);
+	if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
+		raw_spin_unlock(&rdp->nocb_bypass_lock);
+		return false;
+	}
+	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
+	if (rhp)
+		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
+	WRITE_ONCE(rdp->nocb_bypass_first, j);
+	rcu_nocb_bypass_unlock(rdp);
+	return true;
+}
+
+/*
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
+ */
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				  unsigned long j)
+{
+	if (!rcu_rdp_is_offloaded(rdp))
+		return true;
+	rcu_lockdep_assert_cblist_protected(rdp);
+	rcu_nocb_bypass_lock(rdp);
+	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+}
+
+/*
+ * If the ->nocb_bypass_lock is immediately available, flush the
+ * ->nocb_bypass queue into ->cblist.
+ */
+static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
+{
+	rcu_lockdep_assert_cblist_protected(rdp);
+	if (!rcu_rdp_is_offloaded(rdp) ||
+	    !rcu_nocb_bypass_trylock(rdp))
+		return;
+	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
+}
+
+/*
+ * See whether it is appropriate to use the ->nocb_bypass list in order
+ * to control contention on ->nocb_lock.  A limited number of direct
+ * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
+ * is non-empty, further callbacks must be placed into ->nocb_bypass,
+ * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
+ * back to direct use of ->cblist.  However, ->nocb_bypass should not be
+ * used if ->cblist is empty, because otherwise callbacks can be stranded
+ * on ->nocb_bypass because we cannot count on the current CPU ever again
+ * invoking call_rcu().  The general rule is that if ->nocb_bypass is
+ * non-empty, the corresponding no-CBs grace-period kthread must not be
+ * in an indefinite sleep state.
+ *
+ * Finally, it is not permitted to use the bypass during early boot,
+ * as doing so would confuse the auto-initialization code.  Besides
+ * which, there is no point in worrying about lock contention while
+ * there is only one CPU in operation.
+ */
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				bool *was_alldone, unsigned long flags)
+{
+	unsigned long c;
+	unsigned long cur_gp_seq;
+	unsigned long j = jiffies;
+	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+
+	lockdep_assert_irqs_disabled();
+
+	// Pure softirq/rcuc based processing: no bypassing, no
+	// locking.
+	if (!rcu_rdp_is_offloaded(rdp)) {
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		return false;
+	}
+
+	// In the process of (de-)offloading: no bypassing, but
+	// locking.
+	if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
+		rcu_nocb_lock(rdp);
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		return false; /* Not offloaded, no bypassing. */
+	}
+
+	// Don't use ->nocb_bypass during early boot.
+	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
+		rcu_nocb_lock(rdp);
+		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		return false;
+	}
+
+	// If we have advanced to a new jiffy, reset counts to allow
+	// moving back from ->nocb_bypass to ->cblist.
+	if (j == rdp->nocb_nobypass_last) {
+		c = rdp->nocb_nobypass_count + 1;
+	} else {
+		WRITE_ONCE(rdp->nocb_nobypass_last, j);
+		c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
+		if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
+				 nocb_nobypass_lim_per_jiffy))
+			c = 0;
+		else if (c > nocb_nobypass_lim_per_jiffy)
+			c = nocb_nobypass_lim_per_jiffy;
+	}
+	WRITE_ONCE(rdp->nocb_nobypass_count, c);
+
+	// If there hasn't yet been all that many ->cblist enqueues
+	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
+	// ->nocb_bypass first.
+	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+		rcu_nocb_lock(rdp);
+		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+		if (*was_alldone)
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("FirstQ"));
+		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+		return false; // Caller must enqueue the callback.
+	}
+
+	// If ->nocb_bypass has been used too long or is too full,
+	// flush ->nocb_bypass to ->cblist.
+	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+	    ncbs >= qhimark) {
+		rcu_nocb_lock(rdp);
+		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+			if (*was_alldone)
+				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+						    TPS("FirstQ"));
+			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+			return false; // Caller must enqueue the callback.
+		}
+		if (j != rdp->nocb_gp_adv_time &&
+		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+			rcu_advance_cbs_nowake(rdp->mynode, rdp);
+			rdp->nocb_gp_adv_time = j;
+		}
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		return true; // Callback already enqueued.
+	}
+
+	// We need to use the bypass.
+	rcu_nocb_wait_contended(rdp);
+	rcu_nocb_bypass_lock(rdp);
+	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+	if (!ncbs) {
+		WRITE_ONCE(rdp->nocb_bypass_first, j);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
+	}
+	rcu_nocb_bypass_unlock(rdp);
+	smp_mb(); /* Order enqueue before wake. */
+	if (ncbs) {
+		local_irq_restore(flags);
+	} else {
+		// No-CBs GP kthread might be indefinitely asleep, if so, wake.
+		rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
+		if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("FirstBQwake"));
+			__call_rcu_nocb_wake(rdp, true, flags);
+		} else {
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("FirstBQnoWake"));
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+		}
+	}
+	return true; // Callback already enqueued.
+}
+
+/*
+ * Awaken the no-CBs grace-period kthread if needed, either due to it
+ * legitimately being asleep or due to overload conditions.
+ *
+ * If warranted, also wake up the kthread servicing this CPUs queues.
+ */
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
+				 unsigned long flags)
+				 __releases(rdp->nocb_lock)
+{
+	unsigned long cur_gp_seq;
+	unsigned long j;
+	long len;
+	struct task_struct *t;
+
+	// If we are being polled or there is no kthread, just leave.
+	t = READ_ONCE(rdp->nocb_gp_kthread);
+	if (rcu_nocb_poll || !t) {
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+				    TPS("WakeNotPoll"));
+		return;
+	}
+	// Need to actually to a wakeup.
+	len = rcu_segcblist_n_cbs(&rdp->cblist);
+	if (was_alldone) {
+		rdp->qlen_last_fqs_check = len;
+		if (!irqs_disabled_flags(flags)) {
+			/* ... if queue was empty ... */
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp(rdp, false);
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("WakeEmpty"));
+		} else {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
+					   TPS("WakeEmptyIsDeferred"));
+		}
+	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
+		/* ... or if many callbacks queued. */
+		rdp->qlen_last_fqs_check = len;
+		j = jiffies;
+		if (j != rdp->nocb_gp_adv_time &&
+		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+			rcu_advance_cbs_nowake(rdp->mynode, rdp);
+			rdp->nocb_gp_adv_time = j;
+		}
+		smp_mb(); /* Enqueue before timer_pending(). */
+		if ((rdp->nocb_cb_sleep ||
+		     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
+		    !timer_pending(&rdp->nocb_timer)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
+					   TPS("WakeOvfIsDeferred"));
+		} else {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+		}
+	} else {
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+	}
+	return;
+}
+
+/*
+ * Check if we ignore this rdp.
+ *
+ * We check that without holding the nocb lock but
+ * we make sure not to miss a freshly offloaded rdp
+ * with the current ordering:
+ *
+ *  rdp_offload_toggle()        nocb_gp_enabled_cb()
+ * -------------------------   ----------------------------
+ *    WRITE flags                 LOCK nocb_gp_lock
+ *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
+ *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
+ *    UNLOCK nocb_gp_lock         READ flags
+ */
+static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
+{
+	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
+
+	return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
+						     bool *needwake_state)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+
+	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
+			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+				*needwake_state = true;
+		}
+		return false;
+	}
+
+	/*
+	 * De-offloading. Clear our flag and notify the de-offload worker.
+	 * We will ignore this rdp until it ever gets re-offloaded.
+	 */
+	WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+	rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
+	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+		*needwake_state = true;
+	return true;
+}
+
+
+/*
+ * No-CBs GP kthreads come here to wait for additional callbacks to show up
+ * or for grace periods to end.
+ */
+static void nocb_gp_wait(struct rcu_data *my_rdp)
+{
+	bool bypass = false;
+	long bypass_ncbs;
+	int __maybe_unused cpu = my_rdp->cpu;
+	unsigned long cur_gp_seq;
+	unsigned long flags;
+	bool gotcbs = false;
+	unsigned long j = jiffies;
+	bool needwait_gp = false; // This prevents actual uninitialized use.
+	bool needwake;
+	bool needwake_gp;
+	struct rcu_data *rdp;
+	struct rcu_node *rnp;
+	unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
+	bool wasempty = false;
+
+	/*
+	 * Each pass through the following loop checks for CBs and for the
+	 * nearest grace period (if any) to wait for next.  The CB kthreads
+	 * and the global grace-period kthread are awakened if needed.
+	 */
+	WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
+	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
+		bool needwake_state = false;
+
+		if (!nocb_gp_enabled_cb(rdp))
+			continue;
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
+		rcu_nocb_lock_irqsave(rdp, flags);
+		if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			if (needwake_state)
+				swake_up_one(&rdp->nocb_state_wq);
+			continue;
+		}
+		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+		if (bypass_ncbs &&
+		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
+		     bypass_ncbs > 2 * qhimark)) {
+			// Bypass full or old, so flush it.
+			(void)rcu_nocb_try_flush_bypass(rdp, j);
+			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			if (needwake_state)
+				swake_up_one(&rdp->nocb_state_wq);
+			continue; /* No callbacks here, try next. */
+		}
+		if (bypass_ncbs) {
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("Bypass"));
+			bypass = true;
+		}
+		rnp = rdp->mynode;
+
+		// Advance callbacks if helpful and low contention.
+		needwake_gp = false;
+		if (!rcu_segcblist_restempty(&rdp->cblist,
+					     RCU_NEXT_READY_TAIL) ||
+		    (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+		     rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
+			raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
+			needwake_gp = rcu_advance_cbs(rnp, rdp);
+			wasempty = rcu_segcblist_restempty(&rdp->cblist,
+							   RCU_NEXT_READY_TAIL);
+			raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
+		}
+		// Need to wait on some grace period?
+		WARN_ON_ONCE(wasempty &&
+			     !rcu_segcblist_restempty(&rdp->cblist,
+						      RCU_NEXT_READY_TAIL));
+		if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
+			if (!needwait_gp ||
+			    ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
+				wait_gp_seq = cur_gp_seq;
+			needwait_gp = true;
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+					    TPS("NeedWaitGP"));
+		}
+		if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
+			needwake = rdp->nocb_cb_sleep;
+			WRITE_ONCE(rdp->nocb_cb_sleep, false);
+			smp_mb(); /* CB invocation -after- GP end. */
+		} else {
+			needwake = false;
+		}
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		if (needwake) {
+			swake_up_one(&rdp->nocb_cb_wq);
+			gotcbs = true;
+		}
+		if (needwake_gp)
+			rcu_gp_kthread_wake();
+		if (needwake_state)
+			swake_up_one(&rdp->nocb_state_wq);
+	}
+
+	my_rdp->nocb_gp_bypass = bypass;
+	my_rdp->nocb_gp_gp = needwait_gp;
+	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
+
+	if (bypass && !rcu_nocb_poll) {
+		// At least one child with non-empty ->nocb_bypass, so set
+		// timer in order to avoid stranding its callbacks.
+		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+				   TPS("WakeBypassIsDeferred"));
+	}
+	if (rcu_nocb_poll) {
+		/* Polling, so trace if first poll in the series. */
+		if (gotcbs)
+			trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
+		schedule_timeout_idle(1);
+	} else if (!needwait_gp) {
+		/* Wait for callbacks to appear. */
+		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
+		swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
+				!READ_ONCE(my_rdp->nocb_gp_sleep));
+		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
+	} else {
+		rnp = my_rdp->mynode;
+		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
+		swait_event_interruptible_exclusive(
+			rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
+			rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
+			!READ_ONCE(my_rdp->nocb_gp_sleep));
+		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
+	}
+	if (!rcu_nocb_poll) {
+		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
+		if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+			WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+			del_timer(&my_rdp->nocb_timer);
+		}
+		WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
+		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+	}
+	my_rdp->nocb_gp_seq = -1;
+	WARN_ON(signal_pending(current));
+}
+
+/*
+ * No-CBs grace-period-wait kthread.  There is one of these per group
+ * of CPUs, but only once at least one CPU in that group has come online
+ * at least once since boot.  This kthread checks for newly posted
+ * callbacks from any of the CPUs it is responsible for, waits for a
+ * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
+ * that then have callback-invocation work to do.
+ */
+static int rcu_nocb_gp_kthread(void *arg)
+{
+	struct rcu_data *rdp = arg;
+
+	for (;;) {
+		WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
+		nocb_gp_wait(rdp);
+		cond_resched_tasks_rcu_qs();
+	}
+	return 0;
+}
+
+static inline bool nocb_cb_can_run(struct rcu_data *rdp)
+{
+	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
+	return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
+{
+	return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
+}
+
+/*
+ * Invoke any ready callbacks from the corresponding no-CBs CPU,
+ * then, if there are no more, wait for more to appear.
+ */
+static void nocb_cb_wait(struct rcu_data *rdp)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long cur_gp_seq;
+	unsigned long flags;
+	bool needwake_state = false;
+	bool needwake_gp = false;
+	bool can_sleep = true;
+	struct rcu_node *rnp = rdp->mynode;
+
+	local_irq_save(flags);
+	rcu_momentary_dyntick_idle();
+	local_irq_restore(flags);
+	/*
+	 * Disable BH to provide the expected environment.  Also, when
+	 * transitioning to/from NOCB mode, a self-requeuing callback might
+	 * be invoked from softirq.  A short grace period could cause both
+	 * instances of this callback would execute concurrently.
+	 */
+	local_bh_disable();
+	rcu_do_batch(rdp);
+	local_bh_enable();
+	lockdep_assert_irqs_enabled();
+	rcu_nocb_lock_irqsave(rdp, flags);
+	if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
+	    rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
+	    raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
+		needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
+		raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
+	}
+
+	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
+			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
+			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+				needwake_state = true;
+		}
+		if (rcu_segcblist_ready_cbs(cblist))
+			can_sleep = false;
+	} else {
+		/*
+		 * De-offloading. Clear our flag and notify the de-offload worker.
+		 * We won't touch the callbacks and keep sleeping until we ever
+		 * get re-offloaded.
+		 */
+		WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
+		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+			needwake_state = true;
+	}
+
+	WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep);
+
+	if (rdp->nocb_cb_sleep)
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
+
+	rcu_nocb_unlock_irqrestore(rdp, flags);
+	if (needwake_gp)
+		rcu_gp_kthread_wake();
+
+	if (needwake_state)
+		swake_up_one(&rdp->nocb_state_wq);
+
+	do {
+		swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
+						    nocb_cb_wait_cond(rdp));
+
+		// VVV Ensure CB invocation follows _sleep test.
+		if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
+			WARN_ON(signal_pending(current));
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+		}
+	} while (!nocb_cb_can_run(rdp));
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
+ * nocb_cb_wait() to do the dirty work.
+ */
+static int rcu_nocb_cb_kthread(void *arg)
+{
+	struct rcu_data *rdp = arg;
+
+	// Each pass through this loop does one callback batch, and,
+	// if there are no more ready callbacks, waits for them.
+	for (;;) {
+		nocb_cb_wait(rdp);
+		cond_resched_tasks_rcu_qs();
+	}
+	return 0;
+}
+
+/* Is a deferred wakeup of rcu_nocb_kthread() required? */
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
+{
+	return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
+}
+
+/* Do a deferred wakeup of rcu_nocb_kthread(). */
+static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
+					   struct rcu_data *rdp, int level,
+					   unsigned long flags)
+	__releases(rdp_gp->nocb_gp_lock)
+{
+	int ndw;
+	int ret;
+
+	if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
+		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+		return false;
+	}
+
+	ndw = rdp_gp->nocb_defer_wakeup;
+	ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
+
+	return ret;
+}
+
+/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
+static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
+{
+	unsigned long flags;
+	struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
+
+	WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
+
+	raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
+	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
+	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
+}
+
+/*
+ * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
+ * This means we do an inexact common-case check.  Note that if
+ * we miss, ->nocb_timer will eventually clean things up.
+ */
+static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
+		return false;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
+}
+
+void rcu_nocb_flush_deferred_wakeup(void)
+{
+	do_nocb_deferred_wakeup(this_cpu_ptr(&rcu_data));
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
+
+static int rdp_offload_toggle(struct rcu_data *rdp,
+			       bool offload, unsigned long flags)
+	__releases(rdp->nocb_lock)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+	bool wake_gp = false;
+
+	rcu_segcblist_offload(cblist, offload);
+
+	if (rdp->nocb_cb_sleep)
+		rdp->nocb_cb_sleep = false;
+	rcu_nocb_unlock_irqrestore(rdp, flags);
+
+	/*
+	 * Ignore former value of nocb_cb_sleep and force wake up as it could
+	 * have been spuriously set to false already.
+	 */
+	swake_up_one(&rdp->nocb_cb_wq);
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	if (rdp_gp->nocb_gp_sleep) {
+		rdp_gp->nocb_gp_sleep = false;
+		wake_gp = true;
+	}
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
+	if (wake_gp)
+		wake_up_process(rdp_gp->nocb_gp_kthread);
+
+	return 0;
+}
+
+static long rcu_nocb_rdp_deoffload(void *arg)
+{
+	struct rcu_data *rdp = arg;
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long flags;
+	int ret;
+
+	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+
+	pr_info("De-offloading %d\n", rdp->cpu);
+
+	rcu_nocb_lock_irqsave(rdp, flags);
+	/*
+	 * Flush once and for all now. This suffices because we are
+	 * running on the target CPU holding ->nocb_lock (thus having
+	 * interrupts disabled), and because rdp_offload_toggle()
+	 * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED.
+	 * Thus future calls to rcu_segcblist_completely_offloaded() will
+	 * return false, which means that future calls to rcu_nocb_try_bypass()
+	 * will refuse to put anything into the bypass.
+	 */
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+	ret = rdp_offload_toggle(rdp, false, flags);
+	swait_event_exclusive(rdp->nocb_state_wq,
+			      !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
+							SEGCBLIST_KTHREAD_GP));
+	/*
+	 * Lock one last time to acquire latest callback updates from kthreads
+	 * so we can later handle callbacks locally without locking.
+	 */
+	rcu_nocb_lock_irqsave(rdp, flags);
+	/*
+	 * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY after the nocb
+	 * lock is released but how about being paranoid for once?
+	 */
+	rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
+	/*
+	 * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
+	 * rcu_nocb_unlock_irqrestore() anymore.
+	 */
+	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+
+	/* Sanity check */
+	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+
+
+	return ret;
+}
+
+int rcu_nocb_cpu_deoffload(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int ret = 0;
+
+	mutex_lock(&rcu_state.barrier_mutex);
+	cpus_read_lock();
+	if (rcu_rdp_is_offloaded(rdp)) {
+		if (cpu_online(cpu)) {
+			ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
+			if (!ret)
+				cpumask_clear_cpu(cpu, rcu_nocb_mask);
+		} else {
+			pr_info("NOCB: Can't CB-deoffload an offline CPU\n");
+			ret = -EINVAL;
+		}
+	}
+	cpus_read_unlock();
+	mutex_unlock(&rcu_state.barrier_mutex);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
+
+static long rcu_nocb_rdp_offload(void *arg)
+{
+	struct rcu_data *rdp = arg;
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long flags;
+	int ret;
+
+	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+	/*
+	 * For now we only support re-offload, ie: the rdp must have been
+	 * offloaded on boot first.
+	 */
+	if (!rdp->nocb_gp_rdp)
+		return -EINVAL;
+
+	pr_info("Offloading %d\n", rdp->cpu);
+	/*
+	 * Can't use rcu_nocb_lock_irqsave() while we are in
+	 * SEGCBLIST_SOFTIRQ_ONLY mode.
+	 */
+	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+
+	/*
+	 * We didn't take the nocb lock while working on the
+	 * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
+	 * Every modifications that have been done previously on
+	 * rdp->cblist must be visible remotely by the nocb kthreads
+	 * upon wake up after reading the cblist flags.
+	 *
+	 * The layout against nocb_lock enforces that ordering:
+	 *
+	 *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
+	 * -------------------------   ----------------------------
+	 *      WRITE callbacks           rcu_nocb_lock()
+	 *      rcu_nocb_lock()           READ flags
+	 *      WRITE flags               READ callbacks
+	 *      rcu_nocb_unlock()         rcu_nocb_unlock()
+	 */
+	ret = rdp_offload_toggle(rdp, true, flags);
+	swait_event_exclusive(rdp->nocb_state_wq,
+			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
+			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+
+	return ret;
+}
+
+int rcu_nocb_cpu_offload(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int ret = 0;
+
+	mutex_lock(&rcu_state.barrier_mutex);
+	cpus_read_lock();
+	if (!rcu_rdp_is_offloaded(rdp)) {
+		if (cpu_online(cpu)) {
+			ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
+			if (!ret)
+				cpumask_set_cpu(cpu, rcu_nocb_mask);
+		} else {
+			pr_info("NOCB: Can't CB-offload an offline CPU\n");
+			ret = -EINVAL;
+		}
+	}
+	cpus_read_unlock();
+	mutex_unlock(&rcu_state.barrier_mutex);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
+
+void __init rcu_init_nohz(void)
+{
+	int cpu;
+	bool need_rcu_nocb_mask = false;
+	struct rcu_data *rdp;
+
+#if defined(CONFIG_NO_HZ_FULL)
+	if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
+		need_rcu_nocb_mask = true;
+#endif /* #if defined(CONFIG_NO_HZ_FULL) */
+
+	if (!cpumask_available(rcu_nocb_mask) && need_rcu_nocb_mask) {
+		if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) {
+			pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n");
+			return;
+		}
+	}
+	if (!cpumask_available(rcu_nocb_mask))
+		return;
+
+#if defined(CONFIG_NO_HZ_FULL)
+	if (tick_nohz_full_running)
+		cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask);
+#endif /* #if defined(CONFIG_NO_HZ_FULL) */
+
+	if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) {
+		pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n");
+		cpumask_and(rcu_nocb_mask, cpu_possible_mask,
+			    rcu_nocb_mask);
+	}
+	if (cpumask_empty(rcu_nocb_mask))
+		pr_info("\tOffload RCU callbacks from CPUs: (none).\n");
+	else
+		pr_info("\tOffload RCU callbacks from CPUs: %*pbl.\n",
+			cpumask_pr_args(rcu_nocb_mask));
+	if (rcu_nocb_poll)
+		pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
+
+	for_each_cpu(cpu, rcu_nocb_mask) {
+		rdp = per_cpu_ptr(&rcu_data, cpu);
+		if (rcu_segcblist_empty(&rdp->cblist))
+			rcu_segcblist_init(&rdp->cblist);
+		rcu_segcblist_offload(&rdp->cblist, true);
+		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
+		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
+	}
+	rcu_organize_nocb_kthreads();
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+	init_swait_queue_head(&rdp->nocb_cb_wq);
+	init_swait_queue_head(&rdp->nocb_gp_wq);
+	init_swait_queue_head(&rdp->nocb_state_wq);
+	raw_spin_lock_init(&rdp->nocb_lock);
+	raw_spin_lock_init(&rdp->nocb_bypass_lock);
+	raw_spin_lock_init(&rdp->nocb_gp_lock);
+	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
+	rcu_cblist_init(&rdp->nocb_bypass);
+}
+
+/*
+ * If the specified CPU is a no-CBs CPU that does not already have its
+ * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread
+ * for this CPU's group has not yet been created, spawn it as well.
+ */
+static void rcu_spawn_one_nocb_kthread(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	struct rcu_data *rdp_gp;
+	struct task_struct *t;
+
+	/*
+	 * If this isn't a no-CBs CPU or if it already has an rcuo kthread,
+	 * then nothing to do.
+	 */
+	if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)
+		return;
+
+	/* If we didn't spawn the GP kthread first, reorganize! */
+	rdp_gp = rdp->nocb_gp_rdp;
+	if (!rdp_gp->nocb_gp_kthread) {
+		t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
+				"rcuog/%d", rdp_gp->cpu);
+		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
+			return;
+		WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
+	}
+
+	/* Spawn the kthread for this CPU. */
+	t = kthread_run(rcu_nocb_cb_kthread, rdp,
+			"rcuo%c/%d", rcu_state.abbr, cpu);
+	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
+		return;
+	WRITE_ONCE(rdp->nocb_cb_kthread, t);
+	WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
+}
+
+/*
+ * If the specified CPU is a no-CBs CPU that does not already have its
+ * rcuo kthread, spawn it.
+ */
+static void rcu_spawn_cpu_nocb_kthread(int cpu)
+{
+	if (rcu_scheduler_fully_active)
+		rcu_spawn_one_nocb_kthread(cpu);
+}
+
+/*
+ * Once the scheduler is running, spawn rcuo kthreads for all online
+ * no-CBs CPUs.  This assumes that the early_initcall()s happen before
+ * non-boot CPUs come online -- if this changes, we will need to add
+ * some mutual exclusion.
+ */
+static void __init rcu_spawn_nocb_kthreads(void)
+{
+	int cpu;
+
+	for_each_online_cpu(cpu)
+		rcu_spawn_cpu_nocb_kthread(cpu);
+}
+
+/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */
+static int rcu_nocb_gp_stride = -1;
+module_param(rcu_nocb_gp_stride, int, 0444);
+
+/*
+ * Initialize GP-CB relationships for all no-CBs CPU.
+ */
+static void __init rcu_organize_nocb_kthreads(void)
+{
+	int cpu;
+	bool firsttime = true;
+	bool gotnocbs = false;
+	bool gotnocbscbs = true;
+	int ls = rcu_nocb_gp_stride;
+	int nl = 0;  /* Next GP kthread. */
+	struct rcu_data *rdp;
+	struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
+	struct rcu_data *rdp_prev = NULL;
+
+	if (!cpumask_available(rcu_nocb_mask))
+		return;
+	if (ls == -1) {
+		ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
+		rcu_nocb_gp_stride = ls;
+	}
+
+	/*
+	 * Each pass through this loop sets up one rcu_data structure.
+	 * Should the corresponding CPU come online in the future, then
+	 * we will spawn the needed set of rcu_nocb_kthread() kthreads.
+	 */
+	for_each_cpu(cpu, rcu_nocb_mask) {
+		rdp = per_cpu_ptr(&rcu_data, cpu);
+		if (rdp->cpu >= nl) {
+			/* New GP kthread, set up for CBs & next GP. */
+			gotnocbs = true;
+			nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
+			rdp->nocb_gp_rdp = rdp;
+			rdp_gp = rdp;
+			if (dump_tree) {
+				if (!firsttime)
+					pr_cont("%s\n", gotnocbscbs
+							? "" : " (self only)");
+				gotnocbscbs = false;
+				firsttime = false;
+				pr_alert("%s: No-CB GP kthread CPU %d:",
+					 __func__, cpu);
+			}
+		} else {
+			/* Another CB kthread, link to previous GP kthread. */
+			gotnocbscbs = true;
+			rdp->nocb_gp_rdp = rdp_gp;
+			rdp_prev->nocb_next_cb_rdp = rdp;
+			if (dump_tree)
+				pr_cont(" %d", cpu);
+		}
+		rdp_prev = rdp;
+	}
+	if (gotnocbs && dump_tree)
+		pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
+}
+
+/*
+ * Bind the current task to the offloaded CPUs.  If there are no offloaded
+ * CPUs, leave the task unbound.  Splat if the bind attempt fails.
+ */
+void rcu_bind_current_to_nocb(void)
+{
+	if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask))
+		WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
+}
+EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
+
+// The ->on_cpu field is available only in CONFIG_SMP=y, so...
+#ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+	return tsp && task_is_running(tsp) && !tsp->on_cpu ? "!" : "";
+}
+#else // #ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+	return "";
+}
+#endif // #else #ifdef CONFIG_SMP
+
+/*
+ * Dump out nocb grace-period kthread state for the specified rcu_data
+ * structure.
+ */
+static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
+{
+	struct rcu_node *rnp = rdp->mynode;
+
+	pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
+		rdp->cpu,
+		"kK"[!!rdp->nocb_gp_kthread],
+		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
+		"dD"[!!rdp->nocb_defer_wakeup],
+		"tT"[timer_pending(&rdp->nocb_timer)],
+		"sS"[!!rdp->nocb_gp_sleep],
+		".W"[swait_active(&rdp->nocb_gp_wq)],
+		".W"[swait_active(&rnp->nocb_gp_wq[0])],
+		".W"[swait_active(&rnp->nocb_gp_wq[1])],
+		".B"[!!rdp->nocb_gp_bypass],
+		".G"[!!rdp->nocb_gp_gp],
+		(long)rdp->nocb_gp_seq,
+		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
+		rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
+		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
+		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
+}
+
+/* Dump out nocb kthread state for the specified rcu_data structure. */
+static void show_rcu_nocb_state(struct rcu_data *rdp)
+{
+	char bufw[20];
+	char bufr[20];
+	struct rcu_segcblist *rsclp = &rdp->cblist;
+	bool waslocked;
+	bool wassleep;
+
+	if (rdp->nocb_gp_rdp == rdp)
+		show_rcu_nocb_gp_state(rdp);
+
+	sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
+	sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
+	pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
+		rdp->cpu, rdp->nocb_gp_rdp->cpu,
+		rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
+		"kK"[!!rdp->nocb_cb_kthread],
+		"bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
+		"cC"[!!atomic_read(&rdp->nocb_lock_contended)],
+		"lL"[raw_spin_is_locked(&rdp->nocb_lock)],
+		"sS"[!!rdp->nocb_cb_sleep],
+		".W"[swait_active(&rdp->nocb_cb_wq)],
+		jiffies - rdp->nocb_bypass_first,
+		jiffies - rdp->nocb_nobypass_last,
+		rdp->nocb_nobypass_count,
+		".D"[rcu_segcblist_ready_cbs(rsclp)],
+		".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
+		rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
+		".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
+		rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
+		".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
+		".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
+		rcu_segcblist_n_cbs(&rdp->cblist),
+		rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
+		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
+		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
+
+	/* It is OK for GP kthreads to have GP state. */
+	if (rdp->nocb_gp_rdp == rdp)
+		return;
+
+	waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
+	wassleep = swait_active(&rdp->nocb_gp_wq);
+	if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
+		return;  /* Nothing untoward. */
+
+	pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
+		"lL"[waslocked],
+		"dD"[!!rdp->nocb_defer_wakeup],
+		"sS"[!!rdp->nocb_gp_sleep],
+		".W"[wassleep]);
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
+{
+	return 0;
+}
+
+static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
+{
+	return false;
+}
+
+/* No ->nocb_lock to acquire.  */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+}
+
+/* No ->nocb_lock to release.  */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
+{
+}
+
+/* No ->nocb_lock to release.  */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+				       unsigned long flags)
+{
+	local_irq_restore(flags);
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+	lockdep_assert_irqs_disabled();
+}
+
+static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
+{
+}
+
+static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
+{
+	return NULL;
+}
+
+static void rcu_init_one_nocb(struct rcu_node *rnp)
+{
+}
+
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				  unsigned long j)
+{
+	return true;
+}
+
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+				bool *was_alldone, unsigned long flags)
+{
+	return false;
+}
+
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
+				 unsigned long flags)
+{
+	WARN_ON_ONCE(1);  /* Should be dead code! */
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
+{
+	return false;
+}
+
+static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+	return false;
+}
+
+static void rcu_spawn_cpu_nocb_kthread(int cpu)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(void)
+{
+}
+
+static void show_rcu_nocb_state(struct rcu_data *rdp)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index de1dc3b..7a4876a 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -13,39 +13,6 @@
 
 #include "../locking/rtmutex_common.h"
 
-#ifdef CONFIG_RCU_NOCB_CPU
-static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
-static bool __read_mostly rcu_nocb_poll;    /* Offload kthread are to poll. */
-static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
-{
-	return lockdep_is_held(&rdp->nocb_lock);
-}
-
-static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
-{
-	/* Race on early boot between thread creation and assignment */
-	if (!rdp->nocb_cb_kthread || !rdp->nocb_gp_kthread)
-		return true;
-
-	if (current == rdp->nocb_cb_kthread || current == rdp->nocb_gp_kthread)
-		if (in_task())
-			return true;
-	return false;
-}
-
-#else
-static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
-{
-	return 0;
-}
-
-static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
-{
-	return false;
-}
-
-#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
-
 static bool rcu_rdp_is_offloaded(struct rcu_data *rdp)
 {
 	/*
@@ -346,7 +313,7 @@
 
 	trace_rcu_utilization(TPS("Start context switch"));
 	lockdep_assert_irqs_disabled();
-	WARN_ON_ONCE(!preempt && rcu_preempt_depth() > 0);
+	WARN_ONCE(!preempt && rcu_preempt_depth() > 0, "Voluntary context switch within RCU read-side critical section!");
 	if (rcu_preempt_depth() > 0 &&
 	    !t->rcu_read_unlock_special.b.blocked) {
 
@@ -405,17 +372,20 @@
 
 static void rcu_preempt_read_enter(void)
 {
-	current->rcu_read_lock_nesting++;
+	WRITE_ONCE(current->rcu_read_lock_nesting, READ_ONCE(current->rcu_read_lock_nesting) + 1);
 }
 
 static int rcu_preempt_read_exit(void)
 {
-	return --current->rcu_read_lock_nesting;
+	int ret = READ_ONCE(current->rcu_read_lock_nesting) - 1;
+
+	WRITE_ONCE(current->rcu_read_lock_nesting, ret);
+	return ret;
 }
 
 static void rcu_preempt_depth_set(int val)
 {
-	current->rcu_read_lock_nesting = val;
+	WRITE_ONCE(current->rcu_read_lock_nesting, val);
 }
 
 /*
@@ -1479,1460 +1449,6 @@
 
 #endif /* #else #if !defined(CONFIG_RCU_FAST_NO_HZ) */
 
-#ifdef CONFIG_RCU_NOCB_CPU
-
-/*
- * Offload callback processing from the boot-time-specified set of CPUs
- * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads
- * created that pull the callbacks from the corresponding CPU, wait for
- * a grace period to elapse, and invoke the callbacks.  These kthreads
- * are organized into GP kthreads, which manage incoming callbacks, wait for
- * grace periods, and awaken CB kthreads, and the CB kthreads, which only
- * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs
- * do a wake_up() on their GP kthread when they insert a callback into any
- * empty list, unless the rcu_nocb_poll boot parameter has been specified,
- * in which case each kthread actively polls its CPU.  (Which isn't so great
- * for energy efficiency, but which does reduce RCU's overhead on that CPU.)
- *
- * This is intended to be used in conjunction with Frederic Weisbecker's
- * adaptive-idle work, which would seriously reduce OS jitter on CPUs
- * running CPU-bound user-mode computations.
- *
- * Offloading of callbacks can also be used as an energy-efficiency
- * measure because CPUs with no RCU callbacks queued are more aggressive
- * about entering dyntick-idle mode.
- */
-
-
-/*
- * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters.
- * If the list is invalid, a warning is emitted and all CPUs are offloaded.
- */
-static int __init rcu_nocb_setup(char *str)
-{
-	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
-	if (cpulist_parse(str, rcu_nocb_mask)) {
-		pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
-		cpumask_setall(rcu_nocb_mask);
-	}
-	return 1;
-}
-__setup("rcu_nocbs=", rcu_nocb_setup);
-
-static int __init parse_rcu_nocb_poll(char *arg)
-{
-	rcu_nocb_poll = true;
-	return 0;
-}
-early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
-
-/*
- * Don't bother bypassing ->cblist if the call_rcu() rate is low.
- * After all, the main point of bypassing is to avoid lock contention
- * on ->nocb_lock, which only can happen at high call_rcu() rates.
- */
-static int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
-module_param(nocb_nobypass_lim_per_jiffy, int, 0);
-
-/*
- * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the
- * lock isn't immediately available, increment ->nocb_lock_contended to
- * flag the contention.
- */
-static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
-	__acquires(&rdp->nocb_bypass_lock)
-{
-	lockdep_assert_irqs_disabled();
-	if (raw_spin_trylock(&rdp->nocb_bypass_lock))
-		return;
-	atomic_inc(&rdp->nocb_lock_contended);
-	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
-	smp_mb__after_atomic(); /* atomic_inc() before lock. */
-	raw_spin_lock(&rdp->nocb_bypass_lock);
-	smp_mb__before_atomic(); /* atomic_dec() after lock. */
-	atomic_dec(&rdp->nocb_lock_contended);
-}
-
-/*
- * Spinwait until the specified rcu_data structure's ->nocb_lock is
- * not contended.  Please note that this is extremely special-purpose,
- * relying on the fact that at most two kthreads and one CPU contend for
- * this lock, and also that the two kthreads are guaranteed to have frequent
- * grace-period-duration time intervals between successive acquisitions
- * of the lock.  This allows us to use an extremely simple throttling
- * mechanism, and further to apply it only to the CPU doing floods of
- * call_rcu() invocations.  Don't try this at home!
- */
-static void rcu_nocb_wait_contended(struct rcu_data *rdp)
-{
-	WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
-	while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
-		cpu_relax();
-}
-
-/*
- * Conditionally acquire the specified rcu_data structure's
- * ->nocb_bypass_lock.
- */
-static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-	return raw_spin_trylock(&rdp->nocb_bypass_lock);
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_bypass_lock.
- */
-static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
-	__releases(&rdp->nocb_bypass_lock)
-{
-	lockdep_assert_irqs_disabled();
-	raw_spin_unlock(&rdp->nocb_bypass_lock);
-}
-
-/*
- * Acquire the specified rcu_data structure's ->nocb_lock, but only
- * if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_lock(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-	if (!rcu_rdp_is_offloaded(rdp))
-		return;
-	raw_spin_lock(&rdp->nocb_lock);
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_lock, but only
- * if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_unlock(struct rcu_data *rdp)
-{
-	if (rcu_rdp_is_offloaded(rdp)) {
-		lockdep_assert_irqs_disabled();
-		raw_spin_unlock(&rdp->nocb_lock);
-	}
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_lock and restore
- * interrupts, but only if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
-				       unsigned long flags)
-{
-	if (rcu_rdp_is_offloaded(rdp)) {
-		lockdep_assert_irqs_disabled();
-		raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
-	} else {
-		local_irq_restore(flags);
-	}
-}
-
-/* Lockdep check that ->cblist may be safely accessed. */
-static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-	if (rcu_rdp_is_offloaded(rdp))
-		lockdep_assert_held(&rdp->nocb_lock);
-}
-
-/*
- * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
- * grace period.
- */
-static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
-{
-	swake_up_all(sq);
-}
-
-static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
-{
-	return &rnp->nocb_gp_wq[rcu_seq_ctr(rnp->gp_seq) & 0x1];
-}
-
-static void rcu_init_one_nocb(struct rcu_node *rnp)
-{
-	init_swait_queue_head(&rnp->nocb_gp_wq[0]);
-	init_swait_queue_head(&rnp->nocb_gp_wq[1]);
-}
-
-/* Is the specified CPU a no-CBs CPU? */
-bool rcu_is_nocb_cpu(int cpu)
-{
-	if (cpumask_available(rcu_nocb_mask))
-		return cpumask_test_cpu(cpu, rcu_nocb_mask);
-	return false;
-}
-
-static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
-			   struct rcu_data *rdp,
-			   bool force, unsigned long flags)
-	__releases(rdp_gp->nocb_gp_lock)
-{
-	bool needwake = false;
-
-	if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
-		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-				    TPS("AlreadyAwake"));
-		return false;
-	}
-
-	if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
-		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-		del_timer(&rdp_gp->nocb_timer);
-	}
-
-	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
-		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
-		needwake = true;
-	}
-	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-	if (needwake) {
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
-		wake_up_process(rdp_gp->nocb_gp_kthread);
-	}
-
-	return needwake;
-}
-
-/*
- * Kick the GP kthread for this NOCB group.
- */
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
-{
-	unsigned long flags;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
-}
-
-/*
- * Arrange to wake the GP kthread for this NOCB group at some future
- * time when it is safe to do so.
- */
-static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
-			       const char *reason)
-{
-	unsigned long flags;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-
-	/*
-	 * Bypass wakeup overrides previous deferments. In case
-	 * of callback storm, no need to wake up too early.
-	 */
-	if (waketype == RCU_NOCB_WAKE_BYPASS) {
-		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
-		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
-	} else {
-		if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
-			mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
-		if (rdp_gp->nocb_defer_wakeup < waketype)
-			WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
-	}
-
-	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
-}
-
-/*
- * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
- * However, if there is a callback to be enqueued and if ->nocb_bypass
- * proves to be initially empty, just return false because the no-CB GP
- * kthread may need to be awakened in this case.
- *
- * Note that this function always returns true if rhp is NULL.
- */
-static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				     unsigned long j)
-{
-	struct rcu_cblist rcl;
-
-	WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
-	rcu_lockdep_assert_cblist_protected(rdp);
-	lockdep_assert_held(&rdp->nocb_bypass_lock);
-	if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
-		raw_spin_unlock(&rdp->nocb_bypass_lock);
-		return false;
-	}
-	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
-	if (rhp)
-		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
-	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
-	WRITE_ONCE(rdp->nocb_bypass_first, j);
-	rcu_nocb_bypass_unlock(rdp);
-	return true;
-}
-
-/*
- * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
- * However, if there is a callback to be enqueued and if ->nocb_bypass
- * proves to be initially empty, just return false because the no-CB GP
- * kthread may need to be awakened in this case.
- *
- * Note that this function always returns true if rhp is NULL.
- */
-static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
-{
-	if (!rcu_rdp_is_offloaded(rdp))
-		return true;
-	rcu_lockdep_assert_cblist_protected(rdp);
-	rcu_nocb_bypass_lock(rdp);
-	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
-}
-
-/*
- * If the ->nocb_bypass_lock is immediately available, flush the
- * ->nocb_bypass queue into ->cblist.
- */
-static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
-{
-	rcu_lockdep_assert_cblist_protected(rdp);
-	if (!rcu_rdp_is_offloaded(rdp) ||
-	    !rcu_nocb_bypass_trylock(rdp))
-		return;
-	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
-}
-
-/*
- * See whether it is appropriate to use the ->nocb_bypass list in order
- * to control contention on ->nocb_lock.  A limited number of direct
- * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
- * is non-empty, further callbacks must be placed into ->nocb_bypass,
- * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
- * back to direct use of ->cblist.  However, ->nocb_bypass should not be
- * used if ->cblist is empty, because otherwise callbacks can be stranded
- * on ->nocb_bypass because we cannot count on the current CPU ever again
- * invoking call_rcu().  The general rule is that if ->nocb_bypass is
- * non-empty, the corresponding no-CBs grace-period kthread must not be
- * in an indefinite sleep state.
- *
- * Finally, it is not permitted to use the bypass during early boot,
- * as doing so would confuse the auto-initialization code.  Besides
- * which, there is no point in worrying about lock contention while
- * there is only one CPU in operation.
- */
-static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
-{
-	unsigned long c;
-	unsigned long cur_gp_seq;
-	unsigned long j = jiffies;
-	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-
-	lockdep_assert_irqs_disabled();
-
-	// Pure softirq/rcuc based processing: no bypassing, no
-	// locking.
-	if (!rcu_rdp_is_offloaded(rdp)) {
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		return false;
-	}
-
-	// In the process of (de-)offloading: no bypassing, but
-	// locking.
-	if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
-		rcu_nocb_lock(rdp);
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		return false; /* Not offloaded, no bypassing. */
-	}
-
-	// Don't use ->nocb_bypass during early boot.
-	if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
-		rcu_nocb_lock(rdp);
-		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		return false;
-	}
-
-	// If we have advanced to a new jiffy, reset counts to allow
-	// moving back from ->nocb_bypass to ->cblist.
-	if (j == rdp->nocb_nobypass_last) {
-		c = rdp->nocb_nobypass_count + 1;
-	} else {
-		WRITE_ONCE(rdp->nocb_nobypass_last, j);
-		c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
-		if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
-				 nocb_nobypass_lim_per_jiffy))
-			c = 0;
-		else if (c > nocb_nobypass_lim_per_jiffy)
-			c = nocb_nobypass_lim_per_jiffy;
-	}
-	WRITE_ONCE(rdp->nocb_nobypass_count, c);
-
-	// If there hasn't yet been all that many ->cblist enqueues
-	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
-	// ->nocb_bypass first.
-	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
-		rcu_nocb_lock(rdp);
-		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-		if (*was_alldone)
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstQ"));
-		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
-		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-		return false; // Caller must enqueue the callback.
-	}
-
-	// If ->nocb_bypass has been used too long or is too full,
-	// flush ->nocb_bypass to ->cblist.
-	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
-	    ncbs >= qhimark) {
-		rcu_nocb_lock(rdp);
-		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
-			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-			if (*was_alldone)
-				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-						    TPS("FirstQ"));
-			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-			return false; // Caller must enqueue the callback.
-		}
-		if (j != rdp->nocb_gp_adv_time &&
-		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
-			rcu_advance_cbs_nowake(rdp->mynode, rdp);
-			rdp->nocb_gp_adv_time = j;
-		}
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		return true; // Callback already enqueued.
-	}
-
-	// We need to use the bypass.
-	rcu_nocb_wait_contended(rdp);
-	rcu_nocb_bypass_lock(rdp);
-	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
-	if (!ncbs) {
-		WRITE_ONCE(rdp->nocb_bypass_first, j);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
-	}
-	rcu_nocb_bypass_unlock(rdp);
-	smp_mb(); /* Order enqueue before wake. */
-	if (ncbs) {
-		local_irq_restore(flags);
-	} else {
-		// No-CBs GP kthread might be indefinitely asleep, if so, wake.
-		rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
-		if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstBQwake"));
-			__call_rcu_nocb_wake(rdp, true, flags);
-		} else {
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstBQnoWake"));
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-		}
-	}
-	return true; // Callback already enqueued.
-}
-
-/*
- * Awaken the no-CBs grace-period kthread if needed, either due to it
- * legitimately being asleep or due to overload conditions.
- *
- * If warranted, also wake up the kthread servicing this CPUs queues.
- */
-static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
-				 unsigned long flags)
-				 __releases(rdp->nocb_lock)
-{
-	unsigned long cur_gp_seq;
-	unsigned long j;
-	long len;
-	struct task_struct *t;
-
-	// If we are being polled or there is no kthread, just leave.
-	t = READ_ONCE(rdp->nocb_gp_kthread);
-	if (rcu_nocb_poll || !t) {
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-				    TPS("WakeNotPoll"));
-		return;
-	}
-	// Need to actually to a wakeup.
-	len = rcu_segcblist_n_cbs(&rdp->cblist);
-	if (was_alldone) {
-		rdp->qlen_last_fqs_check = len;
-		if (!irqs_disabled_flags(flags)) {
-			/* ... if queue was empty ... */
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			wake_nocb_gp(rdp, false);
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("WakeEmpty"));
-		} else {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
-					   TPS("WakeEmptyIsDeferred"));
-		}
-	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
-		/* ... or if many callbacks queued. */
-		rdp->qlen_last_fqs_check = len;
-		j = jiffies;
-		if (j != rdp->nocb_gp_adv_time &&
-		    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-		    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
-			rcu_advance_cbs_nowake(rdp->mynode, rdp);
-			rdp->nocb_gp_adv_time = j;
-		}
-		smp_mb(); /* Enqueue before timer_pending(). */
-		if ((rdp->nocb_cb_sleep ||
-		     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
-		    !timer_pending(&rdp->nocb_timer)) {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
-					   TPS("WakeOvfIsDeferred"));
-		} else {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
-		}
-	} else {
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
-	}
-	return;
-}
-
-/*
- * Check if we ignore this rdp.
- *
- * We check that without holding the nocb lock but
- * we make sure not to miss a freshly offloaded rdp
- * with the current ordering:
- *
- *  rdp_offload_toggle()        nocb_gp_enabled_cb()
- * -------------------------   ----------------------------
- *    WRITE flags                 LOCK nocb_gp_lock
- *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
- *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
- *    UNLOCK nocb_gp_lock         READ flags
- */
-static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
-{
-	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
-
-	return rcu_segcblist_test_flags(&rdp->cblist, flags);
-}
-
-static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
-						     bool *needwake_state)
-{
-	struct rcu_segcblist *cblist = &rdp->cblist;
-
-	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
-		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
-			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
-			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-				*needwake_state = true;
-		}
-		return false;
-	}
-
-	/*
-	 * De-offloading. Clear our flag and notify the de-offload worker.
-	 * We will ignore this rdp until it ever gets re-offloaded.
-	 */
-	WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
-	rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
-	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-		*needwake_state = true;
-	return true;
-}
-
-
-/*
- * No-CBs GP kthreads come here to wait for additional callbacks to show up
- * or for grace periods to end.
- */
-static void nocb_gp_wait(struct rcu_data *my_rdp)
-{
-	bool bypass = false;
-	long bypass_ncbs;
-	int __maybe_unused cpu = my_rdp->cpu;
-	unsigned long cur_gp_seq;
-	unsigned long flags;
-	bool gotcbs = false;
-	unsigned long j = jiffies;
-	bool needwait_gp = false; // This prevents actual uninitialized use.
-	bool needwake;
-	bool needwake_gp;
-	struct rcu_data *rdp;
-	struct rcu_node *rnp;
-	unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
-	bool wasempty = false;
-
-	/*
-	 * Each pass through the following loop checks for CBs and for the
-	 * nearest grace period (if any) to wait for next.  The CB kthreads
-	 * and the global grace-period kthread are awakened if needed.
-	 */
-	WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
-	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
-		bool needwake_state = false;
-
-		if (!nocb_gp_enabled_cb(rdp))
-			continue;
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
-		rcu_nocb_lock_irqsave(rdp, flags);
-		if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			if (needwake_state)
-				swake_up_one(&rdp->nocb_state_wq);
-			continue;
-		}
-		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-		if (bypass_ncbs &&
-		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
-		     bypass_ncbs > 2 * qhimark)) {
-			// Bypass full or old, so flush it.
-			(void)rcu_nocb_try_flush_bypass(rdp, j);
-			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
-			rcu_nocb_unlock_irqrestore(rdp, flags);
-			if (needwake_state)
-				swake_up_one(&rdp->nocb_state_wq);
-			continue; /* No callbacks here, try next. */
-		}
-		if (bypass_ncbs) {
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("Bypass"));
-			bypass = true;
-		}
-		rnp = rdp->mynode;
-
-		// Advance callbacks if helpful and low contention.
-		needwake_gp = false;
-		if (!rcu_segcblist_restempty(&rdp->cblist,
-					     RCU_NEXT_READY_TAIL) ||
-		    (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-		     rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
-			raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
-			needwake_gp = rcu_advance_cbs(rnp, rdp);
-			wasempty = rcu_segcblist_restempty(&rdp->cblist,
-							   RCU_NEXT_READY_TAIL);
-			raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
-		}
-		// Need to wait on some grace period?
-		WARN_ON_ONCE(wasempty &&
-			     !rcu_segcblist_restempty(&rdp->cblist,
-						      RCU_NEXT_READY_TAIL));
-		if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
-			if (!needwait_gp ||
-			    ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
-				wait_gp_seq = cur_gp_seq;
-			needwait_gp = true;
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("NeedWaitGP"));
-		}
-		if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
-			needwake = rdp->nocb_cb_sleep;
-			WRITE_ONCE(rdp->nocb_cb_sleep, false);
-			smp_mb(); /* CB invocation -after- GP end. */
-		} else {
-			needwake = false;
-		}
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		if (needwake) {
-			swake_up_one(&rdp->nocb_cb_wq);
-			gotcbs = true;
-		}
-		if (needwake_gp)
-			rcu_gp_kthread_wake();
-		if (needwake_state)
-			swake_up_one(&rdp->nocb_state_wq);
-	}
-
-	my_rdp->nocb_gp_bypass = bypass;
-	my_rdp->nocb_gp_gp = needwait_gp;
-	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
-
-	if (bypass && !rcu_nocb_poll) {
-		// At least one child with non-empty ->nocb_bypass, so set
-		// timer in order to avoid stranding its callbacks.
-		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
-				   TPS("WakeBypassIsDeferred"));
-	}
-	if (rcu_nocb_poll) {
-		/* Polling, so trace if first poll in the series. */
-		if (gotcbs)
-			trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
-		schedule_timeout_idle(1);
-	} else if (!needwait_gp) {
-		/* Wait for callbacks to appear. */
-		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
-		swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
-				!READ_ONCE(my_rdp->nocb_gp_sleep));
-		trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
-	} else {
-		rnp = my_rdp->mynode;
-		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
-		swait_event_interruptible_exclusive(
-			rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
-			rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
-			!READ_ONCE(my_rdp->nocb_gp_sleep));
-		trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
-	}
-	if (!rcu_nocb_poll) {
-		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
-		if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
-			WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-			del_timer(&my_rdp->nocb_timer);
-		}
-		WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
-		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
-	}
-	my_rdp->nocb_gp_seq = -1;
-	WARN_ON(signal_pending(current));
-}
-
-/*
- * No-CBs grace-period-wait kthread.  There is one of these per group
- * of CPUs, but only once at least one CPU in that group has come online
- * at least once since boot.  This kthread checks for newly posted
- * callbacks from any of the CPUs it is responsible for, waits for a
- * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
- * that then have callback-invocation work to do.
- */
-static int rcu_nocb_gp_kthread(void *arg)
-{
-	struct rcu_data *rdp = arg;
-
-	for (;;) {
-		WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
-		nocb_gp_wait(rdp);
-		cond_resched_tasks_rcu_qs();
-	}
-	return 0;
-}
-
-static inline bool nocb_cb_can_run(struct rcu_data *rdp)
-{
-	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
-	return rcu_segcblist_test_flags(&rdp->cblist, flags);
-}
-
-static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
-{
-	return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
-}
-
-/*
- * Invoke any ready callbacks from the corresponding no-CBs CPU,
- * then, if there are no more, wait for more to appear.
- */
-static void nocb_cb_wait(struct rcu_data *rdp)
-{
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	unsigned long cur_gp_seq;
-	unsigned long flags;
-	bool needwake_state = false;
-	bool needwake_gp = false;
-	bool can_sleep = true;
-	struct rcu_node *rnp = rdp->mynode;
-
-	local_irq_save(flags);
-	rcu_momentary_dyntick_idle();
-	local_irq_restore(flags);
-	/*
-	 * Disable BH to provide the expected environment.  Also, when
-	 * transitioning to/from NOCB mode, a self-requeuing callback might
-	 * be invoked from softirq.  A short grace period could cause both
-	 * instances of this callback would execute concurrently.
-	 */
-	local_bh_disable();
-	rcu_do_batch(rdp);
-	local_bh_enable();
-	lockdep_assert_irqs_enabled();
-	rcu_nocb_lock_irqsave(rdp, flags);
-	if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
-	    rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
-	    raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
-		needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
-		raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
-	}
-
-	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
-		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
-			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
-			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
-				needwake_state = true;
-		}
-		if (rcu_segcblist_ready_cbs(cblist))
-			can_sleep = false;
-	} else {
-		/*
-		 * De-offloading. Clear our flag and notify the de-offload worker.
-		 * We won't touch the callbacks and keep sleeping until we ever
-		 * get re-offloaded.
-		 */
-		WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
-		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
-		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
-			needwake_state = true;
-	}
-
-	WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep);
-
-	if (rdp->nocb_cb_sleep)
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
-
-	rcu_nocb_unlock_irqrestore(rdp, flags);
-	if (needwake_gp)
-		rcu_gp_kthread_wake();
-
-	if (needwake_state)
-		swake_up_one(&rdp->nocb_state_wq);
-
-	do {
-		swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
-						    nocb_cb_wait_cond(rdp));
-
-		// VVV Ensure CB invocation follows _sleep test.
-		if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
-			WARN_ON(signal_pending(current));
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
-		}
-	} while (!nocb_cb_can_run(rdp));
-}
-
-/*
- * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
- * nocb_cb_wait() to do the dirty work.
- */
-static int rcu_nocb_cb_kthread(void *arg)
-{
-	struct rcu_data *rdp = arg;
-
-	// Each pass through this loop does one callback batch, and,
-	// if there are no more ready callbacks, waits for them.
-	for (;;) {
-		nocb_cb_wait(rdp);
-		cond_resched_tasks_rcu_qs();
-	}
-	return 0;
-}
-
-/* Is a deferred wakeup of rcu_nocb_kthread() required? */
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
-{
-	return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
-}
-
-/* Do a deferred wakeup of rcu_nocb_kthread(). */
-static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
-					   struct rcu_data *rdp, int level,
-					   unsigned long flags)
-	__releases(rdp_gp->nocb_gp_lock)
-{
-	int ndw;
-	int ret;
-
-	if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
-		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-		return false;
-	}
-
-	ndw = rdp_gp->nocb_defer_wakeup;
-	ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
-
-	return ret;
-}
-
-/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
-static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
-{
-	unsigned long flags;
-	struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
-
-	WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
-
-	raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
-	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
-	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
-}
-
-/*
- * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
- * This means we do an inexact common-case check.  Note that if
- * we miss, ->nocb_timer will eventually clean things up.
- */
-static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
-{
-	unsigned long flags;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-	if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
-		return false;
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-	return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
-}
-
-void rcu_nocb_flush_deferred_wakeup(void)
-{
-	do_nocb_deferred_wakeup(this_cpu_ptr(&rcu_data));
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
-
-static int rdp_offload_toggle(struct rcu_data *rdp,
-			       bool offload, unsigned long flags)
-	__releases(rdp->nocb_lock)
-{
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-	bool wake_gp = false;
-
-	rcu_segcblist_offload(cblist, offload);
-
-	if (rdp->nocb_cb_sleep)
-		rdp->nocb_cb_sleep = false;
-	rcu_nocb_unlock_irqrestore(rdp, flags);
-
-	/*
-	 * Ignore former value of nocb_cb_sleep and force wake up as it could
-	 * have been spuriously set to false already.
-	 */
-	swake_up_one(&rdp->nocb_cb_wq);
-
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-	if (rdp_gp->nocb_gp_sleep) {
-		rdp_gp->nocb_gp_sleep = false;
-		wake_gp = true;
-	}
-	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-
-	if (wake_gp)
-		wake_up_process(rdp_gp->nocb_gp_kthread);
-
-	return 0;
-}
-
-static long rcu_nocb_rdp_deoffload(void *arg)
-{
-	struct rcu_data *rdp = arg;
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	unsigned long flags;
-	int ret;
-
-	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
-
-	pr_info("De-offloading %d\n", rdp->cpu);
-
-	rcu_nocb_lock_irqsave(rdp, flags);
-	/*
-	 * Flush once and for all now. This suffices because we are
-	 * running on the target CPU holding ->nocb_lock (thus having
-	 * interrupts disabled), and because rdp_offload_toggle()
-	 * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED.
-	 * Thus future calls to rcu_segcblist_completely_offloaded() will
-	 * return false, which means that future calls to rcu_nocb_try_bypass()
-	 * will refuse to put anything into the bypass.
-	 */
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
-	ret = rdp_offload_toggle(rdp, false, flags);
-	swait_event_exclusive(rdp->nocb_state_wq,
-			      !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
-							SEGCBLIST_KTHREAD_GP));
-	/*
-	 * Lock one last time to acquire latest callback updates from kthreads
-	 * so we can later handle callbacks locally without locking.
-	 */
-	rcu_nocb_lock_irqsave(rdp, flags);
-	/*
-	 * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY after the nocb
-	 * lock is released but how about being paranoid for once?
-	 */
-	rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
-	/*
-	 * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
-	 * rcu_nocb_unlock_irqrestore() anymore.
-	 */
-	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
-
-	/* Sanity check */
-	WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-
-
-	return ret;
-}
-
-int rcu_nocb_cpu_deoffload(int cpu)
-{
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-	int ret = 0;
-
-	mutex_lock(&rcu_state.barrier_mutex);
-	cpus_read_lock();
-	if (rcu_rdp_is_offloaded(rdp)) {
-		if (cpu_online(cpu)) {
-			ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
-			if (!ret)
-				cpumask_clear_cpu(cpu, rcu_nocb_mask);
-		} else {
-			pr_info("NOCB: Can't CB-deoffload an offline CPU\n");
-			ret = -EINVAL;
-		}
-	}
-	cpus_read_unlock();
-	mutex_unlock(&rcu_state.barrier_mutex);
-
-	return ret;
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
-
-static long rcu_nocb_rdp_offload(void *arg)
-{
-	struct rcu_data *rdp = arg;
-	struct rcu_segcblist *cblist = &rdp->cblist;
-	unsigned long flags;
-	int ret;
-
-	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
-	/*
-	 * For now we only support re-offload, ie: the rdp must have been
-	 * offloaded on boot first.
-	 */
-	if (!rdp->nocb_gp_rdp)
-		return -EINVAL;
-
-	pr_info("Offloading %d\n", rdp->cpu);
-	/*
-	 * Can't use rcu_nocb_lock_irqsave() while we are in
-	 * SEGCBLIST_SOFTIRQ_ONLY mode.
-	 */
-	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
-
-	/*
-	 * We didn't take the nocb lock while working on the
-	 * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
-	 * Every modifications that have been done previously on
-	 * rdp->cblist must be visible remotely by the nocb kthreads
-	 * upon wake up after reading the cblist flags.
-	 *
-	 * The layout against nocb_lock enforces that ordering:
-	 *
-	 *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
-	 * -------------------------   ----------------------------
-	 *      WRITE callbacks           rcu_nocb_lock()
-	 *      rcu_nocb_lock()           READ flags
-	 *      WRITE flags               READ callbacks
-	 *      rcu_nocb_unlock()         rcu_nocb_unlock()
-	 */
-	ret = rdp_offload_toggle(rdp, true, flags);
-	swait_event_exclusive(rdp->nocb_state_wq,
-			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
-			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
-
-	return ret;
-}
-
-int rcu_nocb_cpu_offload(int cpu)
-{
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-	int ret = 0;
-
-	mutex_lock(&rcu_state.barrier_mutex);
-	cpus_read_lock();
-	if (!rcu_rdp_is_offloaded(rdp)) {
-		if (cpu_online(cpu)) {
-			ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
-			if (!ret)
-				cpumask_set_cpu(cpu, rcu_nocb_mask);
-		} else {
-			pr_info("NOCB: Can't CB-offload an offline CPU\n");
-			ret = -EINVAL;
-		}
-	}
-	cpus_read_unlock();
-	mutex_unlock(&rcu_state.barrier_mutex);
-
-	return ret;
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
-
-void __init rcu_init_nohz(void)
-{
-	int cpu;
-	bool need_rcu_nocb_mask = false;
-	struct rcu_data *rdp;
-
-#if defined(CONFIG_NO_HZ_FULL)
-	if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
-		need_rcu_nocb_mask = true;
-#endif /* #if defined(CONFIG_NO_HZ_FULL) */
-
-	if (!cpumask_available(rcu_nocb_mask) && need_rcu_nocb_mask) {
-		if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) {
-			pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n");
-			return;
-		}
-	}
-	if (!cpumask_available(rcu_nocb_mask))
-		return;
-
-#if defined(CONFIG_NO_HZ_FULL)
-	if (tick_nohz_full_running)
-		cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask);
-#endif /* #if defined(CONFIG_NO_HZ_FULL) */
-
-	if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) {
-		pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n");
-		cpumask_and(rcu_nocb_mask, cpu_possible_mask,
-			    rcu_nocb_mask);
-	}
-	if (cpumask_empty(rcu_nocb_mask))
-		pr_info("\tOffload RCU callbacks from CPUs: (none).\n");
-	else
-		pr_info("\tOffload RCU callbacks from CPUs: %*pbl.\n",
-			cpumask_pr_args(rcu_nocb_mask));
-	if (rcu_nocb_poll)
-		pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
-
-	for_each_cpu(cpu, rcu_nocb_mask) {
-		rdp = per_cpu_ptr(&rcu_data, cpu);
-		if (rcu_segcblist_empty(&rdp->cblist))
-			rcu_segcblist_init(&rdp->cblist);
-		rcu_segcblist_offload(&rdp->cblist, true);
-		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
-		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
-	}
-	rcu_organize_nocb_kthreads();
-}
-
-/* Initialize per-rcu_data variables for no-CBs CPUs. */
-static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
-{
-	init_swait_queue_head(&rdp->nocb_cb_wq);
-	init_swait_queue_head(&rdp->nocb_gp_wq);
-	init_swait_queue_head(&rdp->nocb_state_wq);
-	raw_spin_lock_init(&rdp->nocb_lock);
-	raw_spin_lock_init(&rdp->nocb_bypass_lock);
-	raw_spin_lock_init(&rdp->nocb_gp_lock);
-	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
-	rcu_cblist_init(&rdp->nocb_bypass);
-}
-
-/*
- * If the specified CPU is a no-CBs CPU that does not already have its
- * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread
- * for this CPU's group has not yet been created, spawn it as well.
- */
-static void rcu_spawn_one_nocb_kthread(int cpu)
-{
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-	struct rcu_data *rdp_gp;
-	struct task_struct *t;
-
-	/*
-	 * If this isn't a no-CBs CPU or if it already has an rcuo kthread,
-	 * then nothing to do.
-	 */
-	if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)
-		return;
-
-	/* If we didn't spawn the GP kthread first, reorganize! */
-	rdp_gp = rdp->nocb_gp_rdp;
-	if (!rdp_gp->nocb_gp_kthread) {
-		t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
-				"rcuog/%d", rdp_gp->cpu);
-		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
-			return;
-		WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
-	}
-
-	/* Spawn the kthread for this CPU. */
-	t = kthread_run(rcu_nocb_cb_kthread, rdp,
-			"rcuo%c/%d", rcu_state.abbr, cpu);
-	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
-		return;
-	WRITE_ONCE(rdp->nocb_cb_kthread, t);
-	WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
-}
-
-/*
- * If the specified CPU is a no-CBs CPU that does not already have its
- * rcuo kthread, spawn it.
- */
-static void rcu_spawn_cpu_nocb_kthread(int cpu)
-{
-	if (rcu_scheduler_fully_active)
-		rcu_spawn_one_nocb_kthread(cpu);
-}
-
-/*
- * Once the scheduler is running, spawn rcuo kthreads for all online
- * no-CBs CPUs.  This assumes that the early_initcall()s happen before
- * non-boot CPUs come online -- if this changes, we will need to add
- * some mutual exclusion.
- */
-static void __init rcu_spawn_nocb_kthreads(void)
-{
-	int cpu;
-
-	for_each_online_cpu(cpu)
-		rcu_spawn_cpu_nocb_kthread(cpu);
-}
-
-/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */
-static int rcu_nocb_gp_stride = -1;
-module_param(rcu_nocb_gp_stride, int, 0444);
-
-/*
- * Initialize GP-CB relationships for all no-CBs CPU.
- */
-static void __init rcu_organize_nocb_kthreads(void)
-{
-	int cpu;
-	bool firsttime = true;
-	bool gotnocbs = false;
-	bool gotnocbscbs = true;
-	int ls = rcu_nocb_gp_stride;
-	int nl = 0;  /* Next GP kthread. */
-	struct rcu_data *rdp;
-	struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
-	struct rcu_data *rdp_prev = NULL;
-
-	if (!cpumask_available(rcu_nocb_mask))
-		return;
-	if (ls == -1) {
-		ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
-		rcu_nocb_gp_stride = ls;
-	}
-
-	/*
-	 * Each pass through this loop sets up one rcu_data structure.
-	 * Should the corresponding CPU come online in the future, then
-	 * we will spawn the needed set of rcu_nocb_kthread() kthreads.
-	 */
-	for_each_cpu(cpu, rcu_nocb_mask) {
-		rdp = per_cpu_ptr(&rcu_data, cpu);
-		if (rdp->cpu >= nl) {
-			/* New GP kthread, set up for CBs & next GP. */
-			gotnocbs = true;
-			nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
-			rdp->nocb_gp_rdp = rdp;
-			rdp_gp = rdp;
-			if (dump_tree) {
-				if (!firsttime)
-					pr_cont("%s\n", gotnocbscbs
-							? "" : " (self only)");
-				gotnocbscbs = false;
-				firsttime = false;
-				pr_alert("%s: No-CB GP kthread CPU %d:",
-					 __func__, cpu);
-			}
-		} else {
-			/* Another CB kthread, link to previous GP kthread. */
-			gotnocbscbs = true;
-			rdp->nocb_gp_rdp = rdp_gp;
-			rdp_prev->nocb_next_cb_rdp = rdp;
-			if (dump_tree)
-				pr_cont(" %d", cpu);
-		}
-		rdp_prev = rdp;
-	}
-	if (gotnocbs && dump_tree)
-		pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
-}
-
-/*
- * Bind the current task to the offloaded CPUs.  If there are no offloaded
- * CPUs, leave the task unbound.  Splat if the bind attempt fails.
- */
-void rcu_bind_current_to_nocb(void)
-{
-	if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask))
-		WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
-}
-EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
-
-// The ->on_cpu field is available only in CONFIG_SMP=y, so...
-#ifdef CONFIG_SMP
-static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
-{
-	return tsp && task_is_running(tsp) && !tsp->on_cpu ? "!" : "";
-}
-#else // #ifdef CONFIG_SMP
-static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
-{
-	return "";
-}
-#endif // #else #ifdef CONFIG_SMP
-
-/*
- * Dump out nocb grace-period kthread state for the specified rcu_data
- * structure.
- */
-static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
-{
-	struct rcu_node *rnp = rdp->mynode;
-
-	pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
-		rdp->cpu,
-		"kK"[!!rdp->nocb_gp_kthread],
-		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
-		"dD"[!!rdp->nocb_defer_wakeup],
-		"tT"[timer_pending(&rdp->nocb_timer)],
-		"sS"[!!rdp->nocb_gp_sleep],
-		".W"[swait_active(&rdp->nocb_gp_wq)],
-		".W"[swait_active(&rnp->nocb_gp_wq[0])],
-		".W"[swait_active(&rnp->nocb_gp_wq[1])],
-		".B"[!!rdp->nocb_gp_bypass],
-		".G"[!!rdp->nocb_gp_gp],
-		(long)rdp->nocb_gp_seq,
-		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
-		rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
-		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
-		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
-}
-
-/* Dump out nocb kthread state for the specified rcu_data structure. */
-static void show_rcu_nocb_state(struct rcu_data *rdp)
-{
-	char bufw[20];
-	char bufr[20];
-	struct rcu_segcblist *rsclp = &rdp->cblist;
-	bool waslocked;
-	bool wassleep;
-
-	if (rdp->nocb_gp_rdp == rdp)
-		show_rcu_nocb_gp_state(rdp);
-
-	sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
-	sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
-	pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
-		rdp->cpu, rdp->nocb_gp_rdp->cpu,
-		rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
-		"kK"[!!rdp->nocb_cb_kthread],
-		"bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
-		"cC"[!!atomic_read(&rdp->nocb_lock_contended)],
-		"lL"[raw_spin_is_locked(&rdp->nocb_lock)],
-		"sS"[!!rdp->nocb_cb_sleep],
-		".W"[swait_active(&rdp->nocb_cb_wq)],
-		jiffies - rdp->nocb_bypass_first,
-		jiffies - rdp->nocb_nobypass_last,
-		rdp->nocb_nobypass_count,
-		".D"[rcu_segcblist_ready_cbs(rsclp)],
-		".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
-		rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
-		".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
-		rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
-		".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
-		".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
-		rcu_segcblist_n_cbs(&rdp->cblist),
-		rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
-		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
-		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
-
-	/* It is OK for GP kthreads to have GP state. */
-	if (rdp->nocb_gp_rdp == rdp)
-		return;
-
-	waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
-	wassleep = swait_active(&rdp->nocb_gp_wq);
-	if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
-		return;  /* Nothing untoward. */
-
-	pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
-		"lL"[waslocked],
-		"dD"[!!rdp->nocb_defer_wakeup],
-		"sS"[!!rdp->nocb_gp_sleep],
-		".W"[wassleep]);
-}
-
-#else /* #ifdef CONFIG_RCU_NOCB_CPU */
-
-/* No ->nocb_lock to acquire.  */
-static void rcu_nocb_lock(struct rcu_data *rdp)
-{
-}
-
-/* No ->nocb_lock to release.  */
-static void rcu_nocb_unlock(struct rcu_data *rdp)
-{
-}
-
-/* No ->nocb_lock to release.  */
-static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
-				       unsigned long flags)
-{
-	local_irq_restore(flags);
-}
-
-/* Lockdep check that ->cblist may be safely accessed. */
-static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
-{
-	lockdep_assert_irqs_disabled();
-}
-
-static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
-{
-}
-
-static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
-{
-	return NULL;
-}
-
-static void rcu_init_one_nocb(struct rcu_node *rnp)
-{
-}
-
-static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
-{
-	return true;
-}
-
-static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
-{
-	return false;
-}
-
-static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
-				 unsigned long flags)
-{
-	WARN_ON_ONCE(1);  /* Should be dead code! */
-}
-
-static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
-{
-}
-
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
-{
-	return false;
-}
-
-static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
-{
-	return false;
-}
-
-static void rcu_spawn_cpu_nocb_kthread(int cpu)
-{
-}
-
-static void __init rcu_spawn_nocb_kthreads(void)
-{
-}
-
-static void show_rcu_nocb_state(struct rcu_data *rdp)
-{
-}
-
-#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
-
 /*
  * Is this CPU a NO_HZ_FULL CPU that should ignore RCU so that the
  * grace-period kthread will do force_quiescent_state() processing?
@@ -2982,17 +1498,17 @@
 /* Turn on heavyweight RCU tasks trace readers on idle/user entry. */
 static void rcu_dynticks_task_trace_enter(void)
 {
-#ifdef CONFIG_TASKS_RCU_TRACE
+#ifdef CONFIG_TASKS_TRACE_RCU
 	if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB))
 		current->trc_reader_special.b.need_mb = true;
-#endif /* #ifdef CONFIG_TASKS_RCU_TRACE */
+#endif /* #ifdef CONFIG_TASKS_TRACE_RCU */
 }
 
 /* Turn off heavyweight RCU tasks trace readers on idle/user exit. */
 static void rcu_dynticks_task_trace_exit(void)
 {
-#ifdef CONFIG_TASKS_RCU_TRACE
+#ifdef CONFIG_TASKS_TRACE_RCU
 	if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB))
 		current->trc_reader_special.b.need_mb = false;
-#endif /* #ifdef CONFIG_TASKS_RCU_TRACE */
+#endif /* #ifdef CONFIG_TASKS_TRACE_RCU */
 }
diff --git a/kernel/rcu/tree_stall.h b/kernel/rcu/tree_stall.h
index 6c76988c..677ee3d 100644
--- a/kernel/rcu/tree_stall.h
+++ b/kernel/rcu/tree_stall.h
@@ -7,6 +7,8 @@
  * Author: Paul E. McKenney <paulmck@linux.ibm.com>
  */
 
+#include <linux/kvm_para.h>
+
 //////////////////////////////////////////////////////////////////////////////
 //
 // Controlling CPU stall warnings, including delay calculation.
@@ -117,17 +119,14 @@
 }
 
 /**
- * rcu_cpu_stall_reset - prevent further stall warnings in current grace period
- *
- * Set the stall-warning timeout way off into the future, thus preventing
- * any RCU CPU stall-warning messages from appearing in the current set of
- * RCU grace periods.
+ * rcu_cpu_stall_reset - restart stall-warning timeout for current grace period
  *
  * The caller must disable hard irqs.
  */
 void rcu_cpu_stall_reset(void)
 {
-	WRITE_ONCE(rcu_state.jiffies_stall, jiffies + ULONG_MAX / 2);
+	WRITE_ONCE(rcu_state.jiffies_stall,
+		   jiffies + rcu_jiffies_till_stall_check());
 }
 
 //////////////////////////////////////////////////////////////////////////////
@@ -267,8 +266,10 @@
 	struct task_struct *ts[8];
 
 	lockdep_assert_irqs_disabled();
-	if (!rcu_preempt_blocked_readers_cgp(rnp))
+	if (!rcu_preempt_blocked_readers_cgp(rnp)) {
+		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 		return 0;
+	}
 	pr_err("\tTasks blocked on level-%d rcu_node (CPUs %d-%d):",
 	       rnp->level, rnp->grplo, rnp->grphi);
 	t = list_entry(rnp->gp_tasks->prev,
@@ -280,8 +281,8 @@
 			break;
 	}
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
-	for (i--; i; i--) {
-		t = ts[i];
+	while (i) {
+		t = ts[--i];
 		if (!try_invoke_on_locked_down_task(t, check_slow_task, &rscr))
 			pr_cont(" P%d", t->pid);
 		else
@@ -350,7 +351,7 @@
 
 static void print_cpu_stall_fast_no_hz(char *cp, int cpu)
 {
-	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 
 	sprintf(cp, "last_accelerate: %04lx/%04lx dyntick_enabled: %d",
 		rdp->last_accelerate & 0xffff, jiffies & 0xffff,
@@ -464,9 +465,10 @@
 		pr_err("%s kthread starved for %ld jiffies! g%ld f%#x %s(%d) ->state=%#x ->cpu=%d\n",
 		       rcu_state.name, j,
 		       (long)rcu_seq_current(&rcu_state.gp_seq),
-		       data_race(rcu_state.gp_flags),
-		       gp_state_getname(rcu_state.gp_state), rcu_state.gp_state,
-		       gpk ? gpk->__state : ~0, cpu);
+		       data_race(READ_ONCE(rcu_state.gp_flags)),
+		       gp_state_getname(rcu_state.gp_state),
+		       data_race(READ_ONCE(rcu_state.gp_state)),
+		       gpk ? data_race(READ_ONCE(gpk->__state)) : ~0, cpu);
 		if (gpk) {
 			pr_err("\tUnless %s kthread gets sufficient CPU time, OOM is now expected behavior.\n", rcu_state.name);
 			pr_err("RCU grace-period kthread stack dump:\n");
@@ -509,7 +511,7 @@
 		       (long)rcu_seq_current(&rcu_state.gp_seq),
 		       data_race(rcu_state.gp_flags),
 		       gp_state_getname(RCU_GP_WAIT_FQS), RCU_GP_WAIT_FQS,
-		       gpk->__state);
+		       data_race(READ_ONCE(gpk->__state)));
 		pr_err("\tPossible timer handling issue on cpu=%d timer-softirq=%u\n",
 		       cpu, kstat_softirqs_cpu(TIMER_SOFTIRQ, cpu));
 	}
@@ -568,11 +570,11 @@
 			pr_err("INFO: Stall ended before state dump start\n");
 		} else {
 			j = jiffies;
-			gpa = data_race(rcu_state.gp_activity);
+			gpa = data_race(READ_ONCE(rcu_state.gp_activity));
 			pr_err("All QSes seen, last %s kthread activity %ld (%ld-%ld), jiffies_till_next_fqs=%ld, root ->qsmask %#lx\n",
 			       rcu_state.name, j - gpa, j, gpa,
-			       data_race(jiffies_till_next_fqs),
-			       rcu_get_root()->qsmask);
+			       data_race(READ_ONCE(jiffies_till_next_fqs)),
+			       data_race(READ_ONCE(rcu_get_root()->qsmask)));
 		}
 	}
 	/* Rewrite if needed in case of slow consoles. */
@@ -646,6 +648,7 @@
 
 static void check_cpu_stall(struct rcu_data *rdp)
 {
+	bool didstall = false;
 	unsigned long gs1;
 	unsigned long gs2;
 	unsigned long gps;
@@ -691,24 +694,46 @@
 	    ULONG_CMP_GE(gps, js))
 		return; /* No stall or GP completed since entering function. */
 	rnp = rdp->mynode;
-	jn = jiffies + 3 * rcu_jiffies_till_stall_check() + 3;
+	jn = jiffies + ULONG_MAX / 2;
 	if (rcu_gp_in_progress() &&
 	    (READ_ONCE(rnp->qsmask) & rdp->grpmask) &&
 	    cmpxchg(&rcu_state.jiffies_stall, js, jn) == js) {
 
+		/*
+		 * If a virtual machine is stopped by the host it can look to
+		 * the watchdog like an RCU stall. Check to see if the host
+		 * stopped the vm.
+		 */
+		if (kvm_check_and_clear_guest_paused())
+			return;
+
 		/* We haven't checked in, so go dump stack. */
 		print_cpu_stall(gps);
 		if (READ_ONCE(rcu_cpu_stall_ftrace_dump))
 			rcu_ftrace_dump(DUMP_ALL);
+		didstall = true;
 
 	} else if (rcu_gp_in_progress() &&
 		   ULONG_CMP_GE(j, js + RCU_STALL_RAT_DELAY) &&
 		   cmpxchg(&rcu_state.jiffies_stall, js, jn) == js) {
 
+		/*
+		 * If a virtual machine is stopped by the host it can look to
+		 * the watchdog like an RCU stall. Check to see if the host
+		 * stopped the vm.
+		 */
+		if (kvm_check_and_clear_guest_paused())
+			return;
+
 		/* They had a few time units to dump stack, so complain. */
 		print_other_cpu_stall(gs2, gps);
 		if (READ_ONCE(rcu_cpu_stall_ftrace_dump))
 			rcu_ftrace_dump(DUMP_ALL);
+		didstall = true;
+	}
+	if (didstall && READ_ONCE(rcu_state.jiffies_stall) == jn) {
+		jn = jiffies + 3 * rcu_jiffies_till_stall_check() + 3;
+		WRITE_ONCE(rcu_state.jiffies_stall, jn);
 	}
 }
 
@@ -742,7 +767,7 @@
 
 	rcu_for_each_leaf_node(rnp) {
 		if (!cpup) {
-			if (READ_ONCE(rnp->qsmask)) {
+			if (data_race(READ_ONCE(rnp->qsmask))) {
 				return false;
 			} else {
 				if (READ_ONCE(rnp->gp_tasks))
@@ -791,32 +816,34 @@
 	struct task_struct *t = READ_ONCE(rcu_state.gp_kthread);
 
 	j = jiffies;
-	ja = j - data_race(rcu_state.gp_activity);
-	jr = j - data_race(rcu_state.gp_req_activity);
-	js = j - data_race(rcu_state.gp_start);
-	jw = j - data_race(rcu_state.gp_wake_time);
+	ja = j - data_race(READ_ONCE(rcu_state.gp_activity));
+	jr = j - data_race(READ_ONCE(rcu_state.gp_req_activity));
+	js = j - data_race(READ_ONCE(rcu_state.gp_start));
+	jw = j - data_race(READ_ONCE(rcu_state.gp_wake_time));
 	pr_info("%s: wait state: %s(%d) ->state: %#x ->rt_priority %u delta ->gp_start %lu ->gp_activity %lu ->gp_req_activity %lu ->gp_wake_time %lu ->gp_wake_seq %ld ->gp_seq %ld ->gp_seq_needed %ld ->gp_max %lu ->gp_flags %#x\n",
 		rcu_state.name, gp_state_getname(rcu_state.gp_state),
-		rcu_state.gp_state, t ? t->__state : 0x1ffff, t ? t->rt_priority : 0xffU,
-		js, ja, jr, jw, (long)data_race(rcu_state.gp_wake_seq),
-		(long)data_race(rcu_state.gp_seq),
-		(long)data_race(rcu_get_root()->gp_seq_needed),
-		data_race(rcu_state.gp_max),
-		data_race(rcu_state.gp_flags));
+		data_race(READ_ONCE(rcu_state.gp_state)),
+		t ? data_race(READ_ONCE(t->__state)) : 0x1ffff, t ? t->rt_priority : 0xffU,
+		js, ja, jr, jw, (long)data_race(READ_ONCE(rcu_state.gp_wake_seq)),
+		(long)data_race(READ_ONCE(rcu_state.gp_seq)),
+		(long)data_race(READ_ONCE(rcu_get_root()->gp_seq_needed)),
+		data_race(READ_ONCE(rcu_state.gp_max)),
+		data_race(READ_ONCE(rcu_state.gp_flags)));
 	rcu_for_each_node_breadth_first(rnp) {
 		if (ULONG_CMP_GE(READ_ONCE(rcu_state.gp_seq), READ_ONCE(rnp->gp_seq_needed)) &&
-		    !data_race(rnp->qsmask) && !data_race(rnp->boost_tasks) &&
-		    !data_race(rnp->exp_tasks) && !data_race(rnp->gp_tasks))
+		    !data_race(READ_ONCE(rnp->qsmask)) && !data_race(READ_ONCE(rnp->boost_tasks)) &&
+		    !data_race(READ_ONCE(rnp->exp_tasks)) && !data_race(READ_ONCE(rnp->gp_tasks)))
 			continue;
 		pr_info("\trcu_node %d:%d ->gp_seq %ld ->gp_seq_needed %ld ->qsmask %#lx %c%c%c%c ->n_boosts %ld\n",
 			rnp->grplo, rnp->grphi,
-			(long)data_race(rnp->gp_seq), (long)data_race(rnp->gp_seq_needed),
-			data_race(rnp->qsmask),
-			".b"[!!data_race(rnp->boost_kthread_task)],
-			".B"[!!data_race(rnp->boost_tasks)],
-			".E"[!!data_race(rnp->exp_tasks)],
-			".G"[!!data_race(rnp->gp_tasks)],
-			data_race(rnp->n_boosts));
+			(long)data_race(READ_ONCE(rnp->gp_seq)),
+			(long)data_race(READ_ONCE(rnp->gp_seq_needed)),
+			data_race(READ_ONCE(rnp->qsmask)),
+			".b"[!!data_race(READ_ONCE(rnp->boost_kthread_task))],
+			".B"[!!data_race(READ_ONCE(rnp->boost_tasks))],
+			".E"[!!data_race(READ_ONCE(rnp->exp_tasks))],
+			".G"[!!data_race(READ_ONCE(rnp->gp_tasks))],
+			data_race(READ_ONCE(rnp->n_boosts)));
 		if (!rcu_is_leaf_node(rnp))
 			continue;
 		for_each_leaf_node_possible_cpu(rnp, cpu) {
@@ -826,12 +853,12 @@
 					 READ_ONCE(rdp->gp_seq_needed)))
 				continue;
 			pr_info("\tcpu %d ->gp_seq_needed %ld\n",
-				cpu, (long)data_race(rdp->gp_seq_needed));
+				cpu, (long)data_race(READ_ONCE(rdp->gp_seq_needed)));
 		}
 	}
 	for_each_possible_cpu(cpu) {
 		rdp = per_cpu_ptr(&rcu_data, cpu);
-		cbs += data_race(rdp->n_cbs_invoked);
+		cbs += data_race(READ_ONCE(rdp->n_cbs_invoked));
 		if (rcu_segcblist_is_offloaded(&rdp->cblist))
 			show_rcu_nocb_state(rdp);
 	}
@@ -913,11 +940,11 @@
 
 	if (rcu_gp_in_progress()) {
 		pr_info("%s: GP age %lu jiffies\n",
-			__func__, jiffies - rcu_state.gp_start);
+			__func__, jiffies - data_race(READ_ONCE(rcu_state.gp_start)));
 		show_rcu_gp_kthreads();
 	} else {
 		pr_info("%s: Last GP end %lu jiffies ago\n",
-			__func__, jiffies - rcu_state.gp_end);
+			__func__, jiffies - data_race(READ_ONCE(rcu_state.gp_end)));
 		preempt_disable();
 		rdp = this_cpu_ptr(&rcu_data);
 		rcu_check_gp_start_stall(rdp->mynode, rdp, j);
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 2d9ff40f4..6a03c3f 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -7781,6 +7781,17 @@
 		preempt_schedule_common();
 		return 1;
 	}
+	/*
+	 * In preemptible kernels, ->rcu_read_lock_nesting tells the tick
+	 * whether the current CPU is in an RCU read-side critical section,
+	 * so the tick can report quiescent states even for CPUs looping
+	 * in kernel context.  In contrast, in non-preemptible kernels,
+	 * RCU readers leave no in-memory hints, which means that CPU-bound
+	 * processes executing in kernel context might never report an
+	 * RCU quiescent state.  Therefore, the following code causes
+	 * cond_resched() to report a quiescent state, but only when RCU
+	 * is in urgent need of one.
+	 */
 #ifndef CONFIG_PREEMPT_RCU
 	rcu_all_qs();
 #endif
diff --git a/tools/include/nolibc/nolibc.h b/tools/include/nolibc/nolibc.h
index 8b7a983..3430667 100644
--- a/tools/include/nolibc/nolibc.h
+++ b/tools/include/nolibc/nolibc.h
@@ -1031,7 +1031,7 @@
  *     scall32-o32.S in the kernel sources.
  *   - the system call is performed by calling "syscall"
  *   - syscall return comes in v0, and register a3 needs to be checked to know
- *     if an error occured, in which case errno is in v0.
+ *     if an error occurred, in which case errno is in v0.
  *   - the arguments are cast to long and assigned into the target registers
  *     which are then simply passed as registers to the asm code, so that we
  *     don't have to experience issues with register constraints.
@@ -2244,6 +2244,19 @@
 }
 
 static __attribute__((unused))
+int msleep(unsigned int msecs)
+{
+	struct timeval my_timeval = { msecs / 1000, (msecs % 1000) * 1000 };
+
+	if (sys_select(0, 0, 0, 0, &my_timeval) < 0)
+		return (my_timeval.tv_sec * 1000) +
+			(my_timeval.tv_usec / 1000) +
+			!!(my_timeval.tv_usec % 1000);
+	else
+		return 0;
+}
+
+static __attribute__((unused))
 int stat(const char *path, struct stat *buf)
 {
 	int ret = sys_stat(path, buf);
diff --git a/tools/testing/selftests/rcutorture/bin/jitter.sh b/tools/testing/selftests/rcutorture/bin/jitter.sh
index 15d937b..fd1ffaa 100755
--- a/tools/testing/selftests/rcutorture/bin/jitter.sh
+++ b/tools/testing/selftests/rcutorture/bin/jitter.sh
@@ -68,16 +68,12 @@
 	cpumask=`awk -v cpus="$cpus" -v me=$me -v n=$n 'BEGIN {
 		srand(n + me + systime());
 		ncpus = split(cpus, ca);
-		curcpu = ca[int(rand() * ncpus + 1)];
-		z = "";
-		for (i = 1; 4 * i <= curcpu; i++)
-			z = z "0";
-		print "0x" 2 ^ (curcpu % 4) z;
+		print ca[int(rand() * ncpus + 1)];
 	}' < /dev/null`
 	n=$(($n+1))
-	if ! taskset -p $cpumask $$ > /dev/null 2>&1
+	if ! taskset -c -p $cpumask $$ > /dev/null 2>&1
 	then
-		echo taskset failure: '"taskset -p ' $cpumask $$ '"'
+		echo taskset failure: '"taskset -c -p ' $cpumask $$ '"'
 		exit 1
 	fi
 
diff --git a/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh b/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh
index e5cc6b2..1af5d6b 100755
--- a/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh
+++ b/tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh
@@ -14,7 +14,7 @@
 then
 	exit 0
 fi
-cat $1/*/console.log |
+find $1 -name console.log -exec cat {} \; |
 	grep "BUG: KCSAN: " |
 	sed -e 's/^\[[^]]*] //' |
 	sort |
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-again.sh b/tools/testing/selftests/rcutorture/bin/kvm-again.sh
index d8c8483..5a0023d 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-again.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-again.sh
@@ -142,7 +142,7 @@
 	echo "Cannot copy from $oldrun to $rundir."
 	usage
 fi
-rm -f "$rundir"/*/{console.log,console.log.diags,qemu_pid,qemu-retval,Warnings,kvm-test-1-run.sh.out,kvm-test-1-run-qemu.sh.out,vmlinux} "$rundir"/log
+rm -f "$rundir"/*/{console.log,console.log.diags,qemu_pid,qemu-pid,qemu-retval,Warnings,kvm-test-1-run.sh.out,kvm-test-1-run-qemu.sh.out,vmlinux} "$rundir"/log
 touch "$rundir/log"
 echo $scriptname $args | tee -a "$rundir/log"
 echo $oldrun > "$rundir/re-run"
@@ -179,6 +179,6 @@
 then
 	echo ---- Dryrun complete, directory: $rundir | tee -a "$rundir/log"
 else
-	( cd "$rundir"; sh $T/runbatches.sh )
+	( cd "$rundir"; sh $T/runbatches.sh ) | tee -a "$rundir/log"
 	kvm-end-run-stats.sh "$rundir" "$starttime"
 fi
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh b/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh
new file mode 100755
index 0000000..f99b2c1
--- /dev/null
+++ b/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh
@@ -0,0 +1,106 @@
+#!/bin/sh
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Produce awk statements roughly depicting the system's CPU and cache
+# layout.  If the required information is not available, produce
+# error messages as awk comments.  Successful exit regardless.
+#
+# Usage: kvm-assign-cpus.sh /path/to/sysfs
+
+T=/tmp/kvm-assign-cpus.sh.$$
+trap 'rm -rf $T' 0 2
+mkdir $T
+
+sysfsdir=${1-/sys/devices/system/node}
+if ! cd "$sysfsdir" > $T/msg 2>&1
+then
+	sed -e 's/^/# /' < $T/msg
+	exit 0
+fi
+nodelist="`ls -d node*`"
+for i in node*
+do
+	if ! test -d $i/
+	then
+		echo "# Not a directory: $sysfsdir/node*"
+		exit 0
+	fi
+	for j in $i/cpu*/cache/index*
+	do
+		if ! test -d $j/
+		then
+			echo "# Not a directory: $sysfsdir/$j"
+			exit 0
+		else
+			break
+		fi
+	done
+	indexlist="`ls -d $i/cpu* | grep 'cpu[0-9][0-9]*' | head -1 | sed -e 's,^.*$,ls -d &/cache/index*,' | sh | sed -e 's,^.*/,,'`"
+	break
+done
+for i in node*/cpu*/cache/index*/shared_cpu_list
+do
+	if ! test -f $i
+	then
+		echo "# Not a file: $sysfsdir/$i"
+		exit 0
+	else
+		break
+	fi
+done
+firstshared=
+for i in $indexlist
+do
+	rm -f $T/cpulist
+	for n in node*
+	do
+		f="$n/cpu*/cache/$i/shared_cpu_list"
+		if ! cat $f > $T/msg 2>&1
+		then
+			sed -e 's/^/# /' < $T/msg
+			exit 0
+		fi
+		cat $f >> $T/cpulist
+	done
+	if grep -q '[-,]' $T/cpulist
+	then
+		if test -z "$firstshared"
+		then
+			firstshared="$i"
+		fi
+	fi
+done
+if test -z "$firstshared"
+then
+	splitindex="`echo $indexlist | sed -e 's/ .*$//'`"
+else
+	splitindex="$firstshared"
+fi
+nodenum=0
+for n in node*
+do
+	cat $n/cpu*/cache/$splitindex/shared_cpu_list | sort -u -k1n |
+	awk -v nodenum="$nodenum" '
+	BEGIN {
+		idx = 0;
+	}
+
+	{
+		nlists = split($0, cpulists, ",");
+		for (i = 1; i <= nlists; i++) {
+			listsize = split(cpulists[i], cpus, "-");
+			if (listsize == 1)
+				cpus[2] = cpus[1];
+			for (j = cpus[1]; j <= cpus[2]; j++) {
+				print "cpu[" nodenum "][" idx "] = " j ";";
+				idx++;
+			}
+		}
+	}
+
+	END {
+		print "nodecpus[" nodenum "] = " idx ";";
+	}'
+	nodenum=`expr $nodenum + 1`
+done
+echo "numnodes = $nodenum;"
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh b/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh
new file mode 100755
index 0000000..20c7c53
--- /dev/null
+++ b/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh
@@ -0,0 +1,88 @@
+#!/bin/sh
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Create an awk script that takes as input numbers of CPUs and outputs
+# lists of CPUs, one per line in both cases.
+#
+# Usage: kvm-get-cpus-script.sh /path/to/cpu/arrays /path/to/put/script [ /path/to/state ]
+#
+# The CPU arrays are output by kvm-assign-cpus.sh, and are valid awk
+# statements initializing the variables describing the system's topology.
+#
+# The optional state is input by this script (if the file exists and is
+# non-empty), and can also be output by this script.
+
+cpuarrays="${1-/sys/devices/system/node}"
+scriptfile="${2}"
+statefile="${3}"
+
+if ! test -f "$cpuarrays"
+then
+	echo "File not found: $cpuarrays" 1>&2
+	exit 1
+fi
+scriptdir="`dirname "$scriptfile"`"
+if ! test -d "$scriptdir" || ! test -x "$scriptdir" || ! test -w "$scriptdir"
+then
+	echo "Directory not usable for script output: $scriptdir"
+	exit 1
+fi
+
+cat << '___EOF___' > "$scriptfile"
+BEGIN {
+___EOF___
+cat "$cpuarrays" >> "$scriptfile"
+if test -r "$statefile"
+then
+	cat "$statefile" >> "$scriptfile"
+fi
+cat << '___EOF___' >> "$scriptfile"
+}
+
+# Do we have the system architecture to guide CPU affinity?
+function gotcpus()
+{
+	return numnodes != "";
+}
+
+# Return a comma-separated list of the next n CPUs.
+function nextcpus(n,  i, s)
+{
+	for (i = 0; i < n; i++) {
+		if (nodecpus[curnode] == "")
+			curnode = 0;
+		if (cpu[curnode][curcpu[curnode]] == "")
+			curcpu[curnode] = 0;
+		if (s != "")
+			s = s ",";
+		s = s cpu[curnode][curcpu[curnode]];
+		curcpu[curnode]++;
+		curnode++
+	}
+	return s;
+}
+
+# Dump out the current node/CPU state so that a later invocation of this
+# script can continue where this one left off.  Of course, this only works
+# when a state file was specified and where there was valid sysfs state.
+# Returns 1 if the state was dumped, 0 otherwise.
+#
+# Dumping the state for one system configuration and loading it into
+# another isn't likely to do what you want, whatever that might be.
+function dumpcpustate(  i, fn)
+{
+___EOF___
+echo '	fn = "'"$statefile"'";' >> $scriptfile
+cat << '___EOF___' >> "$scriptfile"
+	if (fn != "" && gotcpus()) {
+		print "curnode = " curnode ";" > fn;
+		for (i = 0; i < numnodes; i++)
+			if (curcpu[i] != "")
+				print "curcpu[" i "] = " curcpu[i] ";" >> fn;
+		return 1;
+	}
+	if (fn != "")
+		print "# No CPU state to dump." > fn;
+	return 0;
+}
+___EOF___
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh b/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh
index f3a7a5e..db2c0e2 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh
@@ -25,7 +25,7 @@
 	echo "$configfile -------"
 else
 	title="$configfile ------- $ncs acquisitions/releases"
-	dur=`sed -e 's/^.* locktorture.shutdown_secs=//' -e 's/ .*$//' < $i/qemu-cmd 2> /dev/null`
+	dur=`grep -v '^#' $i/qemu-cmd | sed -e 's/^.* locktorture.shutdown_secs=//' -e 's/ .*$//' 2> /dev/null`
 	if test -z "$dur"
 	then
 		:
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh b/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh
index 671bfee..3afa5c6 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh
@@ -25,7 +25,7 @@
 then
 	echo "$configfile ------- "
 else
-	dur="`sed -e 's/^.* scftorture.shutdown_secs=//' -e 's/ .*$//' < $i/qemu-cmd 2> /dev/null`"
+	dur="`grep -v '^#' $i/qemu-cmd | sed -e 's/^.* scftorture.shutdown_secs=//' -e 's/ .*$//' 2> /dev/null`"
 	if test -z "$dur"
 	then
 		rate=""
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh b/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh
index e01b31b..0a54199 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-recheck.sh
@@ -74,7 +74,10 @@
 	done
 	if test -f "$rd/kcsan.sum"
 	then
-		if grep -q CONFIG_KCSAN=y $T
+		if ! test -f $T
+		then
+			:
+		elif grep -q CONFIG_KCSAN=y $T
 		then
 			echo "Compiler or architecture does not support KCSAN!"
 			echo Did you forget to switch your compiler with '--kmake-arg CC=<cc-that-supports-kcsan>'?
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh b/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh
new file mode 100755
index 0000000..014ce68
--- /dev/null
+++ b/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Periodically scan a directory tree to prevent files from being reaped
+# by systemd and friends on long runs.
+#
+# Usage: kvm-remote-noreap.sh pathname
+#
+# Copyright (C) 2021 Facebook, Inc.
+#
+# Authors: Paul E. McKenney <paulmck@kernel.org>
+
+pathname="$1"
+if test "$pathname" = ""
+then
+	echo Usage: kvm-remote-noreap.sh pathname
+	exit 1
+fi
+if ! test -d "$pathname"
+then
+	echo  Usage: kvm-remote-noreap.sh pathname
+	echo "       pathname must be a directory."
+	exit 2
+fi
+
+while test -d "$pathname"
+do
+	find "$pathname" -type f -exec touch -c {} \; > /dev/null 2>&1
+	sleep 30
+done
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-remote.sh b/tools/testing/selftests/rcutorture/bin/kvm-remote.sh
index 79e680e..03126eb 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-remote.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-remote.sh
@@ -124,10 +124,12 @@
 	n = $1;
 	sub(/\./, "", n);
 	fn = dest "/kvm-remote-" n ".sh"
+	print "kvm-remote-noreap.sh " rundir " &" > fn;
 	scenarios = "";
 	for (i = 2; i <= NF; i++)
 		scenarios = scenarios " " $i;
-	print "kvm-test-1-run-batch.sh" scenarios > fn;
+	print "kvm-test-1-run-batch.sh" scenarios >> fn;
+	print "sync" >> fn;
 	print "rm " rundir "/remote.run" >> fn;
 }'
 chmod +x $T/bin/kvm-remote-*.sh
@@ -172,11 +174,20 @@
 	do
 		ssh $1 "test -f \"$2\""
 		ret=$?
-		if test "$ret" -ne 255
+		if test "$ret" -eq 255
 		then
+			echo " ---" ssh failure to $1 checking for file $2, retry after $sleeptime seconds. `date`
+		elif test "$ret" -eq 0
+		then
+			return 0
+		elif test "$ret" -eq 1
+		then
+			echo " ---" File \"$2\" not found: ssh $1 test -f \"$2\"
+			return 1
+		else
+			echo " ---" Exit code $ret: ssh $1 test -f \"$2\", retry after $sleeptime seconds. `date`
 			return $ret
 		fi
-		echo " ---" ssh failure to $1 checking for file $2, retry after $sleeptime seconds. `date`
 		sleep $sleeptime
 	done
 }
@@ -242,7 +253,8 @@
 	do
 		sleep 30
 	done
-	( cd "$oldrun"; ssh $i "cd $rundir; tar -czf - kvm-remote-*.sh.out */console.log */kvm-test-1-run*.sh.out */qemu_pid */qemu-retval; rm -rf $T > /dev/null 2>&1" | tar -xzf - )
+	echo " ---" Collecting results from $i `date`
+	( cd "$oldrun"; ssh $i "cd $rundir; tar -czf - kvm-remote-*.sh.out */console.log */kvm-test-1-run*.sh.out */qemu[_-]pid */qemu-retval */qemu-affinity; rm -rf $T > /dev/null 2>&1" | tar -xzf - )
 done
 
 ( kvm-end-run-stats.sh "$oldrun" "$starttime"; echo $? > $T/exitcode ) | tee -a "$oldrun/remote-log"
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh
index 7ea0809e..1e29d65 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh
@@ -50,10 +50,34 @@
 echo ---- System running test: `uname -a`
 echo ---- Starting kernels. `date` | tee -a log
 $TORTURE_JITTER_START
+kvm-assign-cpus.sh /sys/devices/system/node > $T/cpuarray.awk
 for i in "$@"
 do
 	echo ---- System running test: `uname -a` > $i/kvm-test-1-run-qemu.sh.out
 	echo > $i/kvm-test-1-run-qemu.sh.out
+	export TORTURE_AFFINITY=
+	kvm-get-cpus-script.sh $T/cpuarray.awk $T/cpubatches.awk $T/cpustate
+	cat << '	___EOF___' >> $T/cpubatches.awk
+	END {
+		affinitylist = "";
+		if (!gotcpus()) {
+			print "echo No CPU-affinity information, so no taskset command.";
+		} else if (cpu_count !~ /^[0-9][0-9]*$/) {
+			print "echo " scenario ": Bogus number of CPUs (old qemu-cmd?), so no taskset command.";
+		} else {
+			affinitylist = nextcpus(cpu_count);
+			if (!(affinitylist ~ /^[0-9,-][0-9,-]*$/))
+				print "echo " scenario ": Bogus CPU-affinity information, so no taskset command.";
+			else if (!dumpcpustate())
+				print "echo " scenario ": Could not dump state, so no taskset command.";
+			else
+				print "export TORTURE_AFFINITY=" affinitylist;
+		}
+	}
+	___EOF___
+	cpu_count="`grep '# TORTURE_CPU_COUNT=' $i/qemu-cmd | sed -e 's/^.*=//'`"
+	affinity_export="`awk -f $T/cpubatches.awk -v cpu_count="$cpu_count" -v scenario=$i < /dev/null`"
+	$affinity_export
 	kvm-test-1-run-qemu.sh $i >> $i/kvm-test-1-run-qemu.sh.out 2>&1 &
 done
 for i in $runfiles
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh
index 5b1aa2a..4428058 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh
@@ -39,27 +39,34 @@
 grep '^#' $resdir/qemu-cmd | sed -e 's/^# //' > $T/qemu-cmd-settings
 . $T/qemu-cmd-settings
 
-# Decorate qemu-cmd with redirection, backgrounding, and PID capture
-sed -e 's/$/ 2>\&1 \&/' < $resdir/qemu-cmd > $T/qemu-cmd
-echo 'echo $! > $resdir/qemu_pid' >> $T/qemu-cmd
+# Decorate qemu-cmd with affinity, redirection, backgrounding, and PID capture
+taskset_command=
+if test -n "$TORTURE_AFFINITY"
+then
+	taskset_command="taskset -c $TORTURE_AFFINITY "
+fi
+sed -e 's/^[^#].*$/'"$taskset_command"'& 2>\&1 \&/' < $resdir/qemu-cmd > $T/qemu-cmd
+echo 'qemu_pid=$!' >> $T/qemu-cmd
+echo 'echo $qemu_pid > $resdir/qemu-pid' >> $T/qemu-cmd
+echo 'taskset -c -p $qemu_pid > $resdir/qemu-affinity' >> $T/qemu-cmd
 
 # In case qemu refuses to run...
 echo "NOTE: $QEMU either did not run or was interactive" > $resdir/console.log
 
 # Attempt to run qemu
 kstarttime=`gawk 'BEGIN { print systime() }' < /dev/null`
-( . $T/qemu-cmd; wait `cat  $resdir/qemu_pid`; echo $? > $resdir/qemu-retval ) &
+( . $T/qemu-cmd; wait `cat  $resdir/qemu-pid`; echo $? > $resdir/qemu-retval ) &
 commandcompleted=0
 if test -z "$TORTURE_KCONFIG_GDB_ARG"
 then
 	sleep 10 # Give qemu's pid a chance to reach the file
-	if test -s "$resdir/qemu_pid"
+	if test -s "$resdir/qemu-pid"
 	then
-		qemu_pid=`cat "$resdir/qemu_pid"`
-		echo Monitoring qemu job at pid $qemu_pid
+		qemu_pid=`cat "$resdir/qemu-pid"`
+		echo Monitoring qemu job at pid $qemu_pid `date`
 	else
 		qemu_pid=""
-		echo Monitoring qemu job at yet-as-unknown pid
+		echo Monitoring qemu job at yet-as-unknown pid `date`
 	fi
 fi
 if test -n "$TORTURE_KCONFIG_GDB_ARG"
@@ -82,9 +89,9 @@
 fi
 while :
 do
-	if test -z "$qemu_pid" -a -s "$resdir/qemu_pid"
+	if test -z "$qemu_pid" && test -s "$resdir/qemu-pid"
 	then
-		qemu_pid=`cat "$resdir/qemu_pid"`
+		qemu_pid=`cat "$resdir/qemu-pid"`
 	fi
 	kruntime=`gawk 'BEGIN { print systime() - '"$kstarttime"' }' < /dev/null`
 	if test -z "$qemu_pid" || kill -0 "$qemu_pid" > /dev/null 2>&1
@@ -115,22 +122,22 @@
 		break
 	fi
 done
-if test -z "$qemu_pid" -a -s "$resdir/qemu_pid"
+if test -z "$qemu_pid" && test -s "$resdir/qemu-pid"
 then
-	qemu_pid=`cat "$resdir/qemu_pid"`
+	qemu_pid=`cat "$resdir/qemu-pid"`
 fi
-if test $commandcompleted -eq 0 -a -n "$qemu_pid"
+if test $commandcompleted -eq 0 && test -n "$qemu_pid"
 then
 	if ! test -f "$resdir/../STOP.1"
 	then
-		echo Grace period for qemu job at pid $qemu_pid
+		echo Grace period for qemu job at pid $qemu_pid `date`
 	fi
 	oldline="`tail $resdir/console.log`"
 	while :
 	do
 		if test -f "$resdir/../STOP.1"
 		then
-			echo "PID $qemu_pid killed due to run STOP.1 request" >> $resdir/Warnings 2>&1
+			echo "PID $qemu_pid killed due to run STOP.1 request `date`" >> $resdir/Warnings 2>&1
 			kill -KILL $qemu_pid
 			break
 		fi
@@ -152,13 +159,17 @@
 		then
 			last_ts=0
 		fi
-		if test "$newline" != "$oldline" -a "$last_ts" -lt $((seconds + $TORTURE_SHUTDOWN_GRACE))
+		if test "$newline" != "$oldline" && test "$last_ts" -lt $((seconds + $TORTURE_SHUTDOWN_GRACE)) && test "$last_ts" -gt "$TORTURE_SHUTDOWN_GRACE"
 		then
 			must_continue=yes
+			if test $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
+			then
+				echo Continuing at console.log time $last_ts \"`tail -n 1 $resdir/console.log`\" `date`
+			fi
 		fi
-		if test $must_continue = no -a $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
+		if test $must_continue = no && test $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
 		then
-			echo "!!! PID $qemu_pid hung at $kruntime vs. $seconds seconds" >> $resdir/Warnings 2>&1
+			echo "!!! PID $qemu_pid hung at $kruntime vs. $seconds seconds `date`" >> $resdir/Warnings 2>&1
 			kill -KILL $qemu_pid
 			break
 		fi
@@ -172,5 +183,3 @@
 
 # Tell the script that this run is done.
 rm -f $resdir/build.run
-
-parse-console.sh $resdir/console.log $title
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh
index 420ed5c..f4c8055 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh
@@ -205,6 +205,7 @@
 echo "# TORTURE_JITTER_START=\"$TORTURE_JITTER_START\"" >> $resdir/qemu-cmd
 echo "# TORTURE_JITTER_STOP=\"$TORTURE_JITTER_STOP\"" >> $resdir/qemu-cmd
 echo "# TORTURE_TRUST_MAKE=\"$TORTURE_TRUST_MAKE\"; export TORTURE_TRUST_MAKE" >> $resdir/qemu-cmd
+echo "# TORTURE_CPU_COUNT=$cpu_count" >> $resdir/qemu-cmd
 
 if test -n "$TORTURE_BUILDONLY"
 then
@@ -214,3 +215,4 @@
 fi
 
 kvm-test-1-run-qemu.sh $resdir
+parse-console.sh $resdir/console.log $title
diff --git a/tools/testing/selftests/rcutorture/bin/kvm.sh b/tools/testing/selftests/rcutorture/bin/kvm.sh
index b4ac4ee3..f442d84 100755
--- a/tools/testing/selftests/rcutorture/bin/kvm.sh
+++ b/tools/testing/selftests/rcutorture/bin/kvm.sh
@@ -430,17 +430,10 @@
 	git diff HEAD >> $resdir/$ds/testid.txt
 fi
 ___EOF___
-awk < $T/cfgcpu.pack \
-	-v TORTURE_BUILDONLY="$TORTURE_BUILDONLY" \
-	-v CONFIGDIR="$CONFIGFRAG/" \
-	-v KVM="$KVM" \
-	-v ncpus=$cpus \
-	-v jitter="$jitter" \
-	-v rd=$resdir/$ds/ \
-	-v dur=$dur \
-	-v TORTURE_QEMU_ARG="$TORTURE_QEMU_ARG" \
-	-v TORTURE_BOOTARGS="$TORTURE_BOOTARGS" \
-'BEGIN {
+kvm-assign-cpus.sh /sys/devices/system/node > $T/cpuarray.awk
+kvm-get-cpus-script.sh $T/cpuarray.awk $T/dumpbatches.awk
+cat << '___EOF___' >> $T/dumpbatches.awk
+BEGIN {
 	i = 0;
 }
 
@@ -451,7 +444,7 @@
 }
 
 # Dump out the scripting required to run one test batch.
-function dump(first, pastlast, batchnum)
+function dump(first, pastlast, batchnum,  affinitylist)
 {
 	print "echo ----Start batch " batchnum ": `date` | tee -a " rd "log";
 	print "needqemurun="
@@ -483,6 +476,14 @@
 		print "echo ", cfr[jn], cpusr[jn] ovf ": Starting build. `date` | tee -a " rd "log";
 		print "mkdir " rd cfr[jn] " || :";
 		print "touch " builddir ".wait";
+		affinitylist = "";
+		if (gotcpus()) {
+			affinitylist = nextcpus(cpusr[jn]);
+		}
+		if (affinitylist ~ /^[0-9,-][0-9,-]*$/)
+			print "export TORTURE_AFFINITY=" affinitylist;
+		else
+			print "export TORTURE_AFFINITY=";
 		print "kvm-test-1-run.sh " CONFIGDIR cf[j], rd cfr[jn], dur " \"" TORTURE_QEMU_ARG "\" \"" TORTURE_BOOTARGS "\" > " rd cfr[jn]  "/kvm-test-1-run.sh.out 2>&1 &"
 		print "echo ", cfr[jn], cpusr[jn] ovf ": Waiting for build to complete. `date` | tee -a " rd "log";
 		print "while test -f " builddir ".wait"
@@ -560,7 +561,19 @@
 	# Dump the last batch.
 	if (ncpus != 0)
 		dump(first, i, batchnum);
-}' >> $T/script
+}
+___EOF___
+awk < $T/cfgcpu.pack \
+	-v TORTURE_BUILDONLY="$TORTURE_BUILDONLY" \
+	-v CONFIGDIR="$CONFIGFRAG/" \
+	-v KVM="$KVM" \
+	-v ncpus=$cpus \
+	-v jitter="$jitter" \
+	-v rd=$resdir/$ds/ \
+	-v dur=$dur \
+	-v TORTURE_QEMU_ARG="$TORTURE_QEMU_ARG" \
+	-v TORTURE_BOOTARGS="$TORTURE_BOOTARGS" \
+	-f $T/dumpbatches.awk >> $T/script
 echo kvm-end-run-stats.sh "$resdir/$ds" "$starttime" >> $T/script
 
 # Extract the tests and their batches from the script.
diff --git a/tools/testing/selftests/rcutorture/bin/torture.sh b/tools/testing/selftests/rcutorture/bin/torture.sh
index 53ec7c0..363f560 100755
--- a/tools/testing/selftests/rcutorture/bin/torture.sh
+++ b/tools/testing/selftests/rcutorture/bin/torture.sh
@@ -53,6 +53,7 @@
 do_kvfree=yes
 do_kasan=yes
 do_kcsan=no
+do_clocksourcewd=yes
 
 # doyesno - Helper function for yes/no arguments
 function doyesno () {
@@ -72,6 +73,7 @@
 	echo "       --configs-scftorture \"config-file list w/ repeat factor (2*CFLIST)\""
 	echo "       --doall"
 	echo "       --doallmodconfig / --do-no-allmodconfig"
+	echo "       --do-clocksourcewd / --do-no-clocksourcewd"
 	echo "       --do-kasan / --do-no-kasan"
 	echo "       --do-kcsan / --do-no-kcsan"
 	echo "       --do-kvfree / --do-no-kvfree"
@@ -109,7 +111,7 @@
 		configs_scftorture="$configs_scftorture $2"
 		shift
 		;;
-	--doall)
+	--do-all|--doall)
 		do_allmodconfig=yes
 		do_rcutorture=yes
 		do_locktorture=yes
@@ -119,10 +121,14 @@
 		do_kvfree=yes
 		do_kasan=yes
 		do_kcsan=yes
+		do_clocksourcewd=yes
 		;;
 	--do-allmodconfig|--do-no-allmodconfig)
 		do_allmodconfig=`doyesno "$1" --do-allmodconfig`
 		;;
+	--do-clocksourcewd|--do-no-clocksourcewd)
+		do_clocksourcewd=`doyesno "$1" --do-clocksourcewd`
+		;;
 	--do-kasan|--do-no-kasan)
 		do_kasan=`doyesno "$1" --do-kasan`
 		;;
@@ -135,7 +141,7 @@
 	--do-locktorture|--do-no-locktorture)
 		do_locktorture=`doyesno "$1" --do-locktorture`
 		;;
-	--do-none)
+	--do-none|--donone)
 		do_allmodconfig=no
 		do_rcutorture=no
 		do_locktorture=no
@@ -145,6 +151,7 @@
 		do_kvfree=no
 		do_kasan=no
 		do_kcsan=no
+		do_clocksourcewd=no
 		;;
 	--do-rcuscale|--do-no-rcuscale)
 		do_rcuscale=`doyesno "$1" --do-rcuscale`
@@ -279,9 +286,9 @@
 #	torture_bootargs="[ kernel boot arguments ]"
 #	torture_set flavor [ kvm.sh arguments ]
 #
-# Note that "flavor" is an arbitrary string.  Supply --torture if needed.
-# Note that quoting is problematic.  So on the command line, pass multiple
-# values with multiple kvm.sh argument instances.
+# Note that "flavor" is an arbitrary string that does not affect kvm.sh
+# in any way.  So also supply --torture if you need something other than
+# the default.
 function torture_set {
 	local cur_kcsan_kmake_args=
 	local kcsan_kmake_tag=
@@ -377,6 +384,22 @@
 	torture_set "rcuscale-kvfree" tools/testing/selftests/rcutorture/bin/kvm.sh --torture rcuscale --allcpus --duration 10 --kconfig "CONFIG_NR_CPUS=$HALF_ALLOTED_CPUS" --memory 1G --trust-make
 fi
 
+if test "$do_clocksourcewd" = "yes"
+then
+	torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000"
+	torture_set "clocksourcewd-1" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --kconfig "CONFIG_TEST_CLOCKSOURCE_WATCHDOG=y" --trust-make
+
+	torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000 clocksource.max_cswd_read_retries=1"
+	torture_set "clocksourcewd-2" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --kconfig "CONFIG_TEST_CLOCKSOURCE_WATCHDOG=y" --trust-make
+
+	# In case our work is already done...
+	if test "$do_rcutorture" != "yes"
+	then
+		torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000"
+		torture_set "clocksourcewd-3" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --trust-make
+	fi
+fi
+
 echo " --- " $scriptname $args
 echo " --- " Done `date` | tee -a $T/log
 ret=0
@@ -395,6 +418,10 @@
 	nfailures="`wc -l "$T/failures" | awk '{ print $1 }'`"
 	ret=2
 fi
+if test "$do_kcsan" = "yes"
+then
+	TORTURE_KCONFIG_KCSAN_ARG=1 tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh tools/testing/selftests/rcutorture/res/$ds > tools/testing/selftests/rcutorture/res/$ds/kcsan.sum
+fi
 echo Started at $startdate, ended at `date`, duration `get_starttime_duration $starttime`. | tee -a $T/log
 echo Summary: Successes: $nsuccesses Failures: $nfailures. | tee -a $T/log
 tdir="`cat $T/successes $T/failures | head -1 | awk '{ print $NF }' | sed -e 's,/[^/]\+/*$,,'`"
diff --git a/tools/testing/selftests/rcutorture/configs/rcu/RUDE01 b/tools/testing/selftests/rcutorture/configs/rcu/RUDE01
index bafe94c..3ca1124 100644
--- a/tools/testing/selftests/rcutorture/configs/rcu/RUDE01
+++ b/tools/testing/selftests/rcutorture/configs/rcu/RUDE01
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_HOTPLUG_CPU=y
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
diff --git a/tools/testing/selftests/rcutorture/configs/rcu/TASKS01 b/tools/testing/selftests/rcutorture/configs/rcu/TASKS01
index bafe94c..3ca1124 100644
--- a/tools/testing/selftests/rcutorture/configs/rcu/TASKS01
+++ b/tools/testing/selftests/rcutorture/configs/rcu/TASKS01
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_HOTPLUG_CPU=y
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
diff --git a/tools/testing/selftests/rcutorture/configs/rcu/TASKS03 b/tools/testing/selftests/rcutorture/configs/rcu/TASKS03
index ea43990..dc02083 100644
--- a/tools/testing/selftests/rcutorture/configs/rcu/TASKS03
+++ b/tools/testing/selftests/rcutorture/configs/rcu/TASKS03
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
 CONFIG_PREEMPT=y