Commit 15693458 authored by Steven Rostedt (Red Hat)'s avatar Steven Rostedt (Red Hat) Committed by Steven Rostedt

tracing/ring-buffer: Move poll wake ups into ring buffer code

Move the logic to wake up on ring buffer data into the ring buffer
code itself. This simplifies the tracing code a lot and also has the
added benefit that waiters on one of the instance buffers can be woken
only when data is added to that instance instead of data added to
any instance.
Signed-off-by: default avatarSteven Rostedt <rostedt@goodmis.org>
parent b627344f
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <linux/kmemcheck.h> #include <linux/kmemcheck.h>
#include <linux/mm.h> #include <linux/mm.h>
#include <linux/seq_file.h> #include <linux/seq_file.h>
#include <linux/poll.h>
struct ring_buffer; struct ring_buffer;
struct ring_buffer_iter; struct ring_buffer_iter;
...@@ -96,6 +97,11 @@ __ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *k ...@@ -96,6 +97,11 @@ __ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *k
__ring_buffer_alloc((size), (flags), &__key); \ __ring_buffer_alloc((size), (flags), &__key); \
}) })
void ring_buffer_wait(struct ring_buffer *buffer, int cpu);
int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
struct file *filp, poll_table *poll_table);
#define RING_BUFFER_ALL_CPUS -1 #define RING_BUFFER_ALL_CPUS -1
void ring_buffer_free(struct ring_buffer *buffer); void ring_buffer_free(struct ring_buffer *buffer);
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include <linux/trace_clock.h> #include <linux/trace_clock.h>
#include <linux/trace_seq.h> #include <linux/trace_seq.h>
#include <linux/spinlock.h> #include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/debugfs.h> #include <linux/debugfs.h>
#include <linux/uaccess.h> #include <linux/uaccess.h>
#include <linux/hardirq.h> #include <linux/hardirq.h>
...@@ -442,6 +443,12 @@ int ring_buffer_print_page_header(struct trace_seq *s) ...@@ -442,6 +443,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)
return ret; return ret;
} }
struct rb_irq_work {
struct irq_work work;
wait_queue_head_t waiters;
bool waiters_pending;
};
/* /*
* head_page == tail_page && head == tail then buffer is empty. * head_page == tail_page && head == tail then buffer is empty.
*/ */
...@@ -476,6 +483,8 @@ struct ring_buffer_per_cpu { ...@@ -476,6 +483,8 @@ struct ring_buffer_per_cpu {
struct list_head new_pages; /* new pages to add */ struct list_head new_pages; /* new pages to add */
struct work_struct update_pages_work; struct work_struct update_pages_work;
struct completion update_done; struct completion update_done;
struct rb_irq_work irq_work;
}; };
struct ring_buffer { struct ring_buffer {
...@@ -495,6 +504,8 @@ struct ring_buffer { ...@@ -495,6 +504,8 @@ struct ring_buffer {
struct notifier_block cpu_notify; struct notifier_block cpu_notify;
#endif #endif
u64 (*clock)(void); u64 (*clock)(void);
struct rb_irq_work irq_work;
}; };
struct ring_buffer_iter { struct ring_buffer_iter {
...@@ -506,6 +517,118 @@ struct ring_buffer_iter { ...@@ -506,6 +517,118 @@ struct ring_buffer_iter {
u64 read_stamp; u64 read_stamp;
}; };
/*
* rb_wake_up_waiters - wake up tasks waiting for ring buffer input
*
* Schedules a delayed work to wake up any task that is blocked on the
* ring buffer waiters queue.
*/
static void rb_wake_up_waiters(struct irq_work *work)
{
struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
wake_up_all(&rbwork->waiters);
}
/**
* ring_buffer_wait - wait for input to the ring buffer
* @buffer: buffer to wait on
* @cpu: the cpu buffer to wait on
*
* If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
* as data is added to any of the @buffer's cpu buffers. Otherwise
* it will wait for data to be added to a specific cpu buffer.
*/
void ring_buffer_wait(struct ring_buffer *buffer, int cpu)
{
struct ring_buffer_per_cpu *cpu_buffer;
DEFINE_WAIT(wait);
struct rb_irq_work *work;
/*
* Depending on what the caller is waiting for, either any
* data in any cpu buffer, or a specific buffer, put the
* caller on the appropriate wait queue.
*/
if (cpu == RING_BUFFER_ALL_CPUS)
work = &buffer->irq_work;
else {
cpu_buffer = buffer->buffers[cpu];
work = &cpu_buffer->irq_work;
}
prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
/*
* The events can happen in critical sections where
* checking a work queue can cause deadlocks.
* After adding a task to the queue, this flag is set
* only to notify events to try to wake up the queue
* using irq_work.
*
* We don't clear it even if the buffer is no longer
* empty. The flag only causes the next event to run
* irq_work to do the work queue wake up. The worse
* that can happen if we race with !trace_empty() is that
* an event will cause an irq_work to try to wake up
* an empty queue.
*
* There's no reason to protect this flag either, as
* the work queue and irq_work logic will do the necessary
* synchronization for the wake ups. The only thing
* that is necessary is that the wake up happens after
* a task has been queued. It's OK for spurious wake ups.
*/
work->waiters_pending = true;
if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) ||
(cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu)))
schedule();
finish_wait(&work->waiters, &wait);
}
/**
* ring_buffer_poll_wait - poll on buffer input
* @buffer: buffer to wait on
* @cpu: the cpu buffer to wait on
* @filp: the file descriptor
* @poll_table: The poll descriptor
*
* If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
* as data is added to any of the @buffer's cpu buffers. Otherwise
* it will wait for data to be added to a specific cpu buffer.
*
* Returns POLLIN | POLLRDNORM if data exists in the buffers,
* zero otherwise.
*/
int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
struct file *filp, poll_table *poll_table)
{
struct ring_buffer_per_cpu *cpu_buffer;
struct rb_irq_work *work;
if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
(cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
return POLLIN | POLLRDNORM;
if (cpu == RING_BUFFER_ALL_CPUS)
work = &buffer->irq_work;
else {
cpu_buffer = buffer->buffers[cpu];
work = &cpu_buffer->irq_work;
}
work->waiters_pending = true;
poll_wait(filp, &work->waiters, poll_table);
if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
(cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
return POLLIN | POLLRDNORM;
return 0;
}
/* buffer may be either ring_buffer or ring_buffer_per_cpu */ /* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond) \ #define RB_WARN_ON(b, cond) \
({ \ ({ \
...@@ -1061,6 +1184,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu) ...@@ -1061,6 +1184,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
init_completion(&cpu_buffer->update_done); init_completion(&cpu_buffer->update_done);
init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu)); GFP_KERNEL, cpu_to_node(cpu));
...@@ -1156,6 +1280,8 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, ...@@ -1156,6 +1280,8 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
buffer->clock = trace_clock_local; buffer->clock = trace_clock_local;
buffer->reader_lock_key = key; buffer->reader_lock_key = key;
init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
/* need at least two pages */ /* need at least two pages */
if (nr_pages < 2) if (nr_pages < 2)
nr_pages = 2; nr_pages = 2;
...@@ -2610,6 +2736,22 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, ...@@ -2610,6 +2736,22 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
rb_end_commit(cpu_buffer); rb_end_commit(cpu_buffer);
} }
static __always_inline void
rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
{
if (buffer->irq_work.waiters_pending) {
buffer->irq_work.waiters_pending = false;
/* irq_work_queue() supplies it's own memory barriers */
irq_work_queue(&buffer->irq_work.work);
}
if (cpu_buffer->irq_work.waiters_pending) {
cpu_buffer->irq_work.waiters_pending = false;
/* irq_work_queue() supplies it's own memory barriers */
irq_work_queue(&cpu_buffer->irq_work.work);
}
}
/** /**
* ring_buffer_unlock_commit - commit a reserved * ring_buffer_unlock_commit - commit a reserved
* @buffer: The buffer to commit to * @buffer: The buffer to commit to
...@@ -2629,6 +2771,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer, ...@@ -2629,6 +2771,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer,
rb_commit(cpu_buffer, event); rb_commit(cpu_buffer, event);
rb_wakeups(buffer, cpu_buffer);
trace_recursive_unlock(); trace_recursive_unlock();
preempt_enable_notrace(); preempt_enable_notrace();
...@@ -2801,6 +2945,8 @@ int ring_buffer_write(struct ring_buffer *buffer, ...@@ -2801,6 +2945,8 @@ int ring_buffer_write(struct ring_buffer *buffer,
rb_commit(cpu_buffer, event); rb_commit(cpu_buffer, event);
rb_wakeups(buffer, cpu_buffer);
ret = 0; ret = 0;
out: out:
preempt_enable_notrace(); preempt_enable_notrace();
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#include <linux/seq_file.h> #include <linux/seq_file.h>
#include <linux/notifier.h> #include <linux/notifier.h>
#include <linux/irqflags.h> #include <linux/irqflags.h>
#include <linux/irq_work.h>
#include <linux/debugfs.h> #include <linux/debugfs.h>
#include <linux/pagemap.h> #include <linux/pagemap.h>
#include <linux/hardirq.h> #include <linux/hardirq.h>
...@@ -86,14 +85,6 @@ static int dummy_set_flag(u32 old_flags, u32 bit, int set) ...@@ -86,14 +85,6 @@ static int dummy_set_flag(u32 old_flags, u32 bit, int set)
*/ */
static DEFINE_PER_CPU(bool, trace_cmdline_save); static DEFINE_PER_CPU(bool, trace_cmdline_save);
/*
* When a reader is waiting for data, then this variable is
* set to true.
*/
static bool trace_wakeup_needed;
static struct irq_work trace_work_wakeup;
/* /*
* Kill all tracing for good (never come back). * Kill all tracing for good (never come back).
* It is initialized to 1 but will turn to zero if the initialization * It is initialized to 1 but will turn to zero if the initialization
...@@ -334,28 +325,12 @@ static inline void trace_access_lock_init(void) ...@@ -334,28 +325,12 @@ static inline void trace_access_lock_init(void)
#endif #endif
/* trace_wait is a waitqueue for tasks blocked on trace_poll */
static DECLARE_WAIT_QUEUE_HEAD(trace_wait);
/* trace_flags holds trace_options default values */ /* trace_flags holds trace_options default values */
unsigned long trace_flags = TRACE_ITER_PRINT_PARENT | TRACE_ITER_PRINTK | unsigned long trace_flags = TRACE_ITER_PRINT_PARENT | TRACE_ITER_PRINTK |
TRACE_ITER_ANNOTATE | TRACE_ITER_CONTEXT_INFO | TRACE_ITER_SLEEP_TIME | TRACE_ITER_ANNOTATE | TRACE_ITER_CONTEXT_INFO | TRACE_ITER_SLEEP_TIME |
TRACE_ITER_GRAPH_TIME | TRACE_ITER_RECORD_CMD | TRACE_ITER_OVERWRITE | TRACE_ITER_GRAPH_TIME | TRACE_ITER_RECORD_CMD | TRACE_ITER_OVERWRITE |
TRACE_ITER_IRQ_INFO | TRACE_ITER_MARKERS; TRACE_ITER_IRQ_INFO | TRACE_ITER_MARKERS;
/**
* trace_wake_up - wake up tasks waiting for trace input
*
* Schedules a delayed work to wake up any task that is blocked on the
* trace_wait queue. These is used with trace_poll for tasks polling the
* trace.
*/
static void trace_wake_up(struct irq_work *work)
{
wake_up_all(&trace_wait);
}
/** /**
* tracing_on - enable tracing buffers * tracing_on - enable tracing buffers
* *
...@@ -763,36 +738,11 @@ update_max_tr_single(struct trace_array *tr, struct task_struct *tsk, int cpu) ...@@ -763,36 +738,11 @@ update_max_tr_single(struct trace_array *tr, struct task_struct *tsk, int cpu)
static void default_wait_pipe(struct trace_iterator *iter) static void default_wait_pipe(struct trace_iterator *iter)
{ {
DEFINE_WAIT(wait); /* Iterators are static, they should be filled or empty */
if (trace_buffer_iter(iter, iter->cpu_file))
prepare_to_wait(&trace_wait, &wait, TASK_INTERRUPTIBLE); return;
/*
* The events can happen in critical sections where
* checking a work queue can cause deadlocks.
* After adding a task to the queue, this flag is set
* only to notify events to try to wake up the queue
* using irq_work.
*
* We don't clear it even if the buffer is no longer
* empty. The flag only causes the next event to run
* irq_work to do the work queue wake up. The worse
* that can happen if we race with !trace_empty() is that
* an event will cause an irq_work to try to wake up
* an empty queue.
*
* There's no reason to protect this flag either, as
* the work queue and irq_work logic will do the necessary
* synchronization for the wake ups. The only thing
* that is necessary is that the wake up happens after
* a task has been queued. It's OK for spurious wake ups.
*/
trace_wakeup_needed = true;
if (trace_empty(iter))
schedule();
finish_wait(&trace_wait, &wait); ring_buffer_wait(iter->tr->buffer, iter->cpu_file);
} }
/** /**
...@@ -1262,11 +1212,6 @@ void ...@@ -1262,11 +1212,6 @@ void
__buffer_unlock_commit(struct ring_buffer *buffer, struct ring_buffer_event *event) __buffer_unlock_commit(struct ring_buffer *buffer, struct ring_buffer_event *event)
{ {
__this_cpu_write(trace_cmdline_save, true); __this_cpu_write(trace_cmdline_save, true);
if (trace_wakeup_needed) {
trace_wakeup_needed = false;
/* irq_work_queue() supplies it's own memory barriers */
irq_work_queue(&trace_work_wakeup);
}
ring_buffer_unlock_commit(buffer, event); ring_buffer_unlock_commit(buffer, event);
} }
...@@ -3557,21 +3502,18 @@ static int tracing_release_pipe(struct inode *inode, struct file *file) ...@@ -3557,21 +3502,18 @@ static int tracing_release_pipe(struct inode *inode, struct file *file)
static unsigned int static unsigned int
trace_poll(struct trace_iterator *iter, struct file *filp, poll_table *poll_table) trace_poll(struct trace_iterator *iter, struct file *filp, poll_table *poll_table)
{ {
if (trace_flags & TRACE_ITER_BLOCK) { /* Iterators are static, they should be filled or empty */
if (trace_buffer_iter(iter, iter->cpu_file))
return POLLIN | POLLRDNORM;
if (trace_flags & TRACE_ITER_BLOCK)
/* /*
* Always select as readable when in blocking mode * Always select as readable when in blocking mode
*/ */
return POLLIN | POLLRDNORM; return POLLIN | POLLRDNORM;
} else { else
if (!trace_empty(iter)) return ring_buffer_poll_wait(iter->tr->buffer, iter->cpu_file,
return POLLIN | POLLRDNORM; filp, poll_table);
trace_wakeup_needed = true;
poll_wait(filp, &trace_wait, poll_table);
if (!trace_empty(iter))
return POLLIN | POLLRDNORM;
return 0;
}
} }
static unsigned int static unsigned int
...@@ -5701,7 +5643,6 @@ __init static int tracer_alloc_buffers(void) ...@@ -5701,7 +5643,6 @@ __init static int tracer_alloc_buffers(void)
#endif #endif
trace_init_cmdlines(); trace_init_cmdlines();
init_irq_work(&trace_work_wakeup, trace_wake_up);
register_tracer(&nop_trace); register_tracer(&nop_trace);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment