Commit b0546776 authored by Linus Torvalds

Merge tag 'printk-for-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/printk/linux

Pull printk updates from Petr Mladek:
 "Improve the behavior during panic. The issues were found when testing
  the ongoing changes introducing atomic consoles and printk kthreads:

   - pr_flush() has to wait for the last reserved record instead of the
     last finalized one. Note that records are finalized in random order
     when generated by multiple CPUs in parallel.

   - Ignore non-finalized records during panic(). Messages printed on
     panic-CPU are always finalized. Messages printed by other CPUs
     might never be finalized when the CPUs get stopped.

   - Block new printk() calls on non-panic CPUs completely. Backtraces
     are printed before entering the panic mode. Later messages would
     just mess up the information printed by the panic CPU.

   - Do not take console_lock in console_flush_on_panic() at all. The
     original code did try_lock()/console_unlock(). The unlock part
     might cause a deadlock when panic() happened in a scheduler code.

   - Fix conversion of 64-bit sequence number for 32-bit atomic
     operations"

* tag 'printk-for-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/printk/linux:
  dump_stack: Do not get cpu_sync for panic CPU
  panic: Flush kernel log buffer at the end
  printk: Avoid non-panic CPUs writing to ringbuffer
  printk: Disable passing console lock owner completely during panic()
  printk: ringbuffer: Skip non-finalized records in panic
  printk: Wait for all reserved records with pr_flush()
  printk: ringbuffer: Cleanup reader terminology
  printk: Add this_cpu_in_panic()
  printk: For @suppress_panic_printk check for other CPU in panic
  printk: ringbuffer: Clarify special lpos values
  printk: ringbuffer: Do not skip non-finalized records with prb_next_seq()
  printk: Use prb_first_seq() as base for 32bit seq macros
  printk: Adjust mapping for 32bit seq macros
  printk: nbcon: Relocate 32bit seq macros
