ring-buffer: Return reader page back into existing ring buffer

When reading the ring buffer for consuming, it is optimized for splice,
where a page is taken out of the ring buffer (zero copy) and sent to the
reading consumer. When the read is finished with the page, it calls
ring_buffer_free_read_page(), which simply frees the page. The next time the
reader needs to get a page from the ring buffer, it must call
ring_buffer_alloc_read_page() which allocates and initializes a reader page
for the ring buffer to be swapped into the ring buffer for a new filled page
for the reader.

The problem is that there's no reason to actually free the page when it is
passed back to the ring buffer. It can hold it off and reuse it for the next
iteration. This completely removes the interaction with the page_alloc
mechanism.

Using the trace-cmd utility to record all events (causing trace-cmd to
require reading lots of pages from the ring buffer, and calling
ring_buffer_alloc/free_read_page() several times), and also assigning a
stack trace trigger to the mm_page_alloc event, we can see how many times
the ring_buffer_alloc_read_page() needed to allocate a page for the ring
buffer.

Before this change:

  # trace-cmd record -e all -e mem_page_alloc -R stacktrace sleep 1
  # trace-cmd report |grep ring_buffer_alloc_read_page | wc -l
  9968

After this change:

  # trace-cmd record -e all -e mem_page_alloc -R stacktrace sleep 1
  # trace-cmd report |grep ring_buffer_alloc_read_page | wc -l
  4
Signed-off-by: default avatarSteven Rostedt (VMware) <rostedt@goodmis.org>
parent ca2958f1
...@@ -185,7 +185,7 @@ size_t ring_buffer_page_len(void *page); ...@@ -185,7 +185,7 @@ size_t ring_buffer_page_len(void *page);
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu); void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu);
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data); void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data);
int ring_buffer_read_page(struct ring_buffer *buffer, void **data_page, int ring_buffer_read_page(struct ring_buffer *buffer, void **data_page,
size_t len, int cpu, int full); size_t len, int cpu, int full);
......
...@@ -438,6 +438,7 @@ struct ring_buffer_per_cpu { ...@@ -438,6 +438,7 @@ struct ring_buffer_per_cpu {
raw_spinlock_t reader_lock; /* serialize readers */ raw_spinlock_t reader_lock; /* serialize readers */
arch_spinlock_t lock; arch_spinlock_t lock;
struct lock_class_key lock_key; struct lock_class_key lock_key;
struct buffer_data_page *free_page;
unsigned long nr_pages; unsigned long nr_pages;
unsigned int current_context; unsigned int current_context;
struct list_head *pages; struct list_head *pages;
...@@ -4377,9 +4378,25 @@ EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); ...@@ -4377,9 +4378,25 @@ EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
*/ */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu) void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
{ {
struct buffer_data_page *bpage; struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
struct buffer_data_page *bpage = NULL;
unsigned long flags;
struct page *page; struct page *page;
local_irq_save(flags);
arch_spin_lock(&cpu_buffer->lock);
if (cpu_buffer->free_page) {
bpage = cpu_buffer->free_page;
cpu_buffer->free_page = NULL;
}
arch_spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags);
if (bpage)
goto out;
page = alloc_pages_node(cpu_to_node(cpu), page = alloc_pages_node(cpu_to_node(cpu),
GFP_KERNEL | __GFP_NORETRY, 0); GFP_KERNEL | __GFP_NORETRY, 0);
if (!page) if (!page)
...@@ -4387,6 +4404,7 @@ void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu) ...@@ -4387,6 +4404,7 @@ void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
bpage = page_address(page); bpage = page_address(page);
out:
rb_init_page(bpage); rb_init_page(bpage);
return bpage; return bpage;
...@@ -4396,13 +4414,29 @@ EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); ...@@ -4396,13 +4414,29 @@ EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
/** /**
* ring_buffer_free_read_page - free an allocated read page * ring_buffer_free_read_page - free an allocated read page
* @buffer: the buffer the page was allocate for * @buffer: the buffer the page was allocate for
* @cpu: the cpu buffer the page came from
* @data: the page to free * @data: the page to free
* *
* Free a page allocated from ring_buffer_alloc_read_page. * Free a page allocated from ring_buffer_alloc_read_page.
*/ */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data)
{ {
free_page((unsigned long)data); struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
struct buffer_data_page *bpage = data;
unsigned long flags;
local_irq_save(flags);
arch_spin_lock(&cpu_buffer->lock);
if (!cpu_buffer->free_page) {
cpu_buffer->free_page = bpage;
bpage = NULL;
}
arch_spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags);
free_page((unsigned long)bpage);
} }
EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
......
...@@ -171,7 +171,7 @@ static enum event_status read_page(int cpu) ...@@ -171,7 +171,7 @@ static enum event_status read_page(int cpu)
} }
} }
} }
ring_buffer_free_read_page(buffer, bpage); ring_buffer_free_read_page(buffer, cpu, bpage);
if (ret < 0) if (ret < 0)
return EVENT_DROPPED; return EVENT_DROPPED;
......
...@@ -6054,6 +6054,7 @@ static int tracing_clock_open(struct inode *inode, struct file *file) ...@@ -6054,6 +6054,7 @@ static int tracing_clock_open(struct inode *inode, struct file *file)
struct ftrace_buffer_info { struct ftrace_buffer_info {
struct trace_iterator iter; struct trace_iterator iter;
void *spare; void *spare;
unsigned int spare_cpu;
unsigned int read; unsigned int read;
}; };
...@@ -6383,9 +6384,11 @@ tracing_buffers_read(struct file *filp, char __user *ubuf, ...@@ -6383,9 +6384,11 @@ tracing_buffers_read(struct file *filp, char __user *ubuf,
return -EBUSY; return -EBUSY;
#endif #endif
if (!info->spare) if (!info->spare) {
info->spare = ring_buffer_alloc_read_page(iter->trace_buffer->buffer, info->spare = ring_buffer_alloc_read_page(iter->trace_buffer->buffer,
iter->cpu_file); iter->cpu_file);
info->spare_cpu = iter->cpu_file;
}
if (!info->spare) if (!info->spare)
return -ENOMEM; return -ENOMEM;
...@@ -6445,7 +6448,8 @@ static int tracing_buffers_release(struct inode *inode, struct file *file) ...@@ -6445,7 +6448,8 @@ static int tracing_buffers_release(struct inode *inode, struct file *file)
__trace_array_put(iter->tr); __trace_array_put(iter->tr);
if (info->spare) if (info->spare)
ring_buffer_free_read_page(iter->trace_buffer->buffer, info->spare); ring_buffer_free_read_page(iter->trace_buffer->buffer,
info->spare_cpu, info->spare);
kfree(info); kfree(info);
mutex_unlock(&trace_types_lock); mutex_unlock(&trace_types_lock);
...@@ -6456,6 +6460,7 @@ static int tracing_buffers_release(struct inode *inode, struct file *file) ...@@ -6456,6 +6460,7 @@ static int tracing_buffers_release(struct inode *inode, struct file *file)
struct buffer_ref { struct buffer_ref {
struct ring_buffer *buffer; struct ring_buffer *buffer;
void *page; void *page;
int cpu;
int ref; int ref;
}; };
...@@ -6467,7 +6472,7 @@ static void buffer_pipe_buf_release(struct pipe_inode_info *pipe, ...@@ -6467,7 +6472,7 @@ static void buffer_pipe_buf_release(struct pipe_inode_info *pipe,
if (--ref->ref) if (--ref->ref)
return; return;
ring_buffer_free_read_page(ref->buffer, ref->page); ring_buffer_free_read_page(ref->buffer, ref->cpu, ref->page);
kfree(ref); kfree(ref);
buf->private = 0; buf->private = 0;
} }
...@@ -6501,7 +6506,7 @@ static void buffer_spd_release(struct splice_pipe_desc *spd, unsigned int i) ...@@ -6501,7 +6506,7 @@ static void buffer_spd_release(struct splice_pipe_desc *spd, unsigned int i)
if (--ref->ref) if (--ref->ref)
return; return;
ring_buffer_free_read_page(ref->buffer, ref->page); ring_buffer_free_read_page(ref->buffer, ref->cpu, ref->page);
kfree(ref); kfree(ref);
spd->partial[i].private = 0; spd->partial[i].private = 0;
} }
...@@ -6566,11 +6571,13 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos, ...@@ -6566,11 +6571,13 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
kfree(ref); kfree(ref);
break; break;
} }
ref->cpu = iter->cpu_file;
r = ring_buffer_read_page(ref->buffer, &ref->page, r = ring_buffer_read_page(ref->buffer, &ref->page,
len, iter->cpu_file, 1); len, iter->cpu_file, 1);
if (r < 0) { if (r < 0) {
ring_buffer_free_read_page(ref->buffer, ref->page); ring_buffer_free_read_page(ref->buffer, ref->cpu,
ref->page);
kfree(ref); kfree(ref);
break; break;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment