| Message ID | 20120315164839.GA1657@linux.vnet.ibm.com |
|---|---|
| State | New |
* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote: > The rcu_barrier() primitive interrupts each and every CPU, registering > a callback on every CPU. Once all of these callbacks have been invoked, > rcu_barrier() knows that every callback that was registered before > the call to rcu_barrier() has also been invoked. > > However, there is no point in registering a callback on a CPU that > currently has no callbacks, most especially if that CPU is in a > deep idle state. This commit therefore makes rcu_barrier() avoid > interrupting CPUs that have no callbacks. Doing this requires reworking > the handling of orphaned callbacks, otherwise callbacks could slip through > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had > not yet interrupted to a CPU that rcu_barrier() had already interrupted. > This reworking was needed anyway to take a first step towards weaning > RCU from the CPU_DYING notifier's use of stop_cpu(). Quoting Documentation/RCU/rcubarrier.txt: "We instead need the rcu_barrier() primitive. This primitive is similar to synchronize_rcu(), but instead of waiting solely for a grace period to elapse, it also waits for all outstanding RCU callbacks to complete. Pseudo-code using rcu_barrier() is as follows:" The patch you propose seems like a good approach to make rcu_barrier less disruptive, but everyone need to be aware that rcu_barrier() would quit having the side-effect of doing the equivalent of "synchronize_rcu()" from now on: within this new approach, in the case where there are no pending callbacks, rcu_barrier() could, AFAIU, return without waiting for the current grace period to complete. Any use of rcu_barrier() that would assume that a synchronize_rcu() is implicit with the rcu_barrier() execution would be a bug anyway, but those might only show up after this patch is applied. I would therefore recommend to audit all rcu_barrier() users to ensure none is expecting rcu_barrier to act as a synchronize_rcu before pushing this change. Thanks, Mathieu > > Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org> > Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com> > > diff --git a/kernel/rcutree.c b/kernel/rcutree.c > index 403306b..8269656 100644 > --- a/kernel/rcutree.c > +++ b/kernel/rcutree.c > @@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS]; > .gpnum = -300, \ > .completed = -300, \ > .onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \ > + .orphan_nxttail = &structname##_state.orphan_nxtlist, \ > + .orphan_donetail = &structname##_state.orphan_donelist, \ > .fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \ > .n_force_qs = 0, \ > .n_force_qs_ngp = 0, \ > @@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp); > unsigned long rcutorture_testseq; > unsigned long rcutorture_vernum; > > +/* State information for rcu_barrier() and friends. */ > + > +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL}; > +static atomic_t rcu_barrier_cpu_count; > +static DEFINE_MUTEX(rcu_barrier_mutex); > +static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq); > + > /* > * Return true if an RCU grace period is in progress. The ACCESS_ONCE()s > * permit this function to be invoked without holding the root rcu_node > @@ -1311,7 +1320,60 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp) > #ifdef CONFIG_HOTPLUG_CPU > > /* > - * Move a dying CPU's RCU callbacks to online CPU's callback list. 
> + * Adopt the RCU callbacks from the specified rcu_state structure's > + * orphanage. The caller must hold the ->onofflock. > + */ > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) > +{ > + int i; > + struct rcu_data *rdp = __this_cpu_ptr(rsp->rda); > + > + /* > + * If there is an rcu_barrier() operation in progress, then > + * only the task doing that operation is permitted to adopt > + * callbacks. To do otherwise breaks rcu_barrier() and friends > + * by causing them to fail to wait for the callbacks in the > + * orphanage. > + */ > + if (rsp->rcu_barrier_in_progress && > + rsp->rcu_barrier_in_progress != current) > + return; > + > + /* Do the accounting first. */ > + rdp->qlen_lazy += rsp->qlen_lazy; > + rdp->qlen += rsp->qlen; > + rdp->n_cbs_adopted += rsp->qlen; > + rsp->qlen_lazy = 0; > + rsp->qlen = 0; > + > + /* > + * We do not need a memory barrier here because the only way we > + * can get here if there is an rcu_barrier() in flight is if > + * we are the task doing the rcu_barrier(). > + */ > + > + /* First adopt the ready-to-invoke callbacks. */ > + if (rsp->orphan_donelist != NULL) { > + *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL]; > + *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist; > + for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--) > + if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL]) > + rdp->nxttail[i] = rsp->orphan_donetail; > + rsp->orphan_donelist = NULL; > + rsp->orphan_donetail = &rsp->orphan_donelist; > + } > + > + /* And then adopt the callbacks that still need a grace period. */ > + if (rsp->orphan_nxtlist != NULL) { > + *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist; > + rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail; > + rsp->orphan_nxtlist = NULL; > + rsp->orphan_nxttail = &rsp->orphan_nxtlist; > + } > +} > + > +/* > + * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage. > * Also record a quiescent state for this CPU for the current grace period. > * Synchronization and interrupt disabling are not required because > * this function executes in stop_machine() context. Therefore, cleanup > @@ -1325,64 +1387,67 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp) > static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) > { > int i; > + unsigned long flags; > unsigned long mask; > - int receive_cpu = cpumask_any(cpu_online_mask); > + bool orphaned = 0; > struct rcu_data *rdp = this_cpu_ptr(rsp->rda); > - struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu); > RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */ > > - /* First, adjust the counts. */ > + /* Move the callbacks to the orphanage under ->onofflock protection. */ > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > + > + /* First adjust the counts. */ > if (rdp->nxtlist != NULL) { > - receive_rdp->qlen_lazy += rdp->qlen_lazy; > - receive_rdp->qlen += rdp->qlen; > + rsp->qlen_lazy += rdp->qlen_lazy; > + rsp->qlen += rdp->qlen; > + rdp->n_cbs_orphaned += rdp->qlen; > rdp->qlen_lazy = 0; > rdp->qlen = 0; > + orphaned = 1; > } > > /* > - * Next, move ready-to-invoke callbacks to be invoked on some > - * other CPU. These will not be required to pass through another > - * grace period: They are done, regardless of CPU. > + * Next, move those callbacks still needing a grace period to > + * the orphanage, where some other CPU will pick them up. > + * Some of the callbacks might have gone partway through a grace > + * period, but that is too bad. 
They get to start over because we > + * cannot assume that grace periods are synchronized across CPUs. > + * We don't bother updating the ->nxttail[] array yet, instead > + * we just reset the whole thing later on. > */ > - if (rdp->nxtlist != NULL && > - rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) { > - struct rcu_head *oldhead; > - struct rcu_head **oldtail; > - struct rcu_head **newtail; > - > - oldhead = rdp->nxtlist; > - oldtail = receive_rdp->nxttail[RCU_DONE_TAIL]; > - rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL]; > - *rdp->nxttail[RCU_DONE_TAIL] = *oldtail; > - *receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead; > - newtail = rdp->nxttail[RCU_DONE_TAIL]; > - for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) { > - if (receive_rdp->nxttail[i] == oldtail) > - receive_rdp->nxttail[i] = newtail; > - if (rdp->nxttail[i] == newtail) > - rdp->nxttail[i] = &rdp->nxtlist; > - } > + if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) { > + *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL]; > + rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL]; > + *rdp->nxttail[RCU_DONE_TAIL] = NULL; > } > > /* > - * Finally, put the rest of the callbacks at the end of the list. > - * The ones that made it partway through get to start over: We > - * cannot assume that grace periods are synchronized across CPUs. > - * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but > - * this does not seem compelling. Not yet, anyway.) > + * Then move the ready-to-invoke callbacks to the orphanage, > + * where some other CPU will pick them up. These will not be > + * required to pass though another grace period: They are done. > */ > if (rdp->nxtlist != NULL) { > - *receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist; > - receive_rdp->nxttail[RCU_NEXT_TAIL] = > - rdp->nxttail[RCU_NEXT_TAIL]; > - receive_rdp->n_cbs_adopted += rdp->qlen; > - rdp->n_cbs_orphaned += rdp->qlen; > - > - rdp->nxtlist = NULL; > - for (i = 0; i < RCU_NEXT_SIZE; i++) > - rdp->nxttail[i] = &rdp->nxtlist; > + *rsp->orphan_donetail = rdp->nxtlist; > + rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL]; > } > > + /* Finally, initialize the rcu_data structure's list to empty. */ > + rdp->nxtlist = NULL; > + for (i = 0; i < RCU_NEXT_SIZE; i++) > + rdp->nxttail[i] = &rdp->nxtlist; > + > + /* > + * Wait up the rcu_barrier() task if there is one and if we > + * actually sent anything to the orphanage. Except that we > + * must delay the wakeup until ->onofflock is released to > + * avoid deadlock. > + */ > + if (!rsp->rcu_barrier_in_progress) > + orphaned = 0; > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > + if (orphaned) > + wake_up(&rcu_barrier_wq); > + > /* > * Record a quiescent state for the dying CPU. This is safe > * only because we have already cleared out the callbacks. > @@ -1415,11 +1480,14 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) > rcu_stop_cpu_kthread(cpu); > rcu_node_kthread_setaffinity(rnp, -1); > > - /* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */ > + /* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */ > > /* Exclude any attempts to start a new grace period. */ > raw_spin_lock_irqsave(&rsp->onofflock, flags); > > + /* Collect the dead CPU's callbacks. */ > + rcu_adopt_orphan_cbs(rsp); > + > /* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */ > mask = rdp->grpmask; /* rnp->grplo is constant. 
*/ > do { > @@ -1456,6 +1524,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) > > #else /* #ifdef CONFIG_HOTPLUG_CPU */ > > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) > +{ > +} > + > static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) > { > } > @@ -1524,9 +1596,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) > rcu_is_callbacks_kthread()); > > /* Update count, and requeue any remaining callbacks. */ > - rdp->qlen_lazy -= count_lazy; > - rdp->qlen -= count; > - rdp->n_cbs_invoked += count; > if (list != NULL) { > *tail = rdp->nxtlist; > rdp->nxtlist = list; > @@ -1536,6 +1605,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) > else > break; > } > + smp_mb(); /* List handling before counting for rcu_barrier(). */ > + rdp->qlen_lazy -= count_lazy; > + rdp->qlen -= count; > + rdp->n_cbs_invoked += count; > > /* Reinstate batch limit if we have worked down the excess. */ > if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark) > @@ -1824,13 +1897,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), > rdp = this_cpu_ptr(rsp->rda); > > /* Add the callback to our list. */ > - *rdp->nxttail[RCU_NEXT_TAIL] = head; > - rdp->nxttail[RCU_NEXT_TAIL] = &head->next; > rdp->qlen++; > if (lazy) > rdp->qlen_lazy++; > else > rcu_idle_count_callbacks_posted(); > + smp_mb(); /* Count before adding callback for rcu_barrier(). */ > + *rdp->nxttail[RCU_NEXT_TAIL] = head; > + rdp->nxttail[RCU_NEXT_TAIL] = &head->next; > > if (__is_kfree_rcu_offset((unsigned long)func)) > trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func, > @@ -2169,15 +2243,10 @@ static int rcu_cpu_has_callbacks(int cpu) > rcu_preempt_cpu_has_callbacks(cpu); > } > > -static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL}; > -static atomic_t rcu_barrier_cpu_count; > -static DEFINE_MUTEX(rcu_barrier_mutex); > -static struct completion rcu_barrier_completion; > - > static void rcu_barrier_callback(struct rcu_head *notused) > { > if (atomic_dec_and_test(&rcu_barrier_cpu_count)) > - complete(&rcu_barrier_completion); > + wake_up(&rcu_barrier_wq); > } > > /* > @@ -2203,27 +2272,121 @@ static void _rcu_barrier(struct rcu_state *rsp, > void (*call_rcu_func)(struct rcu_head *head, > void (*func)(struct rcu_head *head))) > { > - BUG_ON(in_interrupt()); > + int cpu; > + unsigned long flags; > + struct rcu_data *rdp; > + struct rcu_head rh; > + > + init_rcu_head_on_stack(&rh); > + > /* Take mutex to serialize concurrent rcu_barrier() requests. */ > mutex_lock(&rcu_barrier_mutex); > - init_completion(&rcu_barrier_completion); > + > + smp_mb(); /* Prevent any prior operations from leaking in. */ > + > /* > - * Initialize rcu_barrier_cpu_count to 1, then invoke > - * rcu_barrier_func() on each CPU, so that each CPU also has > - * incremented rcu_barrier_cpu_count. Only then is it safe to > - * decrement rcu_barrier_cpu_count -- otherwise the first CPU > - * might complete its grace period before all of the other CPUs > - * did their increment, causing this function to return too > - * early. Note that on_each_cpu() disables irqs, which prevents > - * any CPUs from coming online or going offline until each online > - * CPU has queued its RCU-barrier callback. > + * Initialize the count to one rather than to zero in order to > + * avoid a too-soon return to zero in case of a short grace period > + * (or preemption of this task). Also flag this task as doing > + * an rcu_barrier(). 
This will prevent anyone else from adopting > + * orphaned callbacks, which could cause otherwise failure if a > + * CPU went offline and quickly came back online. To see this, > + * consider the following sequence of events: > + * > + * 1. We cause CPU 0 to post an rcu_barrier_callback() callback. > + * 2. CPU 1 goes offline, orphaning its callbacks. > + * 3. CPU 0 adopts CPU 1's orphaned callbacks. > + * 4. CPU 1 comes back online. > + * 5. We cause CPU 1 to post an rcu_barrier_callback() callback. > + * 6. Both rcu_barrier_callback() callbacks are invoked, awakening > + * us -- but before CPU 1's orphaned callbacks are invoked!!! > */ > atomic_set(&rcu_barrier_cpu_count, 1); > - on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1); > - if (atomic_dec_and_test(&rcu_barrier_cpu_count)) > - complete(&rcu_barrier_completion); > - wait_for_completion(&rcu_barrier_completion); > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > + rsp->rcu_barrier_in_progress = current; > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > + > + /* > + * Force every CPU with callbacks to register a new callback > + * that will tell us when all the preceding callbacks have > + * been invoked. If an offline CPU has callbacks, wait for > + * it to either come back online or to finish orphaning those > + * callbacks. > + */ > + for_each_possible_cpu(cpu) { > + preempt_disable(); > + rdp = per_cpu_ptr(rsp->rda, cpu); > + if (cpu_is_offline(cpu)) { > + preempt_enable(); > + while (cpu_is_offline(cpu) && > + ACCESS_ONCE(rdp->qlen)) > + schedule_timeout_interruptible(1); > + } else if (ACCESS_ONCE(rdp->qlen)) { > + smp_call_function_single(cpu, rcu_barrier_func, > + (void *)call_rcu_func, 1); > + preempt_enable(); > + } > + } > + > + /* > + * Force any ongoing CPU-hotplug operations to complete, > + * so that any callbacks from the outgoing CPUs are now in > + * the orphanage. > + */ > + cpu_maps_update_begin(); > + cpu_maps_update_done(); > + > + /* > + * Now that all online CPUs have rcu_barrier_callback() callbacks > + * posted, we can adopt all of the orphaned callbacks and place > + * an rcu_barrier_callback() callback after them. When that is done, > + * we are guaranteed to have an rcu_barrier_callback() callback > + * following every callback that could possibly have been > + * registered before _rcu_barrier() was called. > + */ > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > + rcu_adopt_orphan_cbs(rsp); > + atomic_inc(&rcu_barrier_cpu_count); > + call_rcu_func(&rh, rcu_barrier_callback); > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > + > + /* > + * Now that we have an rcu_barrier_callback() callback on each > + * CPU, and thus each counted, remove the initial count. > + */ > + atomic_dec(&rcu_barrier_cpu_count); > + smp_mb__after_atomic_dec(); > + > + /* > + * Loop waiting for all rcu_barrier_callback() callbacks to be > + * invoked. Adopt any orphaned callbacks in the meantime, just > + * in case one of the rcu_barrier_callback() callbacks is orphaned. > + */ > + while (atomic_read(&rcu_barrier_cpu_count) > 0) { > + wait_event(rcu_barrier_wq, > + atomic_read(&rcu_barrier_cpu_count) == 0 || > + ACCESS_ONCE(rsp->qlen)); > + if (ACCESS_ONCE(rsp->qlen)) { > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > + rcu_adopt_orphan_cbs(rsp); > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > + } > + } > + > + /* > + * Done, so let others adopt orphaned callbacks. But avoid > + * indefinite postponement of any additional orphans by adopting > + * one more time. 
> + */ > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > + rcu_adopt_orphan_cbs(rsp); > + rsp->rcu_barrier_in_progress = NULL; > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > + > + /* Other rcu_barrier() invocations can now safely proceed. */ > mutex_unlock(&rcu_barrier_mutex); > + > + destroy_rcu_head_on_stack(&rh); > } > > /** > diff --git a/kernel/rcutree.h b/kernel/rcutree.h > index 36ca28e..1e49c56 100644 > --- a/kernel/rcutree.h > +++ b/kernel/rcutree.h > @@ -371,6 +371,17 @@ struct rcu_state { > > raw_spinlock_t onofflock; /* exclude on/offline and */ > /* starting new GP. */ > + struct rcu_head *orphan_nxtlist; /* Orphaned callbacks that */ > + /* need a grace period. */ > + struct rcu_head **orphan_nxttail; /* Tail of above. */ > + struct rcu_head *orphan_donelist; /* Orphaned callbacks that */ > + /* are ready to invoke. */ > + struct rcu_head **orphan_donetail; /* Tail of above. */ > + long qlen_lazy; /* Number of lazy callbacks. */ > + long qlen; /* Total number of callbacks. */ > + struct task_struct *rcu_barrier_in_progress; > + /* Task doing rcu_barrier(), */ > + /* or NULL if no barrier. */ > raw_spinlock_t fqslock; /* Only one task forcing */ > /* quiescent states. */ > unsigned long jiffies_force_qs; /* Time at which to invoke */ > diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c > index ed459ed..d4bc16d 100644 > --- a/kernel/rcutree_trace.c > +++ b/kernel/rcutree_trace.c > @@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp) > > gpnum = rsp->gpnum; > seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x " > - "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n", > + "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n", > rsp->completed, gpnum, rsp->fqs_state, > (long)(rsp->jiffies_force_qs - jiffies), > (int)(jiffies & 0xffff), > rsp->n_force_qs, rsp->n_force_qs_ngp, > rsp->n_force_qs - rsp->n_force_qs_ngp, > - rsp->n_force_qs_lh); > + rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen); > for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) { > if (rnp->level != level) { > seq_puts(m, "\n"); >
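For illustration, here is a minimal, hypothetical sketch (names invented, not taken from any in-tree rcu_barrier() user) of the usage pattern Mathieu is warning against: a caller that treats rcu_barrier() as if it also provided synchronize_rcu() semantics for ordinary readers, rather than only waiting for already-queued callbacks.

```c
#include <linux/rcupdate.h>
#include <linux/slab.h>

struct my_obj {
	int data;
};

static struct my_obj __rcu *global_ptr;	/* hypothetical shared pointer */

/* Readers: rcu_read_lock(); p = rcu_dereference(global_ptr); ... */

static void buggy_teardown(void)
{
	struct my_obj *old;

	old = rcu_dereference_protected(global_ptr, 1);
	rcu_assign_pointer(global_ptr, NULL);

	/*
	 * The questionable assumption: that rcu_barrier() also waits out
	 * the current grace period even when no callbacks are queued.
	 * rcu_barrier() is only specified to wait for outstanding RCU
	 * callbacks; the grace-period wait should come from
	 * synchronize_rcu(), or from freeing "old" via call_rcu().
	 */
	rcu_barrier();
	kfree(old);	/* may race with readers if no grace period elapsed */
}
```

Callers of this shape are what the audit recommended above would need to find.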
On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote: > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote: > > The rcu_barrier() primitive interrupts each and every CPU, registering > > a callback on every CPU. Once all of these callbacks have been invoked, > > rcu_barrier() knows that every callback that was registered before > > the call to rcu_barrier() has also been invoked. > > > > However, there is no point in registering a callback on a CPU that > > currently has no callbacks, most especially if that CPU is in a > > deep idle state. This commit therefore makes rcu_barrier() avoid > > interrupting CPUs that have no callbacks. Doing this requires reworking > > the handling of orphaned callbacks, otherwise callbacks could slip through > > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had > > not yet interrupted to a CPU that rcu_barrier() had already interrupted. > > This reworking was needed anyway to take a first step towards weaning > > RCU from the CPU_DYING notifier's use of stop_cpu(). > > Quoting Documentation/RCU/rcubarrier.txt: > > "We instead need the rcu_barrier() primitive. This primitive is similar > to synchronize_rcu(), but instead of waiting solely for a grace > period to elapse, it also waits for all outstanding RCU callbacks to > complete. Pseudo-code using rcu_barrier() is as follows:" > > The patch you propose seems like a good approach to make rcu_barrier > less disruptive, but everyone need to be aware that rcu_barrier() would > quit having the side-effect of doing the equivalent of > "synchronize_rcu()" from now on: within this new approach, in the case > where there are no pending callbacks, rcu_barrier() could, AFAIU, return > without waiting for the current grace period to complete. > > Any use of rcu_barrier() that would assume that a synchronize_rcu() is > implicit with the rcu_barrier() execution would be a bug anyway, but > those might only show up after this patch is applied. I would therefore > recommend to audit all rcu_barrier() users to ensure none is expecting > rcu_barrier to act as a synchronize_rcu before pushing this change. Good catch! I am going to chicken out and explicitly wait for a grace period if there were no callbacks. Having rcu_barrier() very rarely be a quick no-op does sound like a standing invitation for subtle non-reproducible bugs. ;-) Thanx, Paul > Thanks, > > Mathieu > > > > > Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org> > > Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com> > > > > diff --git a/kernel/rcutree.c b/kernel/rcutree.c > > index 403306b..8269656 100644 > > --- a/kernel/rcutree.c > > +++ b/kernel/rcutree.c > > @@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS]; > > .gpnum = -300, \ > > .completed = -300, \ > > .onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \ > > + .orphan_nxttail = &structname##_state.orphan_nxtlist, \ > > + .orphan_donetail = &structname##_state.orphan_donelist, \ > > .fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \ > > .n_force_qs = 0, \ > > .n_force_qs_ngp = 0, \ > > @@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp); > > unsigned long rcutorture_testseq; > > unsigned long rcutorture_vernum; > > > > +/* State information for rcu_barrier() and friends. 
*/ > > + > > +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL}; > > +static atomic_t rcu_barrier_cpu_count; > > +static DEFINE_MUTEX(rcu_barrier_mutex); > > +static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq); > > + > > /* > > * Return true if an RCU grace period is in progress. The ACCESS_ONCE()s > > * permit this function to be invoked without holding the root rcu_node > > @@ -1311,7 +1320,60 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp) > > #ifdef CONFIG_HOTPLUG_CPU > > > > /* > > - * Move a dying CPU's RCU callbacks to online CPU's callback list. > > + * Adopt the RCU callbacks from the specified rcu_state structure's > > + * orphanage. The caller must hold the ->onofflock. > > + */ > > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) > > +{ > > + int i; > > + struct rcu_data *rdp = __this_cpu_ptr(rsp->rda); > > + > > + /* > > + * If there is an rcu_barrier() operation in progress, then > > + * only the task doing that operation is permitted to adopt > > + * callbacks. To do otherwise breaks rcu_barrier() and friends > > + * by causing them to fail to wait for the callbacks in the > > + * orphanage. > > + */ > > + if (rsp->rcu_barrier_in_progress && > > + rsp->rcu_barrier_in_progress != current) > > + return; > > + > > + /* Do the accounting first. */ > > + rdp->qlen_lazy += rsp->qlen_lazy; > > + rdp->qlen += rsp->qlen; > > + rdp->n_cbs_adopted += rsp->qlen; > > + rsp->qlen_lazy = 0; > > + rsp->qlen = 0; > > + > > + /* > > + * We do not need a memory barrier here because the only way we > > + * can get here if there is an rcu_barrier() in flight is if > > + * we are the task doing the rcu_barrier(). > > + */ > > + > > + /* First adopt the ready-to-invoke callbacks. */ > > + if (rsp->orphan_donelist != NULL) { > > + *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL]; > > + *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist; > > + for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--) > > + if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL]) > > + rdp->nxttail[i] = rsp->orphan_donetail; > > + rsp->orphan_donelist = NULL; > > + rsp->orphan_donetail = &rsp->orphan_donelist; > > + } > > + > > + /* And then adopt the callbacks that still need a grace period. */ > > + if (rsp->orphan_nxtlist != NULL) { > > + *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist; > > + rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail; > > + rsp->orphan_nxtlist = NULL; > > + rsp->orphan_nxttail = &rsp->orphan_nxtlist; > > + } > > +} > > + > > +/* > > + * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage. > > * Also record a quiescent state for this CPU for the current grace period. > > * Synchronization and interrupt disabling are not required because > > * this function executes in stop_machine() context. Therefore, cleanup > > @@ -1325,64 +1387,67 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp) > > static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) > > { > > int i; > > + unsigned long flags; > > unsigned long mask; > > - int receive_cpu = cpumask_any(cpu_online_mask); > > + bool orphaned = 0; > > struct rcu_data *rdp = this_cpu_ptr(rsp->rda); > > - struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu); > > RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */ > > > > - /* First, adjust the counts. */ > > + /* Move the callbacks to the orphanage under ->onofflock protection. */ > > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > > + > > + /* First adjust the counts. 
*/ > > if (rdp->nxtlist != NULL) { > > - receive_rdp->qlen_lazy += rdp->qlen_lazy; > > - receive_rdp->qlen += rdp->qlen; > > + rsp->qlen_lazy += rdp->qlen_lazy; > > + rsp->qlen += rdp->qlen; > > + rdp->n_cbs_orphaned += rdp->qlen; > > rdp->qlen_lazy = 0; > > rdp->qlen = 0; > > + orphaned = 1; > > } > > > > /* > > - * Next, move ready-to-invoke callbacks to be invoked on some > > - * other CPU. These will not be required to pass through another > > - * grace period: They are done, regardless of CPU. > > + * Next, move those callbacks still needing a grace period to > > + * the orphanage, where some other CPU will pick them up. > > + * Some of the callbacks might have gone partway through a grace > > + * period, but that is too bad. They get to start over because we > > + * cannot assume that grace periods are synchronized across CPUs. > > + * We don't bother updating the ->nxttail[] array yet, instead > > + * we just reset the whole thing later on. > > */ > > - if (rdp->nxtlist != NULL && > > - rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) { > > - struct rcu_head *oldhead; > > - struct rcu_head **oldtail; > > - struct rcu_head **newtail; > > - > > - oldhead = rdp->nxtlist; > > - oldtail = receive_rdp->nxttail[RCU_DONE_TAIL]; > > - rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL]; > > - *rdp->nxttail[RCU_DONE_TAIL] = *oldtail; > > - *receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead; > > - newtail = rdp->nxttail[RCU_DONE_TAIL]; > > - for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) { > > - if (receive_rdp->nxttail[i] == oldtail) > > - receive_rdp->nxttail[i] = newtail; > > - if (rdp->nxttail[i] == newtail) > > - rdp->nxttail[i] = &rdp->nxtlist; > > - } > > + if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) { > > + *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL]; > > + rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL]; > > + *rdp->nxttail[RCU_DONE_TAIL] = NULL; > > } > > > > /* > > - * Finally, put the rest of the callbacks at the end of the list. > > - * The ones that made it partway through get to start over: We > > - * cannot assume that grace periods are synchronized across CPUs. > > - * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but > > - * this does not seem compelling. Not yet, anyway.) > > + * Then move the ready-to-invoke callbacks to the orphanage, > > + * where some other CPU will pick them up. These will not be > > + * required to pass though another grace period: They are done. > > */ > > if (rdp->nxtlist != NULL) { > > - *receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist; > > - receive_rdp->nxttail[RCU_NEXT_TAIL] = > > - rdp->nxttail[RCU_NEXT_TAIL]; > > - receive_rdp->n_cbs_adopted += rdp->qlen; > > - rdp->n_cbs_orphaned += rdp->qlen; > > - > > - rdp->nxtlist = NULL; > > - for (i = 0; i < RCU_NEXT_SIZE; i++) > > - rdp->nxttail[i] = &rdp->nxtlist; > > + *rsp->orphan_donetail = rdp->nxtlist; > > + rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL]; > > } > > > > + /* Finally, initialize the rcu_data structure's list to empty. */ > > + rdp->nxtlist = NULL; > > + for (i = 0; i < RCU_NEXT_SIZE; i++) > > + rdp->nxttail[i] = &rdp->nxtlist; > > + > > + /* > > + * Wait up the rcu_barrier() task if there is one and if we > > + * actually sent anything to the orphanage. Except that we > > + * must delay the wakeup until ->onofflock is released to > > + * avoid deadlock. 
> > + */ > > + if (!rsp->rcu_barrier_in_progress) > > + orphaned = 0; > > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > > + if (orphaned) > > + wake_up(&rcu_barrier_wq); > > + > > /* > > * Record a quiescent state for the dying CPU. This is safe > > * only because we have already cleared out the callbacks. > > @@ -1415,11 +1480,14 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) > > rcu_stop_cpu_kthread(cpu); > > rcu_node_kthread_setaffinity(rnp, -1); > > > > - /* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */ > > + /* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */ > > > > /* Exclude any attempts to start a new grace period. */ > > raw_spin_lock_irqsave(&rsp->onofflock, flags); > > > > + /* Collect the dead CPU's callbacks. */ > > + rcu_adopt_orphan_cbs(rsp); > > + > > /* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */ > > mask = rdp->grpmask; /* rnp->grplo is constant. */ > > do { > > @@ -1456,6 +1524,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) > > > > #else /* #ifdef CONFIG_HOTPLUG_CPU */ > > > > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) > > +{ > > +} > > + > > static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) > > { > > } > > @@ -1524,9 +1596,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) > > rcu_is_callbacks_kthread()); > > > > /* Update count, and requeue any remaining callbacks. */ > > - rdp->qlen_lazy -= count_lazy; > > - rdp->qlen -= count; > > - rdp->n_cbs_invoked += count; > > if (list != NULL) { > > *tail = rdp->nxtlist; > > rdp->nxtlist = list; > > @@ -1536,6 +1605,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) > > else > > break; > > } > > + smp_mb(); /* List handling before counting for rcu_barrier(). */ > > + rdp->qlen_lazy -= count_lazy; > > + rdp->qlen -= count; > > + rdp->n_cbs_invoked += count; > > > > /* Reinstate batch limit if we have worked down the excess. */ > > if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark) > > @@ -1824,13 +1897,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), > > rdp = this_cpu_ptr(rsp->rda); > > > > /* Add the callback to our list. */ > > - *rdp->nxttail[RCU_NEXT_TAIL] = head; > > - rdp->nxttail[RCU_NEXT_TAIL] = &head->next; > > rdp->qlen++; > > if (lazy) > > rdp->qlen_lazy++; > > else > > rcu_idle_count_callbacks_posted(); > > + smp_mb(); /* Count before adding callback for rcu_barrier(). 
*/ > > + *rdp->nxttail[RCU_NEXT_TAIL] = head; > > + rdp->nxttail[RCU_NEXT_TAIL] = &head->next; > > > > if (__is_kfree_rcu_offset((unsigned long)func)) > > trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func, > > @@ -2169,15 +2243,10 @@ static int rcu_cpu_has_callbacks(int cpu) > > rcu_preempt_cpu_has_callbacks(cpu); > > } > > > > -static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL}; > > -static atomic_t rcu_barrier_cpu_count; > > -static DEFINE_MUTEX(rcu_barrier_mutex); > > -static struct completion rcu_barrier_completion; > > - > > static void rcu_barrier_callback(struct rcu_head *notused) > > { > > if (atomic_dec_and_test(&rcu_barrier_cpu_count)) > > - complete(&rcu_barrier_completion); > > + wake_up(&rcu_barrier_wq); > > } > > > > /* > > @@ -2203,27 +2272,121 @@ static void _rcu_barrier(struct rcu_state *rsp, > > void (*call_rcu_func)(struct rcu_head *head, > > void (*func)(struct rcu_head *head))) > > { > > - BUG_ON(in_interrupt()); > > + int cpu; > > + unsigned long flags; > > + struct rcu_data *rdp; > > + struct rcu_head rh; > > + > > + init_rcu_head_on_stack(&rh); > > + > > /* Take mutex to serialize concurrent rcu_barrier() requests. */ > > mutex_lock(&rcu_barrier_mutex); > > - init_completion(&rcu_barrier_completion); > > + > > + smp_mb(); /* Prevent any prior operations from leaking in. */ > > + > > /* > > - * Initialize rcu_barrier_cpu_count to 1, then invoke > > - * rcu_barrier_func() on each CPU, so that each CPU also has > > - * incremented rcu_barrier_cpu_count. Only then is it safe to > > - * decrement rcu_barrier_cpu_count -- otherwise the first CPU > > - * might complete its grace period before all of the other CPUs > > - * did their increment, causing this function to return too > > - * early. Note that on_each_cpu() disables irqs, which prevents > > - * any CPUs from coming online or going offline until each online > > - * CPU has queued its RCU-barrier callback. > > + * Initialize the count to one rather than to zero in order to > > + * avoid a too-soon return to zero in case of a short grace period > > + * (or preemption of this task). Also flag this task as doing > > + * an rcu_barrier(). This will prevent anyone else from adopting > > + * orphaned callbacks, which could cause otherwise failure if a > > + * CPU went offline and quickly came back online. To see this, > > + * consider the following sequence of events: > > + * > > + * 1. We cause CPU 0 to post an rcu_barrier_callback() callback. > > + * 2. CPU 1 goes offline, orphaning its callbacks. > > + * 3. CPU 0 adopts CPU 1's orphaned callbacks. > > + * 4. CPU 1 comes back online. > > + * 5. We cause CPU 1 to post an rcu_barrier_callback() callback. > > + * 6. Both rcu_barrier_callback() callbacks are invoked, awakening > > + * us -- but before CPU 1's orphaned callbacks are invoked!!! > > */ > > atomic_set(&rcu_barrier_cpu_count, 1); > > - on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1); > > - if (atomic_dec_and_test(&rcu_barrier_cpu_count)) > > - complete(&rcu_barrier_completion); > > - wait_for_completion(&rcu_barrier_completion); > > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > > + rsp->rcu_barrier_in_progress = current; > > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > > + > > + /* > > + * Force every CPU with callbacks to register a new callback > > + * that will tell us when all the preceding callbacks have > > + * been invoked. If an offline CPU has callbacks, wait for > > + * it to either come back online or to finish orphaning those > > + * callbacks. 
> > + */ > > + for_each_possible_cpu(cpu) { > > + preempt_disable(); > > + rdp = per_cpu_ptr(rsp->rda, cpu); > > + if (cpu_is_offline(cpu)) { > > + preempt_enable(); > > + while (cpu_is_offline(cpu) && > > + ACCESS_ONCE(rdp->qlen)) > > + schedule_timeout_interruptible(1); > > + } else if (ACCESS_ONCE(rdp->qlen)) { > > + smp_call_function_single(cpu, rcu_barrier_func, > > + (void *)call_rcu_func, 1); > > + preempt_enable(); > > + } > > + } > > + > > + /* > > + * Force any ongoing CPU-hotplug operations to complete, > > + * so that any callbacks from the outgoing CPUs are now in > > + * the orphanage. > > + */ > > + cpu_maps_update_begin(); > > + cpu_maps_update_done(); > > + > > + /* > > + * Now that all online CPUs have rcu_barrier_callback() callbacks > > + * posted, we can adopt all of the orphaned callbacks and place > > + * an rcu_barrier_callback() callback after them. When that is done, > > + * we are guaranteed to have an rcu_barrier_callback() callback > > + * following every callback that could possibly have been > > + * registered before _rcu_barrier() was called. > > + */ > > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > > + rcu_adopt_orphan_cbs(rsp); > > + atomic_inc(&rcu_barrier_cpu_count); > > + call_rcu_func(&rh, rcu_barrier_callback); > > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > > + > > + /* > > + * Now that we have an rcu_barrier_callback() callback on each > > + * CPU, and thus each counted, remove the initial count. > > + */ > > + atomic_dec(&rcu_barrier_cpu_count); > > + smp_mb__after_atomic_dec(); > > + > > + /* > > + * Loop waiting for all rcu_barrier_callback() callbacks to be > > + * invoked. Adopt any orphaned callbacks in the meantime, just > > + * in case one of the rcu_barrier_callback() callbacks is orphaned. > > + */ > > + while (atomic_read(&rcu_barrier_cpu_count) > 0) { > > + wait_event(rcu_barrier_wq, > > + atomic_read(&rcu_barrier_cpu_count) == 0 || > > + ACCESS_ONCE(rsp->qlen)); > > + if (ACCESS_ONCE(rsp->qlen)) { > > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > > + rcu_adopt_orphan_cbs(rsp); > > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > > + } > > + } > > + > > + /* > > + * Done, so let others adopt orphaned callbacks. But avoid > > + * indefinite postponement of any additional orphans by adopting > > + * one more time. > > + */ > > + raw_spin_lock_irqsave(&rsp->onofflock, flags); > > + rcu_adopt_orphan_cbs(rsp); > > + rsp->rcu_barrier_in_progress = NULL; > > + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); > > + > > + /* Other rcu_barrier() invocations can now safely proceed. */ > > mutex_unlock(&rcu_barrier_mutex); > > + > > + destroy_rcu_head_on_stack(&rh); > > } > > > > /** > > diff --git a/kernel/rcutree.h b/kernel/rcutree.h > > index 36ca28e..1e49c56 100644 > > --- a/kernel/rcutree.h > > +++ b/kernel/rcutree.h > > @@ -371,6 +371,17 @@ struct rcu_state { > > > > raw_spinlock_t onofflock; /* exclude on/offline and */ > > /* starting new GP. */ > > + struct rcu_head *orphan_nxtlist; /* Orphaned callbacks that */ > > + /* need a grace period. */ > > + struct rcu_head **orphan_nxttail; /* Tail of above. */ > > + struct rcu_head *orphan_donelist; /* Orphaned callbacks that */ > > + /* are ready to invoke. */ > > + struct rcu_head **orphan_donetail; /* Tail of above. */ > > + long qlen_lazy; /* Number of lazy callbacks. */ > > + long qlen; /* Total number of callbacks. */ > > + struct task_struct *rcu_barrier_in_progress; > > + /* Task doing rcu_barrier(), */ > > + /* or NULL if no barrier. 
*/ > > raw_spinlock_t fqslock; /* Only one task forcing */ > > /* quiescent states. */ > > unsigned long jiffies_force_qs; /* Time at which to invoke */ > > diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c > > index ed459ed..d4bc16d 100644 > > --- a/kernel/rcutree_trace.c > > +++ b/kernel/rcutree_trace.c > > @@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp) > > > > gpnum = rsp->gpnum; > > seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x " > > - "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n", > > + "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n", > > rsp->completed, gpnum, rsp->fqs_state, > > (long)(rsp->jiffies_force_qs - jiffies), > > (int)(jiffies & 0xffff), > > rsp->n_force_qs, rsp->n_force_qs_ngp, > > rsp->n_force_qs - rsp->n_force_qs_ngp, > > - rsp->n_force_qs_lh); > > + rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen); > > for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) { > > if (rnp->level != level) { > > seq_puts(m, "\n"); > > > > -- > Mathieu Desnoyers > Operating System Efficiency R&D Consultant > EfficiOS Inc. > http://www.efficios.com >
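As a rough sketch of the fallback described above (hypothetical, not part of the posted patch): if _rcu_barrier() ended up posting no rcu_barrier_callback() anywhere, it would explicitly wait out a grace period so callers keep the historical synchronize_rcu()-like behavior. The posted_any flag and sync_func parameter are invented for the illustration; as the follow-up message explains, this turns out to be unnecessary.

```c
#include <linux/types.h>

/*
 * Hypothetical helper: preserve an implicit grace-period wait when no
 * rcu_barrier_callback() was queued on any CPU.  "sync_func" would be
 * the flavor-specific synchronize_*() function, matching the
 * call_rcu_func convention already used by _rcu_barrier().
 */
static void rcu_barrier_gp_fallback(bool posted_any, void (*sync_func)(void))
{
	if (!posted_any)
		sync_func();	/* e.g. synchronize_rcu() */
}
```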
On Thu, Mar 15, 2012 at 11:21:59AM -0700, Paul E. McKenney wrote: > On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote: > > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote: > > > The rcu_barrier() primitive interrupts each and every CPU, registering > > > a callback on every CPU. Once all of these callbacks have been invoked, > > > rcu_barrier() knows that every callback that was registered before > > > the call to rcu_barrier() has also been invoked. > > > > > > However, there is no point in registering a callback on a CPU that > > > currently has no callbacks, most especially if that CPU is in a > > > deep idle state. This commit therefore makes rcu_barrier() avoid > > > interrupting CPUs that have no callbacks. Doing this requires reworking > > > the handling of orphaned callbacks, otherwise callbacks could slip through > > > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had > > > not yet interrupted to a CPU that rcu_barrier() had already interrupted. > > > This reworking was needed anyway to take a first step towards weaning > > > RCU from the CPU_DYING notifier's use of stop_cpu(). > > > > Quoting Documentation/RCU/rcubarrier.txt: > > > > "We instead need the rcu_barrier() primitive. This primitive is similar > > to synchronize_rcu(), but instead of waiting solely for a grace > > period to elapse, it also waits for all outstanding RCU callbacks to > > complete. Pseudo-code using rcu_barrier() is as follows:" > > > > The patch you propose seems like a good approach to make rcu_barrier > > less disruptive, but everyone need to be aware that rcu_barrier() would > > quit having the side-effect of doing the equivalent of > > "synchronize_rcu()" from now on: within this new approach, in the case > > where there are no pending callbacks, rcu_barrier() could, AFAIU, return > > without waiting for the current grace period to complete. > > > > Any use of rcu_barrier() that would assume that a synchronize_rcu() is > > implicit with the rcu_barrier() execution would be a bug anyway, but > > those might only show up after this patch is applied. I would therefore > > recommend to audit all rcu_barrier() users to ensure none is expecting > > rcu_barrier to act as a synchronize_rcu before pushing this change. > > Good catch! > > I am going to chicken out and explicitly wait for a grace period if there > were no callbacks. Having rcu_barrier() very rarely be a quick no-op does > sound like a standing invitation for subtle non-reproducible bugs. ;-) I take it back... After adopting callbacks (rcu_adopt_orphan_cbs()), _rcu_barrier() unconditionally posts a callback on the current CPU and waits for it. So _rcu_barrier() actually does always wait for a grace period. Yes, I could be more dainty and make rcu_adopt_orphan_cbs() return an indication of whether there were any callbacks, and then post the callback only if either there were some callbacks adopted or if there were no calls to smp_call_function_single(). But that adds complexity for almost no benefit -- and no one can accuse _rcu_barrier() of being a fastpath! ;-) Or am I missing something here? Thanx, Paul
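A minimal sketch of the property Paul is relying on here (names invented; plain call_rcu() is used instead of the flavor-specific call_rcu_func pointer that _rcu_barrier() takes): a callback queued with call_rcu() cannot be invoked until a full grace period has elapsed, so posting one do-nothing callback and sleeping until it runs is itself a grace-period wait. That is what the unconditional call_rcu_func(&rh, rcu_barrier_callback) in _rcu_barrier() provides, even when no other callbacks exist anywhere in the system.

```c
#include <linux/completion.h>
#include <linux/kernel.h>
#include <linux/rcupdate.h>

struct one_gp_wait {
	struct rcu_head rh;
	struct completion done;
};

static void one_gp_cb(struct rcu_head *rh)
{
	struct one_gp_wait *w = container_of(rh, struct one_gp_wait, rh);

	complete(&w->done);		/* runs only after a full grace period */
}

/* Queue a single do-nothing callback and wait for it to be invoked. */
static void wait_one_grace_period(void)
{
	struct one_gp_wait w;

	init_completion(&w.done);
	init_rcu_head_on_stack(&w.rh);
	call_rcu(&w.rh, one_gp_cb);	/* cannot run before a grace period */
	wait_for_completion(&w.done);	/* so returning here implies one */
	destroy_rcu_head_on_stack(&w.rh);
}
```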
* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote: > On Thu, Mar 15, 2012 at 11:21:59AM -0700, Paul E. McKenney wrote: > > On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote: > > > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote: > > > > The rcu_barrier() primitive interrupts each and every CPU, registering > > > > a callback on every CPU. Once all of these callbacks have been invoked, > > > > rcu_barrier() knows that every callback that was registered before > > > > the call to rcu_barrier() has also been invoked. > > > > > > > > However, there is no point in registering a callback on a CPU that > > > > currently has no callbacks, most especially if that CPU is in a > > > > deep idle state. This commit therefore makes rcu_barrier() avoid > > > > interrupting CPUs that have no callbacks. Doing this requires reworking > > > > the handling of orphaned callbacks, otherwise callbacks could slip through > > > > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had > > > > not yet interrupted to a CPU that rcu_barrier() had already interrupted. > > > > This reworking was needed anyway to take a first step towards weaning > > > > RCU from the CPU_DYING notifier's use of stop_cpu(). > > > > > > Quoting Documentation/RCU/rcubarrier.txt: > > > > > > "We instead need the rcu_barrier() primitive. This primitive is similar > > > to synchronize_rcu(), but instead of waiting solely for a grace > > > period to elapse, it also waits for all outstanding RCU callbacks to > > > complete. Pseudo-code using rcu_barrier() is as follows:" > > > > > > The patch you propose seems like a good approach to make rcu_barrier > > > less disruptive, but everyone need to be aware that rcu_barrier() would > > > quit having the side-effect of doing the equivalent of > > > "synchronize_rcu()" from now on: within this new approach, in the case > > > where there are no pending callbacks, rcu_barrier() could, AFAIU, return > > > without waiting for the current grace period to complete. > > > > > > Any use of rcu_barrier() that would assume that a synchronize_rcu() is > > > implicit with the rcu_barrier() execution would be a bug anyway, but > > > those might only show up after this patch is applied. I would therefore > > > recommend to audit all rcu_barrier() users to ensure none is expecting > > > rcu_barrier to act as a synchronize_rcu before pushing this change. > > > > Good catch! > > > > I am going to chicken out and explicitly wait for a grace period if there > > were no callbacks. Having rcu_barrier() very rarely be a quick no-op does > > sound like a standing invitation for subtle non-reproducible bugs. ;-) > > I take it back... > > After adopting callbacks (rcu_adopt_orphan_cbs()), _rcu_barrier() > unconditionally posts a callback on the current CPU and waits for it. > So _rcu_barrier() actually does always wait for a grace period. Ah ok, that should handle it then. > > Yes, I could be more dainty and make rcu_adopt_orphan_cbs() return an > indication of whether there were any callbacks, and then post the callback > only if either there were some callbacks adopted or if there were no calls > to smp_call_function_single(). But that adds complexity for almost no > benefit -- and no one can accuse _rcu_barrier() of being a fastpath! ;-) > > Or am I missing something here? Nope, I think it all makes sense. Thanks, Mathieu > > Thanx, Paul >
Hi Paul, On 03/15/2012 10:18 PM, Paul E. McKenney wrote: > The rcu_barrier() primitive interrupts each and every CPU, registering > a callback on every CPU. Once all of these callbacks have been invoked, > rcu_barrier() knows that every callback that was registered before > the call to rcu_barrier() has also been invoked. > > However, there is no point in registering a callback on a CPU that > currently has no callbacks, most especially if that CPU is in a > deep idle state. This commit therefore makes rcu_barrier() avoid > interrupting CPUs that have no callbacks. Doing this requires reworking > the handling of orphaned callbacks, otherwise callbacks could slip through > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had > not yet interrupted to a CPU that rcu_barrier() had already interrupted. > This reworking was needed anyway to take a first step towards weaning > RCU from the CPU_DYING notifier's use of stop_cpu(). > s/stop_cpu()/stop_machine() ? Thank you for doing this! :-) > Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org> > Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com> > Regards, Srivatsa S. Bhat IBM Linux Technology Center
diff --git a/kernel/rcutree.c b/kernel/rcutree.c index 403306b..8269656 100644 --- a/kernel/rcutree.c +++ b/kernel/rcutree.c @@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS]; .gpnum = -300, \ .completed = -300, \ .onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \ + .orphan_nxttail = &structname##_state.orphan_nxtlist, \ + .orphan_donetail = &structname##_state.orphan_donelist, \ .fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \ .n_force_qs = 0, \ .n_force_qs_ngp = 0, \ @@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp); unsigned long rcutorture_testseq; unsigned long rcutorture_vernum; +/* State information for rcu_barrier() and friends. */ + +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL}; +static atomic_t rcu_barrier_cpu_count; +static DEFINE_MUTEX(rcu_barrier_mutex); +static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq); + /* * Return true if an RCU grace period is in progress. The ACCESS_ONCE()s * permit this function to be invoked without holding the root rcu_node @@ -1311,7 +1320,60 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp) #ifdef CONFIG_HOTPLUG_CPU /* - * Move a dying CPU's RCU callbacks to online CPU's callback list. + * Adopt the RCU callbacks from the specified rcu_state structure's + * orphanage. The caller must hold the ->onofflock. + */ +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) +{ + int i; + struct rcu_data *rdp = __this_cpu_ptr(rsp->rda); + + /* + * If there is an rcu_barrier() operation in progress, then + * only the task doing that operation is permitted to adopt + * callbacks. To do otherwise breaks rcu_barrier() and friends + * by causing them to fail to wait for the callbacks in the + * orphanage. + */ + if (rsp->rcu_barrier_in_progress && + rsp->rcu_barrier_in_progress != current) + return; + + /* Do the accounting first. */ + rdp->qlen_lazy += rsp->qlen_lazy; + rdp->qlen += rsp->qlen; + rdp->n_cbs_adopted += rsp->qlen; + rsp->qlen_lazy = 0; + rsp->qlen = 0; + + /* + * We do not need a memory barrier here because the only way we + * can get here if there is an rcu_barrier() in flight is if + * we are the task doing the rcu_barrier(). + */ + + /* First adopt the ready-to-invoke callbacks. */ + if (rsp->orphan_donelist != NULL) { + *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL]; + *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist; + for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--) + if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL]) + rdp->nxttail[i] = rsp->orphan_donetail; + rsp->orphan_donelist = NULL; + rsp->orphan_donetail = &rsp->orphan_donelist; + } + + /* And then adopt the callbacks that still need a grace period. */ + if (rsp->orphan_nxtlist != NULL) { + *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist; + rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail; + rsp->orphan_nxtlist = NULL; + rsp->orphan_nxttail = &rsp->orphan_nxtlist; + } +} + +/* + * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage. * Also record a quiescent state for this CPU for the current grace period. * Synchronization and interrupt disabling are not required because * this function executes in stop_machine() context. 
Therefore, cleanup @@ -1325,64 +1387,67 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp) static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) { int i; + unsigned long flags; unsigned long mask; - int receive_cpu = cpumask_any(cpu_online_mask); + bool orphaned = 0; struct rcu_data *rdp = this_cpu_ptr(rsp->rda); - struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu); RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */ - /* First, adjust the counts. */ + /* Move the callbacks to the orphanage under ->onofflock protection. */ + raw_spin_lock_irqsave(&rsp->onofflock, flags); + + /* First adjust the counts. */ if (rdp->nxtlist != NULL) { - receive_rdp->qlen_lazy += rdp->qlen_lazy; - receive_rdp->qlen += rdp->qlen; + rsp->qlen_lazy += rdp->qlen_lazy; + rsp->qlen += rdp->qlen; + rdp->n_cbs_orphaned += rdp->qlen; rdp->qlen_lazy = 0; rdp->qlen = 0; + orphaned = 1; } /* - * Next, move ready-to-invoke callbacks to be invoked on some - * other CPU. These will not be required to pass through another - * grace period: They are done, regardless of CPU. + * Next, move those callbacks still needing a grace period to + * the orphanage, where some other CPU will pick them up. + * Some of the callbacks might have gone partway through a grace + * period, but that is too bad. They get to start over because we + * cannot assume that grace periods are synchronized across CPUs. + * We don't bother updating the ->nxttail[] array yet, instead + * we just reset the whole thing later on. */ - if (rdp->nxtlist != NULL && - rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) { - struct rcu_head *oldhead; - struct rcu_head **oldtail; - struct rcu_head **newtail; - - oldhead = rdp->nxtlist; - oldtail = receive_rdp->nxttail[RCU_DONE_TAIL]; - rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL]; - *rdp->nxttail[RCU_DONE_TAIL] = *oldtail; - *receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead; - newtail = rdp->nxttail[RCU_DONE_TAIL]; - for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) { - if (receive_rdp->nxttail[i] == oldtail) - receive_rdp->nxttail[i] = newtail; - if (rdp->nxttail[i] == newtail) - rdp->nxttail[i] = &rdp->nxtlist; - } + if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) { + *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL]; + rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL]; + *rdp->nxttail[RCU_DONE_TAIL] = NULL; } /* - * Finally, put the rest of the callbacks at the end of the list. - * The ones that made it partway through get to start over: We - * cannot assume that grace periods are synchronized across CPUs. - * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but - * this does not seem compelling. Not yet, anyway.) + * Then move the ready-to-invoke callbacks to the orphanage, + * where some other CPU will pick them up. These will not be + * required to pass though another grace period: They are done. */ if (rdp->nxtlist != NULL) { - *receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist; - receive_rdp->nxttail[RCU_NEXT_TAIL] = - rdp->nxttail[RCU_NEXT_TAIL]; - receive_rdp->n_cbs_adopted += rdp->qlen; - rdp->n_cbs_orphaned += rdp->qlen; - - rdp->nxtlist = NULL; - for (i = 0; i < RCU_NEXT_SIZE; i++) - rdp->nxttail[i] = &rdp->nxtlist; + *rsp->orphan_donetail = rdp->nxtlist; + rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL]; } + /* Finally, initialize the rcu_data structure's list to empty. 
*/ + rdp->nxtlist = NULL; + for (i = 0; i < RCU_NEXT_SIZE; i++) + rdp->nxttail[i] = &rdp->nxtlist; + + /* + * Wait up the rcu_barrier() task if there is one and if we + * actually sent anything to the orphanage. Except that we + * must delay the wakeup until ->onofflock is released to + * avoid deadlock. + */ + if (!rsp->rcu_barrier_in_progress) + orphaned = 0; + raw_spin_unlock_irqrestore(&rsp->onofflock, flags); + if (orphaned) + wake_up(&rcu_barrier_wq); + /* * Record a quiescent state for the dying CPU. This is safe * only because we have already cleared out the callbacks. @@ -1415,11 +1480,14 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) rcu_stop_cpu_kthread(cpu); rcu_node_kthread_setaffinity(rnp, -1); - /* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */ + /* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */ /* Exclude any attempts to start a new grace period. */ raw_spin_lock_irqsave(&rsp->onofflock, flags); + /* Collect the dead CPU's callbacks. */ + rcu_adopt_orphan_cbs(rsp); + /* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */ mask = rdp->grpmask; /* rnp->grplo is constant. */ do { @@ -1456,6 +1524,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp) #else /* #ifdef CONFIG_HOTPLUG_CPU */ +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp) +{ +} + static void rcu_cleanup_dying_cpu(struct rcu_state *rsp) { } @@ -1524,9 +1596,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) rcu_is_callbacks_kthread()); /* Update count, and requeue any remaining callbacks. */ - rdp->qlen_lazy -= count_lazy; - rdp->qlen -= count; - rdp->n_cbs_invoked += count; if (list != NULL) { *tail = rdp->nxtlist; rdp->nxtlist = list; @@ -1536,6 +1605,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp) else break; } + smp_mb(); /* List handling before counting for rcu_barrier(). */ + rdp->qlen_lazy -= count_lazy; + rdp->qlen -= count; + rdp->n_cbs_invoked += count; /* Reinstate batch limit if we have worked down the excess. */ if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark) @@ -1824,13 +1897,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu), rdp = this_cpu_ptr(rsp->rda); /* Add the callback to our list. */ - *rdp->nxttail[RCU_NEXT_TAIL] = head; - rdp->nxttail[RCU_NEXT_TAIL] = &head->next; rdp->qlen++; if (lazy) rdp->qlen_lazy++; else rcu_idle_count_callbacks_posted(); + smp_mb(); /* Count before adding callback for rcu_barrier(). 
+       *rdp->nxttail[RCU_NEXT_TAIL] = head;
+       rdp->nxttail[RCU_NEXT_TAIL] = &head->next;

        if (__is_kfree_rcu_offset((unsigned long)func))
                trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
@@ -2169,15 +2243,10 @@ static int rcu_cpu_has_callbacks(int cpu)
               rcu_preempt_cpu_has_callbacks(cpu);
 }

-static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
-static atomic_t rcu_barrier_cpu_count;
-static DEFINE_MUTEX(rcu_barrier_mutex);
-static struct completion rcu_barrier_completion;
-
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
        if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-               complete(&rcu_barrier_completion);
+               wake_up(&rcu_barrier_wq);
 }

 /*
@@ -2203,27 +2272,121 @@ static void _rcu_barrier(struct rcu_state *rsp,
                         void (*call_rcu_func)(struct rcu_head *head,
                                               void (*func)(struct rcu_head *head)))
 {
-       BUG_ON(in_interrupt());
+       int cpu;
+       unsigned long flags;
+       struct rcu_data *rdp;
+       struct rcu_head rh;
+
+       init_rcu_head_on_stack(&rh);
+
        /* Take mutex to serialize concurrent rcu_barrier() requests. */
        mutex_lock(&rcu_barrier_mutex);
-       init_completion(&rcu_barrier_completion);
+
+       smp_mb();  /* Prevent any prior operations from leaking in. */
+
        /*
-        * Initialize rcu_barrier_cpu_count to 1, then invoke
-        * rcu_barrier_func() on each CPU, so that each CPU also has
-        * incremented rcu_barrier_cpu_count.  Only then is it safe to
-        * decrement rcu_barrier_cpu_count -- otherwise the first CPU
-        * might complete its grace period before all of the other CPUs
-        * did their increment, causing this function to return too
-        * early.  Note that on_each_cpu() disables irqs, which prevents
-        * any CPUs from coming online or going offline until each online
-        * CPU has queued its RCU-barrier callback.
+        * Initialize the count to one rather than to zero in order to
+        * avoid a too-soon return to zero in case of a short grace period
+        * (or preemption of this task).  Also flag this task as doing
+        * an rcu_barrier().  This will prevent anyone else from adopting
+        * orphaned callbacks, which could otherwise cause failure if a
+        * CPU went offline and quickly came back online.  To see this,
+        * consider the following sequence of events:
+        *
+        * 1.   We cause CPU 0 to post an rcu_barrier_callback() callback.
+        * 2.   CPU 1 goes offline, orphaning its callbacks.
+        * 3.   CPU 0 adopts CPU 1's orphaned callbacks.
+        * 4.   CPU 1 comes back online.
+        * 5.   We cause CPU 1 to post an rcu_barrier_callback() callback.
+        * 6.   Both rcu_barrier_callback() callbacks are invoked, awakening
+        *      us -- but before CPU 1's orphaned callbacks are invoked!!!
         */
        atomic_set(&rcu_barrier_cpu_count, 1);
-       on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
-       if (atomic_dec_and_test(&rcu_barrier_cpu_count))
-               complete(&rcu_barrier_completion);
-       wait_for_completion(&rcu_barrier_completion);
+       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+       rsp->rcu_barrier_in_progress = current;
+       raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+       /*
+        * Force every CPU with callbacks to register a new callback
+        * that will tell us when all the preceding callbacks have
+        * been invoked.  If an offline CPU has callbacks, wait for
+        * it to either come back online or to finish orphaning those
+        * callbacks.
+        */
+       for_each_possible_cpu(cpu) {
+               preempt_disable();
+               rdp = per_cpu_ptr(rsp->rda, cpu);
+               if (cpu_is_offline(cpu)) {
+                       preempt_enable();
+                       while (cpu_is_offline(cpu) &&
+                              ACCESS_ONCE(rdp->qlen))
+                               schedule_timeout_interruptible(1);
+               } else if (ACCESS_ONCE(rdp->qlen)) {
+                       smp_call_function_single(cpu, rcu_barrier_func,
+                                                (void *)call_rcu_func, 1);
+                       preempt_enable();
+               } else {
+                       preempt_enable();
+               }
+       }
+
+       /*
+        * Force any ongoing CPU-hotplug operations to complete,
+        * so that any callbacks from the outgoing CPUs are now in
+        * the orphanage.
+        */
+       cpu_maps_update_begin();
+       cpu_maps_update_done();
+
+       /*
+        * Now that all online CPUs have rcu_barrier_callback() callbacks
+        * posted, we can adopt all of the orphaned callbacks and place
+        * an rcu_barrier_callback() callback after them.  When that is done,
+        * we are guaranteed to have an rcu_barrier_callback() callback
+        * following every callback that could possibly have been
+        * registered before _rcu_barrier() was called.
+        */
+       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+       rcu_adopt_orphan_cbs(rsp);
+       atomic_inc(&rcu_barrier_cpu_count);
+       call_rcu_func(&rh, rcu_barrier_callback);
+       raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+       /*
+        * Now that we have an rcu_barrier_callback() callback on each
+        * CPU, and thus each counted, remove the initial count.
+        */
+       atomic_dec(&rcu_barrier_cpu_count);
+       smp_mb__after_atomic_dec();
+
+       /*
+        * Loop waiting for all rcu_barrier_callback() callbacks to be
+        * invoked.  Adopt any orphaned callbacks in the meantime, just
+        * in case one of the rcu_barrier_callback() callbacks is orphaned.
+        */
+       while (atomic_read(&rcu_barrier_cpu_count) > 0) {
+               wait_event(rcu_barrier_wq,
+                          atomic_read(&rcu_barrier_cpu_count) == 0 ||
+                          ACCESS_ONCE(rsp->qlen));
+               if (ACCESS_ONCE(rsp->qlen)) {
+                       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+                       rcu_adopt_orphan_cbs(rsp);
+                       raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+               }
+       }
+
+       /*
+        * Done, so let others adopt orphaned callbacks.  But avoid
+        * indefinite postponement of any additional orphans by adopting
+        * one more time.
+        */
+       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+       rcu_adopt_orphan_cbs(rsp);
+       rsp->rcu_barrier_in_progress = NULL;
+       raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+       /* Other rcu_barrier() invocations can now safely proceed. */
        mutex_unlock(&rcu_barrier_mutex);
+
+       destroy_rcu_head_on_stack(&rh);
 }

 /**
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 36ca28e..1e49c56 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -371,6 +371,17 @@ struct rcu_state {

        raw_spinlock_t onofflock;               /* exclude on/offline and */
                                                /*  starting new GP. */
+       struct rcu_head *orphan_nxtlist;        /* Orphaned callbacks that */
+                                               /*  need a grace period. */
+       struct rcu_head **orphan_nxttail;       /* Tail of above. */
+       struct rcu_head *orphan_donelist;       /* Orphaned callbacks that */
+                                               /*  are ready to invoke. */
+       struct rcu_head **orphan_donetail;      /* Tail of above. */
+       long qlen_lazy;                         /* Number of lazy callbacks. */
+       long qlen;                              /* Total number of callbacks. */
+       struct task_struct *rcu_barrier_in_progress;
+                                               /* Task doing rcu_barrier(), */
+                                               /*  or NULL if no barrier. */
        raw_spinlock_t fqslock;                 /* Only one task forcing */
                                                /*  quiescent states. */
        unsigned long jiffies_force_qs;         /* Time at which to invoke */
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index ed459ed..d4bc16d 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)

        gpnum = rsp->gpnum;
        seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x "
-                  "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+                  "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
                   rsp->completed, gpnum, rsp->fqs_state,
                   (long)(rsp->jiffies_force_qs - jiffies),
                   (int)(jiffies & 0xffff),
                   rsp->n_force_qs, rsp->n_force_qs_ngp,
                   rsp->n_force_qs - rsp->n_force_qs_ngp,
-                  rsp->n_force_qs_lh);
+                  rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen);
        for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
                if (rnp->level != level) {
                        seq_puts(m, "\n");