parents f88c3fb8 7412dc6d
...@@ -273,6 +273,8 @@ static inline void printk_trigger_flush(void) ...@@ -273,6 +273,8 @@ static inline void printk_trigger_flush(void)
} }
#endif #endif
bool this_cpu_in_panic(void);
#ifdef CONFIG_SMP #ifdef CONFIG_SMP
extern int __printk_cpu_sync_try_get(void); extern int __printk_cpu_sync_try_get(void);
extern void __printk_cpu_sync_wait(void); extern void __printk_cpu_sync_wait(void);
......
...@@ -446,6 +446,14 @@ void panic(const char *fmt, ...) ...@@ -446,6 +446,14 @@ void panic(const char *fmt, ...)
/* Do not scroll important messages printed above */ /* Do not scroll important messages printed above */
suppress_printk = 1; suppress_printk = 1;
/*
* The final messages may not have been printed if in a context that
* defers printing (such as NMI) and irq_work is not available.
* Explicitly flush the kernel log buffer one last time.
*/
console_flush_on_panic(CONSOLE_FLUSH_PENDING);
local_irq_enable(); local_irq_enable();
for (i = 0; ; i += PANIC_TIMER_STEP) { for (i = 0; ; i += PANIC_TIMER_STEP) {
touch_softlockup_watchdog(); touch_softlockup_watchdog();
......
...@@ -140,39 +140,6 @@ static inline bool nbcon_state_try_cmpxchg(struct console *con, struct nbcon_sta ...@@ -140,39 +140,6 @@ static inline bool nbcon_state_try_cmpxchg(struct console *con, struct nbcon_sta
return atomic_try_cmpxchg(&ACCESS_PRIVATE(con, nbcon_state), &cur->atom, new->atom); return atomic_try_cmpxchg(&ACCESS_PRIVATE(con, nbcon_state), &cur->atom, new->atom);
} }
#ifdef CONFIG_64BIT
#define __seq_to_nbcon_seq(seq) (seq)
#define __nbcon_seq_to_seq(seq) (seq)
#else /* CONFIG_64BIT */
#define __seq_to_nbcon_seq(seq) ((u32)seq)
/*
* Expand a 32bit console sequence number (@nbcon_seq) to the full 64bit
* ringbuffer sequence number, using prb_next_seq() as the reference point.
*
* NOTE(review): the distance "(u32)rb_next_seq - nbcon_seq" is computed in
* unsigned 32bit arithmetic. If @nbcon_seq could ever be ahead of the lower
* 32 bits of prb_next_seq(), the distance would wrap and the result would
* be roughly 2^32 too low -- confirm that the console can never be ahead.
*/
static inline u64 __nbcon_seq_to_seq(u32 nbcon_seq)
{
u64 seq;
u64 rb_next_seq;
/*
* The provided sequence is only the lower 32 bits of the ringbuffer
* sequence. It needs to be expanded to 64bit. Get the next sequence
* number from the ringbuffer and fold it.
*
* Having a 32bit representation in the console is sufficient.
* If a console ever gets more than 2^31 records behind
* the ringbuffer then this is the least of the problems.
*
* Also the access to the ring buffer is always safe.
*/
rb_next_seq = prb_next_seq(prb);
/* Subtract the (32bit) distance between the reference and the console. */
seq = rb_next_seq - ((u32)rb_next_seq - nbcon_seq);
return seq;
}
#endif /* CONFIG_64BIT */
/** /**
* nbcon_seq_read - Read the current console sequence * nbcon_seq_read - Read the current console sequence
* @con: Console to read the sequence of * @con: Console to read the sequence of
...@@ -183,7 +150,7 @@ u64 nbcon_seq_read(struct console *con) ...@@ -183,7 +150,7 @@ u64 nbcon_seq_read(struct console *con)
{ {
unsigned long nbcon_seq = atomic_long_read(&ACCESS_PRIVATE(con, nbcon_seq)); unsigned long nbcon_seq = atomic_long_read(&ACCESS_PRIVATE(con, nbcon_seq));
return __nbcon_seq_to_seq(nbcon_seq); return __ulseq_to_u64seq(prb, nbcon_seq);
} }
/** /**
...@@ -204,7 +171,7 @@ void nbcon_seq_force(struct console *con, u64 seq) ...@@ -204,7 +171,7 @@ void nbcon_seq_force(struct console *con, u64 seq)
*/ */
u64 valid_seq = max_t(u64, seq, prb_first_valid_seq(prb)); u64 valid_seq = max_t(u64, seq, prb_first_valid_seq(prb));
atomic_long_set(&ACCESS_PRIVATE(con, nbcon_seq), __seq_to_nbcon_seq(valid_seq)); atomic_long_set(&ACCESS_PRIVATE(con, nbcon_seq), __u64seq_to_ulseq(valid_seq));
/* Clear con->seq since nbcon consoles use con->nbcon_seq instead. */ /* Clear con->seq since nbcon consoles use con->nbcon_seq instead. */
con->seq = 0; con->seq = 0;
...@@ -223,11 +190,11 @@ void nbcon_seq_force(struct console *con, u64 seq) ...@@ -223,11 +190,11 @@ void nbcon_seq_force(struct console *con, u64 seq)
*/ */
static void nbcon_seq_try_update(struct nbcon_context *ctxt, u64 new_seq) static void nbcon_seq_try_update(struct nbcon_context *ctxt, u64 new_seq)
{ {
unsigned long nbcon_seq = __seq_to_nbcon_seq(ctxt->seq); unsigned long nbcon_seq = __u64seq_to_ulseq(ctxt->seq);
struct console *con = ctxt->console; struct console *con = ctxt->console;
if (atomic_long_try_cmpxchg(&ACCESS_PRIVATE(con, nbcon_seq), &nbcon_seq, if (atomic_long_try_cmpxchg(&ACCESS_PRIVATE(con, nbcon_seq), &nbcon_seq,
__seq_to_nbcon_seq(new_seq))) { __u64seq_to_ulseq(new_seq))) {
ctxt->seq = new_seq; ctxt->seq = new_seq;
} else { } else {
ctxt->seq = nbcon_seq_read(con); ctxt->seq = nbcon_seq_read(con);
......
...@@ -347,6 +347,29 @@ static bool panic_in_progress(void) ...@@ -347,6 +347,29 @@ static bool panic_in_progress(void)
return unlikely(atomic_read(&panic_cpu) != PANIC_CPU_INVALID); return unlikely(atomic_read(&panic_cpu) != PANIC_CPU_INVALID);
} }
/* Return true if a panic is in progress on the current CPU. */
bool this_cpu_in_panic(void)
{
/*
* We can use raw_smp_processor_id() here because it is impossible for
* the task to be migrated to the panic_cpu, or away from it. If
* panic_cpu has already been set, and we're not currently executing on
* that CPU, then we never will be.
*/
return unlikely(atomic_read(&panic_cpu) == raw_smp_processor_id());
}
/*
* Return true if a panic is in progress on a remote CPU.
*
* On true, the local CPU should immediately release any printing resources
* that may be needed by the panic CPU.
*/
bool other_cpu_in_panic(void)
{
return (panic_in_progress() && !this_cpu_in_panic());
}
/* /*
* This is used for debugging the mess that is the VT code by * This is used for debugging the mess that is the VT code by
* keeping track if we have the console semaphore held. It's * keeping track if we have the console semaphore held. It's
...@@ -439,12 +462,6 @@ static int console_msg_format = MSG_FORMAT_DEFAULT; ...@@ -439,12 +462,6 @@ static int console_msg_format = MSG_FORMAT_DEFAULT;
static DEFINE_MUTEX(syslog_lock); static DEFINE_MUTEX(syslog_lock);
#ifdef CONFIG_PRINTK #ifdef CONFIG_PRINTK
/*
* During panic, heavy printk by other CPUs can delay the
* panic and risk deadlock on console resources.
*/
static int __read_mostly suppress_panic_printk;
DECLARE_WAIT_QUEUE_HEAD(log_wait); DECLARE_WAIT_QUEUE_HEAD(log_wait);
/* All 3 protected by @syslog_lock. */ /* All 3 protected by @syslog_lock. */
/* the next printk record to read by syslog(READ) or /proc/kmsg */ /* the next printk record to read by syslog(READ) or /proc/kmsg */
...@@ -1835,10 +1852,23 @@ static bool console_waiter; ...@@ -1835,10 +1852,23 @@ static bool console_waiter;
*/ */
static void console_lock_spinning_enable(void) static void console_lock_spinning_enable(void)
{ {
/*
* Do not use spinning in panic(). The panic CPU wants to keep the lock.
* Non-panic CPUs abandon the flush anyway.
*
* Just keep the lockdep annotation. The panic-CPU should avoid
* taking console_owner_lock because it might cause a deadlock.
* This looks like the easiest way to prevent false lockdep
* reports without handling races in a lockless way.
*/
if (panic_in_progress())
goto lockdep;
raw_spin_lock(&console_owner_lock); raw_spin_lock(&console_owner_lock);
console_owner = current; console_owner = current;
raw_spin_unlock(&console_owner_lock); raw_spin_unlock(&console_owner_lock);
lockdep:
/* The waiter may spin on us after setting console_owner */ /* The waiter may spin on us after setting console_owner */
spin_acquire(&console_owner_dep_map, 0, 0, _THIS_IP_); spin_acquire(&console_owner_dep_map, 0, 0, _THIS_IP_);
} }
...@@ -1863,6 +1893,22 @@ static int console_lock_spinning_disable_and_check(int cookie) ...@@ -1863,6 +1893,22 @@ static int console_lock_spinning_disable_and_check(int cookie)
{ {
int waiter; int waiter;
/*
* Ignore spinning waiters during panic() because they might get stopped
* or blocked at any time.
*
* It is safe because nobody is allowed to start spinning during panic
* in the first place. If there has been a waiter then non panic CPUs
* might stay spinning. They would get stopped anyway. The panic context
* will never start spinning and an interrupted spin on panic CPU will
* never continue.
*/
if (panic_in_progress()) {
/* Keep lockdep happy. */
spin_release(&console_owner_dep_map, _THIS_IP_);
return 0;
}
raw_spin_lock(&console_owner_lock); raw_spin_lock(&console_owner_lock);
waiter = READ_ONCE(console_waiter); waiter = READ_ONCE(console_waiter);
console_owner = NULL; console_owner = NULL;
...@@ -2259,8 +2305,12 @@ asmlinkage int vprintk_emit(int facility, int level, ...@@ -2259,8 +2305,12 @@ asmlinkage int vprintk_emit(int facility, int level,
if (unlikely(suppress_printk)) if (unlikely(suppress_printk))
return 0; return 0;
if (unlikely(suppress_panic_printk) && /*
atomic_read(&panic_cpu) != raw_smp_processor_id()) * The messages on the panic CPU are the most important. If
* non-panic CPUs are generating any messages, they will be
* silently dropped.
*/
if (other_cpu_in_panic())
return 0; return 0;
if (level == LOGLEVEL_SCHED) { if (level == LOGLEVEL_SCHED) {
...@@ -2590,26 +2640,6 @@ static int console_cpu_notify(unsigned int cpu) ...@@ -2590,26 +2640,6 @@ static int console_cpu_notify(unsigned int cpu)
return 0; return 0;
} }
/*
* Return true if a panic is in progress on a remote CPU.
*
* On true, the local CPU should immediately release any printing resources
* that may be needed by the panic CPU.
*/
bool other_cpu_in_panic(void)
{
/* No panic at all, so no remote CPU can be in panic. */
if (!panic_in_progress())
return false;
/*
* We can use raw_smp_processor_id() here because it is impossible for
* the task to be migrated to the panic_cpu, or away from it. If
* panic_cpu has already been set, and we're not currently executing on
* that CPU, then we never will be.
*/
return atomic_read(&panic_cpu) != raw_smp_processor_id();
}
/** /**
* console_lock - block the console subsystem from printing * console_lock - block the console subsystem from printing
* *
...@@ -2765,8 +2795,6 @@ void console_prepend_dropped(struct printk_message *pmsg, unsigned long dropped) ...@@ -2765,8 +2795,6 @@ void console_prepend_dropped(struct printk_message *pmsg, unsigned long dropped)
bool printk_get_next_message(struct printk_message *pmsg, u64 seq, bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
bool is_extended, bool may_suppress) bool is_extended, bool may_suppress)
{ {
static int panic_console_dropped;
struct printk_buffers *pbufs = pmsg->pbufs; struct printk_buffers *pbufs = pmsg->pbufs;
const size_t scratchbuf_sz = sizeof(pbufs->scratchbuf); const size_t scratchbuf_sz = sizeof(pbufs->scratchbuf);
const size_t outbuf_sz = sizeof(pbufs->outbuf); const size_t outbuf_sz = sizeof(pbufs->outbuf);
...@@ -2794,17 +2822,6 @@ bool printk_get_next_message(struct printk_message *pmsg, u64 seq, ...@@ -2794,17 +2822,6 @@ bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
pmsg->seq = r.info->seq; pmsg->seq = r.info->seq;
pmsg->dropped = r.info->seq - seq; pmsg->dropped = r.info->seq - seq;
/*
* Check for dropped messages in panic here so that printk
* suppression can occur as early as possible if necessary.
*/
if (pmsg->dropped &&
panic_in_progress() &&
panic_console_dropped++ > 10) {
suppress_panic_printk = 1;
pr_warn_once("Too many dropped messages. Suppress messages on non-panic CPUs to prevent livelock.\n");
}
/* Skip record that has level above the console loglevel. */ /* Skip record that has level above the console loglevel. */
if (may_suppress && suppress_message_printing(r.info->level)) if (may_suppress && suppress_message_printing(r.info->level))
goto out; goto out;
...@@ -3750,7 +3767,7 @@ static bool __pr_flush(struct console *con, int timeout_ms, bool reset_on_progre ...@@ -3750,7 +3767,7 @@ static bool __pr_flush(struct console *con, int timeout_ms, bool reset_on_progre
might_sleep(); might_sleep();
seq = prb_next_seq(prb); seq = prb_next_reserve_seq(prb);
/* Flush the consoles so that records up to @seq are printed. */ /* Flush the consoles so that records up to @seq are printed. */
console_lock(); console_lock();
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <linux/errno.h> #include <linux/errno.h>
#include <linux/bug.h> #include <linux/bug.h>
#include "printk_ringbuffer.h" #include "printk_ringbuffer.h"
#include "internal.h"
/** /**
* DOC: printk_ringbuffer overview * DOC: printk_ringbuffer overview
...@@ -303,6 +304,9 @@ ...@@ -303,6 +304,9 @@
* *
* desc_push_tail:B / desc_reserve:D * desc_push_tail:B / desc_reserve:D
* set descriptor reusable (state), then push descriptor tail (id) * set descriptor reusable (state), then push descriptor tail (id)
*
* desc_update_last_finalized:A / desc_last_finalized_seq:A
* store finalized record, then set new highest finalized sequence number
*/ */
#define DATA_SIZE(data_ring) _DATA_SIZE((data_ring)->size_bits) #define DATA_SIZE(data_ring) _DATA_SIZE((data_ring)->size_bits)
...@@ -1030,9 +1034,13 @@ static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size, ...@@ -1030,9 +1034,13 @@ static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size,
unsigned long next_lpos; unsigned long next_lpos;
if (size == 0) { if (size == 0) {
/* Specify a data-less block. */ /*
blk_lpos->begin = NO_LPOS; * Data blocks are not created for empty lines. Instead, the
blk_lpos->next = NO_LPOS; * reader will recognize these special lpos values and handle
* it appropriately.
*/
blk_lpos->begin = EMPTY_LINE_LPOS;
blk_lpos->next = EMPTY_LINE_LPOS;
return NULL; return NULL;
} }
...@@ -1210,10 +1218,18 @@ static const char *get_data(struct prb_data_ring *data_ring, ...@@ -1210,10 +1218,18 @@ static const char *get_data(struct prb_data_ring *data_ring,
/* Data-less data block description. */ /* Data-less data block description. */
if (BLK_DATALESS(blk_lpos)) { if (BLK_DATALESS(blk_lpos)) {
if (blk_lpos->begin == NO_LPOS && blk_lpos->next == NO_LPOS) { /*
* Records that are just empty lines are also valid, even
* though they do not have a data block. For such records
* explicitly return empty string data to signify success.
*/
if (blk_lpos->begin == EMPTY_LINE_LPOS &&
blk_lpos->next == EMPTY_LINE_LPOS) {
*data_size = 0; *data_size = 0;
return ""; return "";
} }
/* Data lost, invalid, or otherwise unavailable. */
return NULL; return NULL;
} }
...@@ -1441,20 +1457,118 @@ bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer ...@@ -1441,20 +1457,118 @@ bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer
return false; return false;
} }
/*
 * Read @last_finalized_seq.
 *
 * Every record with a sequence number up to and including the returned
 * value is finalized and can be read. The only exceptions are records that
 * are so old that they have already been overwritten.
 *
 * The value is guaranteed to only ever increase.
 *
 * Finalized records that follow a non-finalized record are not reported
 * because the reader cannot reach them yet. For example, a new record
 * stored via printk() is not available to a printer while it follows a
 * record that has not been finalized. As soon as that earlier record
 * becomes finalized, @last_finalized_seq is updated accordingly and the
 * full set of finalized records becomes available to the printer. Since
 * every printk() caller either prints directly or triggers deferred
 * printing of all available unprinted records, all printk() messages get
 * printed.
 */
static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
{
	unsigned long raw_seq;

	/*
	 * The sequence number must be loaded before the record it refers
	 * to, so that the record is guaranteed to be visible to this CPU.
	 * This pairs with desc_update_last_finalized:A.
	 */
	raw_seq = atomic_long_read_acquire(&rb->desc_ring.last_finalized_seq); /* LMM(desc_last_finalized_seq:A) */

	return __ulseq_to_u64seq(rb, raw_seq);
}
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
struct printk_record *r, unsigned int *line_count);
/*
* Check if there are records directly following @last_finalized_seq that are
* finalized. If so, update @last_finalized_seq to the latest of these
* records. It is not allowed to skip over records that are not yet finalized.
*
* The update is done with a cmpxchg. If the stored value no longer matches
* the value the search started from, the search is restarted from @oldval
* (presumably refreshed by the failed cmpxchg with the current value, as is
* usual for try_cmpxchg -- confirm).
*
* NOTE(review): _prb_read_valid() is documented to skip over
* non-existent/non-finalized records when called on the panic CPU. That
* appears to conflict with the "not allowed to skip" rule above for this
* caller -- confirm that this is intended.
*/
static void desc_update_last_finalized(struct printk_ringbuffer *rb)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
u64 old_seq = desc_last_finalized_seq(rb);
unsigned long oldval;
unsigned long newval;
u64 finalized_seq;
u64 try_seq;
try_again:
finalized_seq = old_seq;
try_seq = finalized_seq + 1;
/* Try to find later finalized records. */
while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
/* On success @try_seq was updated to the record that was read. */
finalized_seq = try_seq;
try_seq++;
}
/* No update needed if no later finalized record was found. */
if (finalized_seq == old_seq)
return;
/* The atomic variable only stores the unsigned long form of the sequence. */
oldval = __u64seq_to_ulseq(old_seq);
newval = __u64seq_to_ulseq(finalized_seq);
/*
* Set the sequence number of a later finalized record that has been
* seen.
*
* Guarantee the record data is visible to other CPUs before storing
* its sequence number. This pairs with desc_last_finalized_seq:A.
*
* Memory barrier involvement:
*
* If desc_last_finalized_seq:A reads from
* desc_update_last_finalized:A, then desc_read:A reads from
* _prb_commit:B.
*
* Relies on:
*
* RELEASE from _prb_commit:B to desc_update_last_finalized:A
* matching
* ACQUIRE from desc_last_finalized_seq:A to desc_read:A
*
* Note: _prb_commit:B and desc_update_last_finalized:A can be
* different CPUs. However, the desc_update_last_finalized:A
* CPU (which performs the release) must have previously seen
* _prb_commit:B.
*/
if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
/* The stored value changed underneath us. Search again from it. */
old_seq = __ulseq_to_u64seq(rb, oldval);
goto try_again;
}
}
/* /*
* Attempt to finalize a specified descriptor. If this fails, the descriptor * Attempt to finalize a specified descriptor. If this fails, the descriptor
* is either already final or it will finalize itself when the writer commits. * is either already final or it will finalize itself when the writer commits.
*/ */
static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id) static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
{ {
struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long prev_state_val = DESC_SV(id, desc_committed); unsigned long prev_state_val = DESC_SV(id, desc_committed);
struct prb_desc *d = to_desc(desc_ring, id); struct prb_desc *d = to_desc(desc_ring, id);
atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val, if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */ DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
desc_update_last_finalized(rb);
/* Best effort to remember the last finalized @id. */ }
atomic_long_set(&desc_ring->last_finalized_id, id);
} }
/** /**
...@@ -1550,7 +1664,7 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb, ...@@ -1550,7 +1664,7 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
* readers. (For seq==0 there is no previous descriptor.) * readers. (For seq==0 there is no previous descriptor.)
*/ */
if (info->seq > 0) if (info->seq > 0)
desc_make_final(desc_ring, DESC_ID(id - 1)); desc_make_final(rb, DESC_ID(id - 1));
r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id); r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
/* If text data allocation fails, a data-less record is committed. */ /* If text data allocation fails, a data-less record is committed. */
...@@ -1643,7 +1757,7 @@ void prb_commit(struct prb_reserved_entry *e) ...@@ -1643,7 +1757,7 @@ void prb_commit(struct prb_reserved_entry *e)
*/ */
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */ head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
if (head_id != e->id) if (head_id != e->id)
desc_make_final(desc_ring, e->id); desc_make_final(e->rb, e->id);
} }
/** /**
...@@ -1663,12 +1777,9 @@ void prb_commit(struct prb_reserved_entry *e) ...@@ -1663,12 +1777,9 @@ void prb_commit(struct prb_reserved_entry *e)
*/ */
void prb_final_commit(struct prb_reserved_entry *e) void prb_final_commit(struct prb_reserved_entry *e)
{ {
struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
_prb_commit(e, desc_finalized); _prb_commit(e, desc_finalized);
/* Best effort to remember the last finalized @id. */ desc_update_last_finalized(e->rb);
atomic_long_set(&desc_ring->last_finalized_id, e->id);
} }
/* /*
...@@ -1832,7 +1943,7 @@ static int prb_read(struct printk_ringbuffer *rb, u64 seq, ...@@ -1832,7 +1943,7 @@ static int prb_read(struct printk_ringbuffer *rb, u64 seq,
} }
/* Get the sequence number of the tail descriptor. */ /* Get the sequence number of the tail descriptor. */
static u64 prb_first_seq(struct printk_ringbuffer *rb) u64 prb_first_seq(struct printk_ringbuffer *rb)
{ {
struct prb_desc_ring *desc_ring = &rb->desc_ring; struct prb_desc_ring *desc_ring = &rb->desc_ring;
enum desc_state d_state; enum desc_state d_state;
...@@ -1875,12 +1986,123 @@ static u64 prb_first_seq(struct printk_ringbuffer *rb) ...@@ -1875,12 +1986,123 @@ static u64 prb_first_seq(struct printk_ringbuffer *rb)
return seq; return seq;
} }
/**
* prb_next_reserve_seq() - Get the sequence number after the most recently
* reserved record.
*
* @rb: The ringbuffer to get the sequence number from.
*
* This is the public function available to readers to see what sequence
* number will be assigned to the next reserved record.
*
* Note that depending on the situation, this value can be equal to or
* higher than the sequence number returned by prb_next_seq().
*
* Context: Any context.
* Return: The sequence number that will be assigned to the next record
* reserved.
*/
u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
{
struct prb_desc_ring *desc_ring = &rb->desc_ring;
unsigned long last_finalized_id;
atomic_long_t *state_var;
u64 last_finalized_seq;
unsigned long head_id;
struct prb_desc desc;
unsigned long diff;
struct prb_desc *d;
int err;
/*
* It may not be possible to read a sequence number for @head_id.
* So the ID of @last_finalized_seq is used to calculate what the
* sequence number of @head_id will be.
*/
try_again:
last_finalized_seq = desc_last_finalized_seq(rb);
/*
* @head_id is loaded after @last_finalized_seq to ensure that
* it points to the record with @last_finalized_seq or newer.
*
* Memory barrier involvement:
*
* If desc_last_finalized_seq:A reads from
* desc_update_last_finalized:A, then
* prb_next_reserve_seq:A reads from desc_reserve:D.
*
* Relies on:
*
* RELEASE from desc_reserve:D to desc_update_last_finalized:A
* matching
* ACQUIRE from desc_last_finalized_seq:A to prb_next_reserve_seq:A
*
* Note: desc_reserve:D and desc_update_last_finalized:A can be
* different CPUs. However, the desc_update_last_finalized:A CPU
* (which performs the release) must have previously seen
* desc_read:C, which implies desc_reserve:D can be seen.
*/
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_next_reserve_seq:A) */
/*
* NOTE(review): to_desc() is passed a sequence number here rather than
* a descriptor ID. Presumably both select the same ring slot -- confirm.
*/
d = to_desc(desc_ring, last_finalized_seq);
state_var = &d->state_var;
/* Extract the ID, used to specify the descriptor to read. */
last_finalized_id = DESC_ID(atomic_long_read(state_var));
/* Ensure @last_finalized_id is correct. */
err = desc_read_finalized_seq(desc_ring, last_finalized_id, last_finalized_seq, &desc);
if (err == -EINVAL) {
if (last_finalized_seq == 0) {
/*
* No record has been finalized or even reserved yet.
*
* The @head_id is initialized such that the first
* increment will yield the first record (seq=0).
* Handle it separately to avoid a negative @diff
* below.
*/
if (head_id == DESC0_ID(desc_ring->count_bits))
return 0;
/*
* One or more descriptors are already reserved. Use
* the descriptor ID of the first one (@seq=0) for
* the @diff below.
*/
last_finalized_id = DESC0_ID(desc_ring->count_bits) + 1;
} else {
/* Record must have been overwritten. Try again. */
goto try_again;
}
}
/* Diff of known descriptor IDs to compute related sequence numbers. */
diff = head_id - last_finalized_id;
/*
* @head_id points to the most recently reserved record, but this
* function returns the sequence number that will be assigned to the
* next (not yet reserved) record. Thus +1 is needed.
*/
return (last_finalized_seq + diff + 1);
}
/* /*
* Non-blocking read of a record. Updates @seq to the last finalized record * Non-blocking read of a record.
* (which may have no data available).
* *
* See the description of prb_read_valid() and prb_read_valid_info() * On success @seq is updated to the record that was read and (if provided)
* for details. * @r and @line_count will contain the read/calculated data.
*
* On failure @seq is updated to a record that is not yet available to the
* reader, but it will be the next record available to the reader.
*
* Note: When the current CPU is in panic, this function will skip over any
* non-existent/non-finalized records in order to allow the panic CPU
* to print any and all records that have been finalized.
*/ */
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq, static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
struct printk_record *r, unsigned int *line_count) struct printk_record *r, unsigned int *line_count)
...@@ -1899,11 +2121,31 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq, ...@@ -1899,11 +2121,31 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
*seq = tail_seq; *seq = tail_seq;
} else if (err == -ENOENT) { } else if (err == -ENOENT) {
/* Record exists, but no data available. Skip. */ /* Record exists, but the data was lost. Skip. */
(*seq)++; (*seq)++;
} else { } else {
/* Non-existent/non-finalized record. Must stop. */ /*
* Non-existent/non-finalized record. Must stop.
*
* For panic situations it cannot be expected that
* non-finalized records will become finalized. But
* there may be other finalized records beyond that
* need to be printed for a panic situation. If this
* is the panic CPU, skip this
* non-existent/non-finalized record unless it is
* at or beyond the head, in which case it is not
* possible to continue.
*
* Note that new messages printed on panic CPU are
* finalized when we are here. The only exception
* might be the last message without trailing newline.
* But it would have the sequence number returned
* by "prb_next_reserve_seq() - 1".
*/
if (this_cpu_in_panic() && ((*seq + 1) < prb_next_reserve_seq(rb)))
(*seq)++;
else
return false; return false;
} }
} }
...@@ -1932,7 +2174,7 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq, ...@@ -1932,7 +2174,7 @@ static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
* On success, the reader must check r->info.seq to see which record was * On success, the reader must check r->info.seq to see which record was
* actually read. This allows the reader to detect dropped records. * actually read. This allows the reader to detect dropped records.
* *
* Failure means @seq refers to a not yet written record. * Failure means @seq refers to a record not yet available to the reader.
*/ */
bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq, bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
struct printk_record *r) struct printk_record *r)
...@@ -1962,7 +2204,7 @@ bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq, ...@@ -1962,7 +2204,7 @@ bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
* On success, the reader must check info->seq to see which record meta data * On success, the reader must check info->seq to see which record meta data
* was actually read. This allows the reader to detect dropped records. * was actually read. This allows the reader to detect dropped records.
* *
* Failure means @seq refers to a not yet written record. * Failure means @seq refers to a record not yet available to the reader.
*/ */
bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq, bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
struct printk_info *info, unsigned int *line_count) struct printk_info *info, unsigned int *line_count)
...@@ -2008,7 +2250,9 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb) ...@@ -2008,7 +2250,9 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
* newest sequence number available to readers will be. * newest sequence number available to readers will be.
* *
* This provides readers a sequence number to jump to if all currently * This provides readers a sequence number to jump to if all currently
* available records should be skipped. * available records should be skipped. It is guaranteed that all records
* previous to the returned value have been finalized and are (or were)
* available to the reader.
* *
* Context: Any context. * Context: Any context.
* Return: The sequence number of the next newest (not yet available) record * Return: The sequence number of the next newest (not yet available) record
...@@ -2016,16 +2260,10 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb) ...@@ -2016,16 +2260,10 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
*/ */
u64 prb_next_seq(struct printk_ringbuffer *rb) u64 prb_next_seq(struct printk_ringbuffer *rb)
{ {
struct prb_desc_ring *desc_ring = &rb->desc_ring;
enum desc_state d_state;
unsigned long id;
u64 seq; u64 seq;
/* Check if the cached @id still points to a valid @seq. */ seq = desc_last_finalized_seq(rb);
id = atomic_long_read(&desc_ring->last_finalized_id);
d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
if (d_state == desc_finalized || d_state == desc_reusable) {
/* /*
* Begin searching after the last finalized record. * Begin searching after the last finalized record.
* *
...@@ -2035,15 +2273,6 @@ u64 prb_next_seq(struct printk_ringbuffer *rb) ...@@ -2035,15 +2273,6 @@ u64 prb_next_seq(struct printk_ringbuffer *rb)
*/ */
if (seq != 0) if (seq != 0)
seq++; seq++;
} else {
/*
* The information about the last finalized sequence number
* has gone. It should happen only when there is a flood of
* new messages and the ringbuffer is rapidly recycled.
* Give up and start from the beginning.
*/
seq = 0;
}
/* /*
* The information about the last finalized @seq might be inaccurate. * The information about the last finalized @seq might be inaccurate.
...@@ -2085,7 +2314,7 @@ void prb_init(struct printk_ringbuffer *rb, ...@@ -2085,7 +2314,7 @@ void prb_init(struct printk_ringbuffer *rb,
rb->desc_ring.infos = infos; rb->desc_ring.infos = infos;
atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits)); atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits)); atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
atomic_long_set(&rb->desc_ring.last_finalized_id, DESC0_ID(descbits)); atomic_long_set(&rb->desc_ring.last_finalized_seq, 0);
rb->text_data_ring.size_bits = textbits; rb->text_data_ring.size_bits = textbits;
rb->text_data_ring.data = text_buf; rb->text_data_ring.data = text_buf;
......
...@@ -75,7 +75,7 @@ struct prb_desc_ring { ...@@ -75,7 +75,7 @@ struct prb_desc_ring {
struct printk_info *infos; struct printk_info *infos;
atomic_long_t head_id; atomic_long_t head_id;
atomic_long_t tail_id; atomic_long_t tail_id;
atomic_long_t last_finalized_id; atomic_long_t last_finalized_seq;
}; };
/* /*
...@@ -127,8 +127,22 @@ enum desc_state { ...@@ -127,8 +127,22 @@ enum desc_state {
#define DESC_SV(id, state) (((unsigned long)state << DESC_FLAGS_SHIFT) | id) #define DESC_SV(id, state) (((unsigned long)state << DESC_FLAGS_SHIFT) | id)
#define DESC_ID_MASK (~DESC_FLAGS_MASK) #define DESC_ID_MASK (~DESC_FLAGS_MASK)
#define DESC_ID(sv) ((sv) & DESC_ID_MASK) #define DESC_ID(sv) ((sv) & DESC_ID_MASK)
/*
* Special data block logical position values (for fields of
* @prb_desc.text_blk_lpos).
*
* - Bit0 is used to identify if the record has no data block. (Implemented in
* the LPOS_DATALESS() macro.)
*
* - Bit1 specifies the reason for not having a data block.
*
* These special values could never be real lpos values because of the
* meta data and alignment padding of data blocks. (See to_blk_size() for
* details.)
*/
#define FAILED_LPOS 0x1 #define FAILED_LPOS 0x1
#define NO_LPOS 0x3 #define EMPTY_LINE_LPOS 0x3
#define FAILED_BLK_LPOS \ #define FAILED_BLK_LPOS \
{ \ { \
...@@ -259,7 +273,7 @@ static struct printk_ringbuffer name = { \ ...@@ -259,7 +273,7 @@ static struct printk_ringbuffer name = { \
.infos = &_##name##_infos[0], \ .infos = &_##name##_infos[0], \
.head_id = ATOMIC_INIT(DESC0_ID(descbits)), \ .head_id = ATOMIC_INIT(DESC0_ID(descbits)), \
.tail_id = ATOMIC_INIT(DESC0_ID(descbits)), \ .tail_id = ATOMIC_INIT(DESC0_ID(descbits)), \
.last_finalized_id = ATOMIC_INIT(DESC0_ID(descbits)), \ .last_finalized_seq = ATOMIC_INIT(0), \
}, \ }, \
.text_data_ring = { \ .text_data_ring = { \
.size_bits = (avgtextbits) + (descbits), \ .size_bits = (avgtextbits) + (descbits), \
...@@ -378,7 +392,41 @@ bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq, ...@@ -378,7 +392,41 @@ bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq, bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
struct printk_info *info, unsigned int *line_count); struct printk_info *info, unsigned int *line_count);
u64 prb_first_seq(struct printk_ringbuffer *rb);
u64 prb_first_valid_seq(struct printk_ringbuffer *rb); u64 prb_first_valid_seq(struct printk_ringbuffer *rb);
u64 prb_next_seq(struct printk_ringbuffer *rb); u64 prb_next_seq(struct printk_ringbuffer *rb);
u64 prb_next_reserve_seq(struct printk_ringbuffer *rb);
#ifdef CONFIG_64BIT
/*
 * With CONFIG_64BIT defined, both conversions expand to their sequence
 * argument unchanged; the @rb argument of __ulseq_to_u64seq() is not used.
 */
#define __u64seq_to_ulseq(u64seq) (u64seq)
#define __ulseq_to_u64seq(rb, ulseq) (ulseq)
#else /* CONFIG_64BIT */
/*
 * Keep only the lower 32 bits of a 64-bit sequence number. The argument is
 * parenthesized so that the cast applies to the whole expression passed in,
 * e.g. __u64seq_to_ulseq(seq + 1), and not just to its first operand.
 */
#define __u64seq_to_ulseq(u64seq) ((u32)(u64seq))
static inline u64 __ulseq_to_u64seq(struct printk_ringbuffer *rb, u32 ulseq)
{
	/*
	 * @ulseq carries only the lower 32 bits of a ringbuffer sequence
	 * number. Widen it to 64 bits by anchoring it to the first sequence
	 * number reported by prb_first_seq(): the signed 32-bit distance
	 * between the anchor and @ulseq is subtracted from the anchor.
	 *
	 * A 32-bit representation is sufficient for a console. One that
	 * falls more than 2^31 records behind the ringbuffer has bigger
	 * problems than this conversion.
	 *
	 * Reading the first sequence number from the ringbuffer is always
	 * safe.
	 */
	u64 first_seq = prb_first_seq(rb);
	s32 distance = (s32)((u32)first_seq - ulseq);

	return first_seq - distance;
}
#endif /* CONFIG_64BIT */
#endif /* _KERNEL_PRINTK_RINGBUFFER_H */ #endif /* _KERNEL_PRINTK_RINGBUFFER_H */
...@@ -96,14 +96,24 @@ static void __dump_stack(const char *log_lvl) ...@@ -96,14 +96,24 @@ static void __dump_stack(const char *log_lvl)
*/ */
asmlinkage __visible void dump_stack_lvl(const char *log_lvl) asmlinkage __visible void dump_stack_lvl(const char *log_lvl)
{ {
bool in_panic = this_cpu_in_panic();
unsigned long flags; unsigned long flags;
/* /*
* Permit this cpu to perform nested stack dumps while serialising * Permit this cpu to perform nested stack dumps while serialising
* against other CPUs * against other CPUs, unless this CPU is in panic.
*
* When in panic, non-panic CPUs are not permitted to store new
* printk messages so there is no need to synchronize the output.
* This avoids potential deadlock in panic() if another CPU is
* holding and unable to release the printk_cpu_sync.
*/ */
if (!in_panic)
printk_cpu_sync_get_irqsave(flags); printk_cpu_sync_get_irqsave(flags);
__dump_stack(log_lvl); __dump_stack(log_lvl);
if (!in_panic)
printk_cpu_sync_put_irqrestore(flags); printk_cpu_sync_put_irqrestore(flags);
} }
EXPORT_SYMBOL(dump_stack_lvl); EXPORT_SYMBOL(dump_stack_lvl);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment