Commit e2026879 authored by Ingo Molnar's avatar Ingo Molnar

Merge branch 'tip/tracing/ring-buffer-3' of...

Merge branch 'tip/tracing/ring-buffer-3' of git://git.kernel.org/pub/scm/linux/kernel/git/rostedt/linux-2.6-trace into tracing/core
parents a3578000 8b2c70d1
Lockless Ring Buffer Design
===========================
Copyright 2009 Red Hat Inc.
Author: Steven Rostedt <srostedt@redhat.com>
License: The GNU Free Documentation License, Version 1.2
(dual licensed under the GPL v2)
Reviewers: Mathieu Desnoyers, Huang Ying, Hidetoshi Seto,
and Frederic Weisbecker.
Written for: 2.6.31
Terminology used in this Document
---------------------------------
tail - where new writes happen in the ring buffer.
head - where new reads happen in the ring buffer.
producer - the task that writes into the ring buffer (same as writer)
writer - same as producer
consumer - the task that reads from the buffer (same as reader)
reader - same as consumer.
reader_page - A page outside the ring buffer used solely (for the most part)
by the reader.
head_page - a pointer to the page that the reader will use next
tail_page - a pointer to the page that will be written to next
commit_page - a pointer to the page with the last finished non nested write.
cmpxchg - hardware assisted atomic transaction that performs the following:
A = B iff previous A == C
R = cmpxchg(A, C, B) is saying that we replace A with B if and only if
current A is equal to C, and we put the old (current) A into R
R gets the previous A regardless if A is updated with B or not.
To see if the update was successful a compare of R == C may be used.
The Generic Ring Buffer
-----------------------
The ring buffer can be used in either an overwrite mode or in
producer/consumer mode.
Producer/consumer mode is where the producer were to fill up the
buffer before the consumer could free up anything, the producer
will stop writing to the buffer. This will lose most recent events.
Overwrite mode is where the produce were to fill up the buffer
before the consumer could free up anything, the producer will
overwrite the older data. This will lose the oldest events.
No two writers can write at the same time (on the same per cpu buffer),
but a writer may interrupt another writer, but it must finish writing
before the previous writer may continue. This is very important to the
algorithm. The writers act like a "stack". The way interrupts works
enforces this behavior.
writer1 start
<preempted> writer2 start
<preempted> writer3 start
writer3 finishes
writer2 finishes
writer1 finishes
This is very much like a writer being preempted by an interrupt and
the interrupt doing a write as well.
Readers can happen at any time. But no two readers may run at the
same time, nor can a reader preempt/interrupt another reader. A reader
can not preempt/interrupt a writer, but it may read/consume from the
buffer at the same time as a writer is writing, but the reader must be
on another processor to do so. A reader may read on its own processor
and can be preempted by a writer.
A writer can preempt a reader, but a reader can not preempt a writer.
But a reader can read the buffer at the same time (on another processor)
as a writer.
The ring buffer is made up of a list of pages held together by a link list.
At initialization a reader page is allocated for the reader that is not
part of the ring buffer.
The head_page, tail_page and commit_page are all initialized to point
to the same page.
The reader page is initialized to have its next pointer pointing to
the head page, and its previous pointer pointing to a page before
the head page.
The reader has its own page to use. At start up time, this page is
allocated but is not attached to the list. When the reader wants
to read from the buffer, if its page is empty (like it is on start up)
it will swap its page with the head_page. The old reader page will
become part of the ring buffer and the head_page will be removed.
The page after the inserted page (old reader_page) will become the
new head page.
Once the new page is given to the reader, the reader could do what
it wants with it, as long as a writer has left that page.
A sample of how the reader page is swapped: Note this does not
show the head page in the buffer, it is for demonstrating a swap
only.
+------+
|reader| RING BUFFER
|page |
+------+
+---+ +---+ +---+
| |-->| |-->| |
| |<--| |<--| |
+---+ +---+ +---+
^ | ^ |
| +-------------+ |
+-----------------+
+------+
|reader| RING BUFFER
|page |-------------------+
+------+ v
| +---+ +---+ +---+
| | |-->| |-->| |
| | |<--| |<--| |<-+
| +---+ +---+ +---+ |
| ^ | ^ | |
| | +-------------+ | |
| +-----------------+ |
+------------------------------------+
+------+
|reader| RING BUFFER
|page |-------------------+
+------+ <---------------+ v
| ^ +---+ +---+ +---+
| | | |-->| |-->| |
| | | | | |<--| |<-+
| | +---+ +---+ +---+ |
| | | ^ | |
| | +-------------+ | |
| +-----------------------------+ |
+------------------------------------+
+------+
|buffer| RING BUFFER
|page |-------------------+
+------+ <---------------+ v
| ^ +---+ +---+ +---+
| | | | | |-->| |
| | New | | | |<--| |<-+
| | Reader +---+ +---+ +---+ |
| | page ----^ | |
| | | |
| +-----------------------------+ |
+------------------------------------+
It is possible that the page swapped is the commit page and the tail page,
if what is in the ring buffer is less than what is held in a buffer page.
reader page commit page tail page
| | |
v | |
+---+ | |
| |<----------+ |
| |<------------------------+
| |------+
+---+ |
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
This case is still valid for this algorithm.
When the writer leaves the page, it simply goes into the ring buffer
since the reader page still points to the next location in the ring
buffer.
The main pointers:
reader page - The page used solely by the reader and is not part
of the ring buffer (may be swapped in)
head page - the next page in the ring buffer that will be swapped
with the reader page.
tail page - the page where the next write will take place.
commit page - the page that last finished a write.
The commit page only is updated by the outer most writer in the
writer stack. A writer that preempts another writer will not move the
commit page.
When data is written into the ring buffer, a position is reserved
in the ring buffer and passed back to the writer. When the writer
is finished writing data into that position, it commits the write.
Another write (or a read) may take place at anytime during this
transaction. If another write happens it must finish before continuing
with the previous write.
Write reserve:
Buffer page
+---------+
|written |
+---------+ <--- given back to writer (current commit)
|reserved |
+---------+ <--- tail pointer
| empty |
+---------+
Write commit:
Buffer page
+---------+
|written |
+---------+
|written |
+---------+ <--- next positon for write (current commit)
| empty |
+---------+
If a write happens after the first reserve:
Buffer page
+---------+
|written |
+---------+ <-- current commit
|reserved |
+---------+ <--- given back to second writer
|reserved |
+---------+ <--- tail pointer
After second writer commits:
Buffer page
+---------+
|written |
+---------+ <--(last full commit)
|reserved |
+---------+
|pending |
|commit |
+---------+ <--- tail pointer
When the first writer commits:
Buffer page
+---------+
|written |
+---------+
|written |
+---------+
|written |
+---------+ <--(last full commit and tail pointer)
The commit pointer points to the last write location that was
committed without preempting another write. When a write that
preempted another write is committed, it only becomes a pending commit
and will not be a full commit till all writes have been committed.
The commit page points to the page that has the last full commit.
The tail page points to the page with the last write (before
committing).
The tail page is always equal to or after the commit page. It may
be several pages ahead. If the tail page catches up to the commit
page then no more writes may take place (regardless of the mode
of the ring buffer: overwrite and produce/consumer).
The order of pages are:
head page
commit page
tail page
Possible scenario:
tail page
head page commit page |
| | |
v v v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
There is a special case that the head page is after either the commit page
and possibly the tail page. That is when the commit (and tail) page has been
swapped with the reader page. This is because the head page is always
part of the ring buffer, but the reader page is not. When ever there
has been less than a full page that has been committed inside the ring buffer,
and a reader swaps out a page, it will be swapping out the commit page.
reader page commit page tail page
| | |
v | |
+---+ | |
| |<----------+ |
| |<------------------------+
| |------+
+---+ |
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
^
|
head page
In this case, the head page will not move when the tail and commit
move back into the ring buffer.
The reader can not swap a page into the ring buffer if the commit page
is still on that page. If the read meets the last commit (real commit
not pending or reserved), then there is nothing more to read.
The buffer is considered empty until another full commit finishes.
When the tail meets the head page, if the buffer is in overwrite mode,
the head page will be pushed ahead one. If the buffer is in producer/consumer
mode, the write will fail.
Overwrite mode:
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
^
|
head page
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
^
|
head page
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
^
|
head page
Note, the reader page will still point to the previous head page.
But when a swap takes place, it will use the most recent head page.
Making the Ring Buffer Lockless:
--------------------------------
The main idea behind the lockless algorithm is to combine the moving
of the head_page pointer with the swapping of pages with the reader.
State flags are placed inside the pointer to the page. To do this,
each page must be aligned in memory by 4 bytes. This will allow the 2
least significant bits of the address to be used as flags. Since
they will always be zero for the address. To get the address,
simply mask out the flags.
MASK = ~3
address & MASK
Two flags will be kept by these two bits:
HEADER - the page being pointed to is a head page
UPDATE - the page being pointed to is being updated by a writer
and was or is about to be a head page.
reader page
|
v
+---+
| |------+
+---+ |
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-H->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The above pointer "-H->" would have the HEADER flag set. That is
the next page is the next page to be swapped out by the reader.
This pointer means the next page is the head page.
When the tail page meets the head pointer, it will use cmpxchg to
change the pointer to the UPDATE state:
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-H->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
"-U->" represents a pointer in the UPDATE state.
Any access to the reader will need to take some sort of lock to serialize
the readers. But the writers will never take a lock to write to the
ring buffer. This means we only need to worry about a single reader,
and writes only preempt in "stack" formation.
When the reader tries to swap the page with the ring buffer, it
will also use cmpxchg. If the flag bit in the pointer to the
head page does not have the HEADER flag set, the compare will fail
and the reader will need to look for the new head page and try again.
Note, the flag UPDATE and HEADER are never set at the same time.
The reader swaps the reader page as follows:
+------+
|reader| RING BUFFER
|page |
+------+
+---+ +---+ +---+
| |--->| |--->| |
| |<---| |<---| |
+---+ +---+ +---+
^ | ^ |
| +---------------+ |
+-----H-------------+
The reader sets the reader page next pointer as HEADER to the page after
the head page.
+------+
|reader| RING BUFFER
|page |-------H-----------+
+------+ v
| +---+ +---+ +---+
| | |--->| |--->| |
| | |<---| |<---| |<-+
| +---+ +---+ +---+ |
| ^ | ^ | |
| | +---------------+ | |
| +-----H-------------+ |
+--------------------------------------+
It does a cmpxchg with the pointer to the previous head page to make it
point to the reader page. Note that the new pointer does not have the HEADER
flag set. This action atomically moves the head page forward.
+------+
|reader| RING BUFFER
|page |-------H-----------+
+------+ v
| ^ +---+ +---+ +---+
| | | |-->| |-->| |
| | | |<--| |<--| |<-+
| | +---+ +---+ +---+ |
| | | ^ | |
| | +-------------+ | |
| +-----------------------------+ |
+------------------------------------+
After the new head page is set, the previous pointer of the head page is
updated to the reader page.
+------+
|reader| RING BUFFER
|page |-------H-----------+
+------+ <---------------+ v
| ^ +---+ +---+ +---+
| | | |-->| |-->| |
| | | | | |<--| |<-+
| | +---+ +---+ +---+ |
| | | ^ | |
| | +-------------+ | |
| +-----------------------------+ |
+------------------------------------+
+------+
|buffer| RING BUFFER
|page |-------H-----------+ <--- New head page
+------+ <---------------+ v
| ^ +---+ +---+ +---+
| | | | | |-->| |
| | New | | | |<--| |<-+
| | Reader +---+ +---+ +---+ |
| | page ----^ | |
| | | |
| +-----------------------------+ |
+------------------------------------+
Another important point. The page that the reader page points back to
by its previous pointer (the one that now points to the new head page)
never points back to the reader page. That is because the reader page is
not part of the ring buffer. Traversing the ring buffer via the next pointers
will always stay in the ring buffer. Traversing the ring buffer via the
prev pointers may not.
Note, the way to determine a reader page is simply by examining the previous
pointer of the page. If the next pointer of the previous page does not
point back to the original page, then the original page is a reader page:
+--------+
| reader | next +----+
| page |-------->| |<====== (buffer page)
+--------+ +----+
| | ^
| v | next
prev | +----+
+------------->| |
+----+
The way the head page moves forward:
When the tail page meets the head page and the buffer is in overwrite mode
and more writes take place, the head page must be moved forward before the
writer may move the tail page. The way this is done is that the writer
performs a cmpxchg to convert the pointer to the head page from the HEADER
flag to have the UPDATE flag set. Once this is done, the reader will
not be able to swap the head page from the buffer, nor will it be able to
move the head page, until the writer is finished with the move.
This eliminates any races that the reader can have on the writer. The reader
must spin, and this is why the reader can not preempt the writer.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-H->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The following page will be made into the new head page.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
After the new head page has been set, we can set the old head page
pointer back to NORMAL.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
After the head page has been moved, the tail page may now move forward.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The above are the trivial updates. Now for the more complex scenarios.
As stated before, if enough writes preempt the first write, the
tail page may make it all the way around the buffer and meet the commit
page. At this time, we must start dropping writes (usually with some kind
of warning to the user). But what happens if the commit was still on the
reader page? The commit page is not part of the ring buffer. The tail page
must account for this.
reader page commit page
| |
v |
+---+ |
| |<----------+
| |
| |------+
+---+ |
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-H->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
^
|
tail page
If the tail page were to simply push the head page forward, the commit when
leaving the reader page would not be pointing to the correct page.
The solution to this is to test if the commit page is on the reader page
before pushing the head page. If it is, then it can be assumed that the
tail page wrapped the buffer, and we must drop new writes.
This is not a race condition, because the commit page can only be moved
by the outter most writer (the writer that was preempted).
This means that the commit will not move while a writer is moving the
tail page. The reader can not swap the reader page if it is also being
used as the commit page. The reader can simply check that the commit
is off the reader page. Once the commit page leaves the reader page
it will never go back on it unless a reader does another swap with the
buffer page that is also the commit page.
Nested writes
-------------
In the pushing forward of the tail page we must first push forward
the head page if the head page is the next page. If the head page
is not the next page, the tail page is simply updated with a cmpxchg.
Only writers move the tail page. This must be done atomically to protect
against nested writers.
temp_page = tail_page
next_page = temp_page->next
cmpxchg(tail_page, temp_page, next_page)
The above will update the tail page if it is still pointing to the expected
page. If this fails, a nested write pushed it forward, the the current write
does not need to push it.
temp page
|
v
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
Nested write comes in and moves the tail page forward:
tail page (moved by nested writer)
temp page |
| |
v v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The above would fail the cmpxchg, but since the tail page has already
been moved forward, the writer will just try again to reserve storage
on the new tail page.
But the moving of the head page is a bit more complex.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-H->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The write converts the head page pointer to UPDATE.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
But if a nested writer preempts here. It will see that the next
page is a head page, but it is also nested. It will detect that
it is nested and will save that information. The detection is the
fact that it sees the UPDATE flag instead of a HEADER or NORMAL
pointer.
The nested writer will set the new head page pointer.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
But it will not reset the update back to normal. Only the writer
that converted a pointer from HEAD to UPDATE will convert it back
to NORMAL.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
After the nested writer finishes, the outer most writer will convert
the UPDATE pointer to NORMAL.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
It can be even more complex if several nested writes came in and moved
the tail page ahead several pages:
(first writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-H->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The write converts the head page pointer to UPDATE.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
Next writer comes in, and sees the update and sets up the new
head page.
(second writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The nested writer moves the tail page forward. But does not set the old
update page to NORMAL because it is not the outer most writer.
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-H->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
Another writer preempts and sees the page after the tail page is a head page.
It changes it from HEAD to UPDATE.
(third writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-U->| |--->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The writer will move the head page forward:
(third writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-U->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
But now that the third writer did change the HEAD flag to UPDATE it
will convert it to normal:
(third writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
Then it will move the tail page, and return back to the second writer.
(second writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The second writer will fail to move the tail page because it was already
moved, so it will try again and add its data to the new tail page.
It will return to the first writer.
(first writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
The first writer can not know atomically test if the tail page moved
while it updates the HEAD page. It will then update the head page to
what it thinks is the new head page.
(first writer)
tail page
|
v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-H->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
Since the cmpxchg returns the old value of the pointer the first writer
will see it succeeded in updating the pointer from NORMAL to HEAD.
But as we can see, this is not good enough. It must also check to see
if the tail page is either where it use to be or on the next page:
(first writer)
A B tail page
| | |
v v v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |-H->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
If tail page != A and tail page does not equal B, then it must reset the
pointer back to NORMAL. The fact that it only needs to worry about
nested writers, it only needs to check this after setting the HEAD page.
(first writer)
A B tail page
| | |
v v v
+---+ +---+ +---+ +---+
<---| |--->| |-U->| |--->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
Now the writer can update the head page. This is also why the head page must
remain in UPDATE and only reset by the outer most writer. This prevents
the reader from seeing the incorrect head page.
(first writer)
A B tail page
| | |
v v v
+---+ +---+ +---+ +---+
<---| |--->| |--->| |--->| |-H->
--->| |<---| |<---| |<---| |<---
+---+ +---+ +---+ +---+
......@@ -170,7 +170,6 @@ unsigned long ring_buffer_overruns(struct ring_buffer *buffer);
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu);
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu);
unsigned long ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu);
unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu);
u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu);
void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
......
......@@ -322,6 +322,14 @@ struct buffer_data_page {
unsigned char data[]; /* data of buffer page */
};
/*
* Note, the buffer_page list must be first. The buffer pages
* are allocated in cache lines, which means that each buffer
* page will be at the beginning of a cache line, and thus
* the least significant bits will be zero. We use this to
* add flags in the list struct pointers, to make the ring buffer
* lockless.
*/
struct buffer_page {
struct list_head list; /* list of buffer pages */
local_t write; /* index for next write */
......@@ -330,6 +338,21 @@ struct buffer_page {
struct buffer_data_page *page; /* Actual data page */
};
/*
* The buffer page counters, write and entries, must be reset
* atomically when crossing page boundaries. To synchronize this
* update, two counters are inserted into the number. One is
* the actual counter for the write position or count on the page.
*
* The other is a counter of updaters. Before an update happens
* the update partition of the counter is incremented. This will
* allow the updater to update the counter atomically.
*
* The counter is 20 bits, and the state data is 12.
*/
#define RB_WRITE_MASK 0xfffff
#define RB_WRITE_INTCNT (1 << 20)
static void rb_init_page(struct buffer_data_page *bpage)
{
local_set(&bpage->commit, 0);
......@@ -403,21 +426,20 @@ int ring_buffer_print_page_header(struct trace_seq *s)
struct ring_buffer_per_cpu {
int cpu;
struct ring_buffer *buffer;
spinlock_t reader_lock; /* serialize readers */
spinlock_t reader_lock; /* serialize readers */
raw_spinlock_t lock;
struct lock_class_key lock_key;
struct list_head pages;
struct list_head *pages;
struct buffer_page *head_page; /* read from head */
struct buffer_page *tail_page; /* write to tail */
struct buffer_page *commit_page; /* committed pages */
struct buffer_page *reader_page;
unsigned long nmi_dropped;
unsigned long commit_overrun;
unsigned long overrun;
unsigned long read;
local_t commit_overrun;
local_t overrun;
local_t entries;
local_t committing;
local_t commits;
unsigned long read;
u64 write_stamp;
u64 read_stamp;
atomic_t record_disabled;
......@@ -489,6 +511,385 @@ void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
/*
* Making the ring buffer lockless makes things tricky.
* Although writes only happen on the CPU that they are on,
* and they only need to worry about interrupts. Reads can
* happen on any CPU.
*
* The reader page is always off the ring buffer, but when the
* reader finishes with a page, it needs to swap its page with
* a new one from the buffer. The reader needs to take from
* the head (writes go to the tail). But if a writer is in overwrite
* mode and wraps, it must push the head page forward.
*
* Here lies the problem.
*
* The reader must be careful to replace only the head page, and
* not another one. As described at the top of the file in the
* ASCII art, the reader sets its old page to point to the next
* page after head. It then sets the page after head to point to
* the old reader page. But if the writer moves the head page
* during this operation, the reader could end up with the tail.
*
* We use cmpxchg to help prevent this race. We also do something
* special with the page before head. We set the LSB to 1.
*
* When the writer must push the page forward, it will clear the
* bit that points to the head page, move the head, and then set
* the bit that points to the new head page.
*
* We also don't want an interrupt coming in and moving the head
* page on another writer. Thus we use the second LSB to catch
* that too. Thus:
*
* head->list->prev->next bit 1 bit 0
* ------- -------
* Normal page 0 0
* Points to head page 0 1
* New head page 1 0
*
* Note we can not trust the prev pointer of the head page, because:
*
* +----+ +-----+ +-----+
* | |------>| T |---X--->| N |
* | |<------| | | |
* +----+ +-----+ +-----+
* ^ ^ |
* | +-----+ | |
* +----------| R |----------+ |
* | |<-----------+
* +-----+
*
* Key: ---X--> HEAD flag set in pointer
* T Tail page
* R Reader page
* N Next page
*
* (see __rb_reserve_next() to see where this happens)
*
* What the above shows is that the reader just swapped out
* the reader page with a page in the buffer, but before it
* could make the new header point back to the new page added
* it was preempted by a writer. The writer moved forward onto
* the new page added by the reader and is about to move forward
* again.
*
* You can see, it is legitimate for the previous pointer of
* the head (or any page) not to point back to itself. But only
* temporarially.
*/
#define RB_PAGE_NORMAL 0UL
#define RB_PAGE_HEAD 1UL
#define RB_PAGE_UPDATE 2UL
#define RB_FLAG_MASK 3UL
/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED 4UL
/*
* rb_list_head - remove any bit
*/
static struct list_head *rb_list_head(struct list_head *list)
{
unsigned long val = (unsigned long)list;
return (struct list_head *)(val & ~RB_FLAG_MASK);
}
/*
* rb_is_head_page - test if the give page is the head page
*
* Because the reader may move the head_page pointer, we can
* not trust what the head page is (it may be pointing to
* the reader page). But if the next page is a header page,
* its flags will be non zero.
*/
static int inline
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *page, struct list_head *list)
{
unsigned long val;
val = (unsigned long)list->next;
if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
return RB_PAGE_MOVED;
return val & RB_FLAG_MASK;
}
/*
* rb_is_reader_page
*
* The unique thing about the reader page, is that, if the
* writer is ever on it, the previous pointer never points
* back to the reader page.
*/
static int rb_is_reader_page(struct buffer_page *page)
{
struct list_head *list = page->list.prev;
return rb_list_head(list->next) != &page->list;
}
/*
* rb_set_list_to_head - set a list_head to be pointing to head.
*/
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
struct list_head *list)
{
unsigned long *ptr;
ptr = (unsigned long *)&list->next;
*ptr |= RB_PAGE_HEAD;
*ptr &= ~RB_PAGE_UPDATE;
}
/*
* rb_head_page_activate - sets up head page
*/
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *head;
head = cpu_buffer->head_page;
if (!head)
return;
/*
* Set the previous list pointer to have the HEAD flag.
*/
rb_set_list_to_head(cpu_buffer, head->list.prev);
}
static void rb_list_head_clear(struct list_head *list)
{
unsigned long *ptr = (unsigned long *)&list->next;
*ptr &= ~RB_FLAG_MASK;
}
/*
* rb_head_page_dactivate - clears head page ptr (for free list)
*/
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
struct list_head *hd;
/* Go through the whole list and clear any pointers found. */
rb_list_head_clear(cpu_buffer->pages);
list_for_each(hd, cpu_buffer->pages)
rb_list_head_clear(hd);
}
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *head,
struct buffer_page *prev,
int old_flag, int new_flag)
{
struct list_head *list;
unsigned long val = (unsigned long)&head->list;
unsigned long ret;
list = &prev->list;
val &= ~RB_FLAG_MASK;
ret = (unsigned long)cmpxchg(&list->next,
val | old_flag, val | new_flag);
/* check if the reader took the page */
if ((ret & ~RB_FLAG_MASK) != val)
return RB_PAGE_MOVED;
return ret & RB_FLAG_MASK;
}
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *head,
struct buffer_page *prev,
int old_flag)
{
return rb_head_page_set(cpu_buffer, head, prev,
old_flag, RB_PAGE_UPDATE);
}
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *head,
struct buffer_page *prev,
int old_flag)
{
return rb_head_page_set(cpu_buffer, head, prev,
old_flag, RB_PAGE_HEAD);
}
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *head,
struct buffer_page *prev,
int old_flag)
{
return rb_head_page_set(cpu_buffer, head, prev,
old_flag, RB_PAGE_NORMAL);
}
static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page **bpage)
{
struct list_head *p = rb_list_head((*bpage)->list.next);
*bpage = list_entry(p, struct buffer_page, list);
}
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *head;
struct buffer_page *page;
struct list_head *list;
int i;
if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
return NULL;
/* sanity check */
list = cpu_buffer->pages;
if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
return NULL;
page = head = cpu_buffer->head_page;
/*
* It is possible that the writer moves the header behind
* where we started, and we miss in one loop.
* A second loop should grab the header, but we'll do
* three loops just because I'm paranoid.
*/
for (i = 0; i < 3; i++) {
do {
if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
cpu_buffer->head_page = page;
return page;
}
rb_inc_page(cpu_buffer, &page);
} while (page != head);
}
RB_WARN_ON(cpu_buffer, 1);
return NULL;
}
static int rb_head_page_replace(struct buffer_page *old,
struct buffer_page *new)
{
unsigned long *ptr = (unsigned long *)&old->list.prev->next;
unsigned long val;
unsigned long ret;
val = *ptr & ~RB_FLAG_MASK;
val |= RB_PAGE_HEAD;
ret = cmpxchg(ptr, val, &new->list);
return ret == val;
}
/*
* rb_tail_page_update - move the tail page forward
*
* Returns 1 if moved tail page, 0 if someone else did.
*/
static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *tail_page,
struct buffer_page *next_page)
{
struct buffer_page *old_tail;
unsigned long old_entries;
unsigned long old_write;
int ret = 0;
/*
* The tail page now needs to be moved forward.
*
* We need to reset the tail page, but without messing
* with possible erasing of data brought in by interrupts
* that have moved the tail page and are currently on it.
*
* We add a counter to the write field to denote this.
*/
old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
/*
* Just make sure we have seen our old_write and synchronize
* with any interrupts that come in.
*/
barrier();
/*
* If the tail page is still the same as what we think
* it is, then it is up to us to update the tail
* pointer.
*/
if (tail_page == cpu_buffer->tail_page) {
/* Zero the write counter */
unsigned long val = old_write & ~RB_WRITE_MASK;
unsigned long eval = old_entries & ~RB_WRITE_MASK;
/*
* This will only succeed if an interrupt did
* not come in and change it. In which case, we
* do not want to modify it.
*/
local_cmpxchg(&next_page->write, old_write, val);
local_cmpxchg(&next_page->entries, old_entries, eval);
/*
* No need to worry about races with clearing out the commit.
* it only can increment when a commit takes place. But that
* only happens in the outer most nested commit.
*/
local_set(&next_page->page->commit, 0);
old_tail = cmpxchg(&cpu_buffer->tail_page,
tail_page, next_page);
if (old_tail == tail_page)
ret = 1;
}
return ret;
}
static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *bpage)
{
unsigned long val = (unsigned long)bpage;
if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
return 1;
return 0;
}
/**
* rb_check_list - make sure a pointer to a list has the last bits zero
*/
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
struct list_head *list)
{
if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
return 1;
if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
return 1;
return 0;
}
/**
* check_pages - integrity check of buffer pages
* @cpu_buffer: CPU buffer with pages to test
......@@ -498,14 +899,19 @@ EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
*/
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
struct list_head *head = &cpu_buffer->pages;
struct list_head *head = cpu_buffer->pages;
struct buffer_page *bpage, *tmp;
rb_head_page_deactivate(cpu_buffer);
if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
return -1;
if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
return -1;
if (rb_check_list(cpu_buffer, head))
return -1;
list_for_each_entry_safe(bpage, tmp, head, list) {
if (RB_WARN_ON(cpu_buffer,
bpage->list.next->prev != &bpage->list))
......@@ -513,25 +919,33 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
if (RB_WARN_ON(cpu_buffer,
bpage->list.prev->next != &bpage->list))
return -1;
if (rb_check_list(cpu_buffer, &bpage->list))
return -1;
}
rb_head_page_activate(cpu_buffer);
return 0;
}
static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
unsigned nr_pages)
{
struct list_head *head = &cpu_buffer->pages;
struct buffer_page *bpage, *tmp;
unsigned long addr;
LIST_HEAD(pages);
unsigned i;
WARN_ON(!nr_pages);
for (i = 0; i < nr_pages; i++) {
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
if (!bpage)
goto free_pages;
rb_check_bpage(cpu_buffer, bpage);
list_add(&bpage->list, &pages);
addr = __get_free_page(GFP_KERNEL);
......@@ -541,7 +955,13 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
rb_init_page(bpage->page);
}
list_splice(&pages, head);
/*
* The ring buffer page list is a circular list that does not
* start and end with a list head. All page list items point to
* other pages.
*/
cpu_buffer->pages = pages.next;
list_del(&pages);
rb_check_pages(cpu_buffer);
......@@ -573,13 +993,14 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
spin_lock_init(&cpu_buffer->reader_lock);
lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
INIT_LIST_HEAD(&cpu_buffer->pages);
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
if (!bpage)
goto fail_free_buffer;
rb_check_bpage(cpu_buffer, bpage);
cpu_buffer->reader_page = bpage;
addr = __get_free_page(GFP_KERNEL);
if (!addr)
......@@ -594,9 +1015,11 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
goto fail_free_reader;
cpu_buffer->head_page
= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
= list_entry(cpu_buffer->pages, struct buffer_page, list);
cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
rb_head_page_activate(cpu_buffer);
return cpu_buffer;
fail_free_reader:
......@@ -609,15 +1032,22 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
struct list_head *head = &cpu_buffer->pages;
struct list_head *head = cpu_buffer->pages;
struct buffer_page *bpage, *tmp;
free_buffer_page(cpu_buffer->reader_page);
list_for_each_entry_safe(bpage, tmp, head, list) {
list_del_init(&bpage->list);
rb_head_page_deactivate(cpu_buffer);
if (head) {
list_for_each_entry_safe(bpage, tmp, head, list) {
list_del_init(&bpage->list);
free_buffer_page(bpage);
}
bpage = list_entry(head, struct buffer_page, list);
free_buffer_page(bpage);
}
kfree(cpu_buffer);
}
......@@ -759,15 +1189,17 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
atomic_inc(&cpu_buffer->record_disabled);
synchronize_sched();
rb_head_page_deactivate(cpu_buffer);
for (i = 0; i < nr_pages; i++) {
if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
return;
p = cpu_buffer->pages.next;
p = cpu_buffer->pages->next;
bpage = list_entry(p, struct buffer_page, list);
list_del_init(&bpage->list);
free_buffer_page(bpage);
}
if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
return;
rb_reset_cpu(cpu_buffer);
......@@ -789,15 +1221,19 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
atomic_inc(&cpu_buffer->record_disabled);
synchronize_sched();
spin_lock_irq(&cpu_buffer->reader_lock);
rb_head_page_deactivate(cpu_buffer);
for (i = 0; i < nr_pages; i++) {
if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
return;
p = pages->next;
bpage = list_entry(p, struct buffer_page, list);
list_del_init(&bpage->list);
list_add_tail(&bpage->list, &cpu_buffer->pages);
list_add_tail(&bpage->list, cpu_buffer->pages);
}
rb_reset_cpu(cpu_buffer);
spin_unlock_irq(&cpu_buffer->reader_lock);
rb_check_pages(cpu_buffer);
......@@ -947,22 +1383,15 @@ rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
cpu_buffer->reader_page->read);
}
static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
return __rb_page_index(cpu_buffer->head_page,
cpu_buffer->head_page->read);
}
static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
return __rb_page_index(iter->head_page, iter->head);
}
static inline unsigned rb_page_write(struct buffer_page *bpage)
static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
return local_read(&bpage->write);
return local_read(&bpage->write) & RB_WRITE_MASK;
}
static inline unsigned rb_page_commit(struct buffer_page *bpage)
......@@ -970,6 +1399,11 @@ static inline unsigned rb_page_commit(struct buffer_page *bpage)
return local_read(&bpage->page->commit);
}
static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
return local_read(&bpage->entries) & RB_WRITE_MASK;
}
/* Size is determined by what has been commited */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
......@@ -982,22 +1416,6 @@ rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
return rb_page_commit(cpu_buffer->commit_page);
}
static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
return rb_page_commit(cpu_buffer->head_page);
}
static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page **bpage)
{
struct list_head *p = (*bpage)->list.next;
if (p == &cpu_buffer->pages)
p = p->next;
*bpage = list_entry(p, struct buffer_page, list);
}
static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
......@@ -1023,6 +1441,8 @@ rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
unsigned long max_count;
/*
* We only race with interrupts and NMIs on this CPU.
* If we own the commit event, then we can commit
......@@ -1032,9 +1452,16 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
* assign the commit to the tail.
*/
again:
max_count = cpu_buffer->buffer->pages * 100;
while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
cpu_buffer->commit_page->page->commit =
cpu_buffer->commit_page->write;
if (RB_WARN_ON(cpu_buffer, !(--max_count)))
return;
if (RB_WARN_ON(cpu_buffer,
rb_is_reader_page(cpu_buffer->tail_page)))
return;
local_set(&cpu_buffer->commit_page->page->commit,
rb_page_write(cpu_buffer->commit_page));
rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
cpu_buffer->write_stamp =
cpu_buffer->commit_page->page->time_stamp;
......@@ -1043,8 +1470,12 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
}
while (rb_commit_index(cpu_buffer) !=
rb_page_write(cpu_buffer->commit_page)) {
cpu_buffer->commit_page->page->commit =
cpu_buffer->commit_page->write;
local_set(&cpu_buffer->commit_page->page->commit,
rb_page_write(cpu_buffer->commit_page));
RB_WARN_ON(cpu_buffer,
local_read(&cpu_buffer->commit_page->page->commit) &
~RB_WRITE_MASK);
barrier();
}
......@@ -1077,7 +1508,7 @@ static void rb_inc_iter(struct ring_buffer_iter *iter)
* to the head page instead of next.
*/
if (iter->head_page == cpu_buffer->reader_page)
iter->head_page = cpu_buffer->head_page;
iter->head_page = rb_set_head_page(cpu_buffer);
else
rb_inc_page(cpu_buffer, &iter->head_page);
......@@ -1121,6 +1552,163 @@ rb_update_event(struct ring_buffer_event *event,
}
}
/*
* rb_handle_head_page - writer hit the head page
*
* Returns: +1 to retry page
* 0 to continue
* -1 on error
*/
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *tail_page,
struct buffer_page *next_page)
{
struct buffer_page *new_head;
int entries;
int type;
int ret;
entries = rb_page_entries(next_page);
/*
* The hard part is here. We need to move the head
* forward, and protect against both readers on
* other CPUs and writers coming in via interrupts.
*/
type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
RB_PAGE_HEAD);
/*
* type can be one of four:
* NORMAL - an interrupt already moved it for us
* HEAD - we are the first to get here.
* UPDATE - we are the interrupt interrupting
* a current move.
* MOVED - a reader on another CPU moved the next
* pointer to its reader page. Give up
* and try again.
*/
switch (type) {
case RB_PAGE_HEAD:
/*
* We changed the head to UPDATE, thus
* it is our responsibility to update
* the counters.
*/
local_add(entries, &cpu_buffer->overrun);
/*
* The entries will be zeroed out when we move the
* tail page.
*/
/* still more to do */
break;
case RB_PAGE_UPDATE:
/*
* This is an interrupt that interrupt the
* previous update. Still more to do.
*/
break;
case RB_PAGE_NORMAL:
/*
* An interrupt came in before the update
* and processed this for us.
* Nothing left to do.
*/
return 1;
case RB_PAGE_MOVED:
/*
* The reader is on another CPU and just did
* a swap with our next_page.
* Try again.
*/
return 1;
default:
RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
return -1;
}
/*
* Now that we are here, the old head pointer is
* set to UPDATE. This will keep the reader from
* swapping the head page with the reader page.
* The reader (on another CPU) will spin till
* we are finished.
*
* We just need to protect against interrupts
* doing the job. We will set the next pointer
* to HEAD. After that, we set the old pointer
* to NORMAL, but only if it was HEAD before.
* otherwise we are an interrupt, and only
* want the outer most commit to reset it.
*/
new_head = next_page;
rb_inc_page(cpu_buffer, &new_head);
ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
RB_PAGE_NORMAL);
/*
* Valid returns are:
* HEAD - an interrupt came in and already set it.
* NORMAL - One of two things:
* 1) We really set it.
* 2) A bunch of interrupts came in and moved
* the page forward again.
*/
switch (ret) {
case RB_PAGE_HEAD:
case RB_PAGE_NORMAL:
/* OK */
break;
default:
RB_WARN_ON(cpu_buffer, 1);
return -1;
}
/*
* It is possible that an interrupt came in,
* set the head up, then more interrupts came in
* and moved it again. When we get back here,
* the page would have been set to NORMAL but we
* just set it back to HEAD.
*
* How do you detect this? Well, if that happened
* the tail page would have moved.
*/
if (ret == RB_PAGE_NORMAL) {
/*
* If the tail had moved passed next, then we need
* to reset the pointer.
*/
if (cpu_buffer->tail_page != tail_page &&
cpu_buffer->tail_page != next_page)
rb_head_page_set_normal(cpu_buffer, new_head,
next_page,
RB_PAGE_HEAD);
}
/*
* If this was the outer most commit (the one that
* changed the original pointer from HEAD to UPDATE),
* then it is up to us to reset it to NORMAL.
*/
if (type == RB_PAGE_HEAD) {
ret = rb_head_page_set_normal(cpu_buffer, next_page,
tail_page,
RB_PAGE_UPDATE);
if (RB_WARN_ON(cpu_buffer,
ret != RB_PAGE_UPDATE))
return -1;
}
return 0;
}
static unsigned rb_calculate_event_length(unsigned length)
{
struct ring_buffer_event event; /* Used only for sizeof array */
......@@ -1199,96 +1787,93 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *commit_page,
struct buffer_page *tail_page, u64 *ts)
{
struct buffer_page *next_page, *head_page, *reader_page;
struct ring_buffer *buffer = cpu_buffer->buffer;
bool lock_taken = false;
unsigned long flags;
struct buffer_page *next_page;
int ret;
next_page = tail_page;
local_irq_save(flags);
/*
* Since the write to the buffer is still not
* fully lockless, we must be careful with NMIs.
* The locks in the writers are taken when a write
* crosses to a new page. The locks protect against
* races with the readers (this will soon be fixed
* with a lockless solution).
*
* Because we can not protect against NMIs, and we
* want to keep traces reentrant, we need to manage
* what happens when we are in an NMI.
*
* NMIs can happen after we take the lock.
* If we are in an NMI, only take the lock
* if it is not already taken. Otherwise
* simply fail.
*/
if (unlikely(in_nmi())) {
if (!__raw_spin_trylock(&cpu_buffer->lock)) {
cpu_buffer->nmi_dropped++;
goto out_reset;
}
} else
__raw_spin_lock(&cpu_buffer->lock);
lock_taken = true;
rb_inc_page(cpu_buffer, &next_page);
head_page = cpu_buffer->head_page;
reader_page = cpu_buffer->reader_page;
/* we grabbed the lock before incrementing */
if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
goto out_reset;
/*
* If for some reason, we had an interrupt storm that made
* it all the way around the buffer, bail, and warn
* about it.
*/
if (unlikely(next_page == commit_page)) {
cpu_buffer->commit_overrun++;
local_inc(&cpu_buffer->commit_overrun);
goto out_reset;
}
if (next_page == head_page) {
if (!(buffer->flags & RB_FL_OVERWRITE))
goto out_reset;
/* tail_page has not moved yet? */
if (tail_page == cpu_buffer->tail_page) {
/* count overflows */
cpu_buffer->overrun +=
local_read(&head_page->entries);
/*
* This is where the fun begins!
*
* We are fighting against races between a reader that
* could be on another CPU trying to swap its reader
* page with the buffer head.
*
* We are also fighting against interrupts coming in and
* moving the head or tail on us as well.
*
* If the next page is the head page then we have filled
* the buffer, unless the commit page is still on the
* reader page.
*/
if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
rb_inc_page(cpu_buffer, &head_page);
cpu_buffer->head_page = head_page;
cpu_buffer->head_page->read = 0;
/*
* If the commit is not on the reader page, then
* move the header page.
*/
if (!rb_is_reader_page(cpu_buffer->commit_page)) {
/*
* If we are not in overwrite mode,
* this is easy, just stop here.
*/
if (!(buffer->flags & RB_FL_OVERWRITE))
goto out_reset;
ret = rb_handle_head_page(cpu_buffer,
tail_page,
next_page);
if (ret < 0)
goto out_reset;
if (ret)
goto out_again;
} else {
/*
* We need to be careful here too. The
* commit page could still be on the reader
* page. We could have a small buffer, and
* have filled up the buffer with events
* from interrupts and such, and wrapped.
*
* Note, if the tail page is also the on the
* reader_page, we let it move out.
*/
if (unlikely((cpu_buffer->commit_page !=
cpu_buffer->tail_page) &&
(cpu_buffer->commit_page ==
cpu_buffer->reader_page))) {
local_inc(&cpu_buffer->commit_overrun);
goto out_reset;
}
}
}
/*
* If the tail page is still the same as what we think
* it is, then it is up to us to update the tail
* pointer.
*/
if (tail_page == cpu_buffer->tail_page) {
local_set(&next_page->write, 0);
local_set(&next_page->entries, 0);
local_set(&next_page->page->commit, 0);
cpu_buffer->tail_page = next_page;
/* reread the time stamp */
ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
if (ret) {
/*
* Nested commits always have zero deltas, so
* just reread the time stamp
*/
*ts = rb_time_stamp(buffer, cpu_buffer->cpu);
cpu_buffer->tail_page->page->time_stamp = *ts;
next_page->page->time_stamp = *ts;
}
rb_reset_tail(cpu_buffer, tail_page, tail, length);
out_again:
__raw_spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags);
rb_reset_tail(cpu_buffer, tail_page, tail, length);
/* fail and let the caller try again */
return ERR_PTR(-EAGAIN);
......@@ -1297,9 +1882,6 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
/* reset write */
rb_reset_tail(cpu_buffer, tail_page, tail, length);
if (likely(lock_taken))
__raw_spin_unlock(&cpu_buffer->lock);
local_irq_restore(flags);
return NULL;
}
......@@ -1316,6 +1898,9 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
barrier();
tail_page = cpu_buffer->tail_page;
write = local_add_return(length, &tail_page->write);
/* set write to only the index of the write */
write &= RB_WRITE_MASK;
tail = write - length;
/* See if we shot pass the end of this buffer page */
......@@ -1360,12 +1945,16 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
bpage = cpu_buffer->tail_page;
if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
unsigned long write_mask =
local_read(&bpage->write) & ~RB_WRITE_MASK;
/*
* This is on the tail page. It is possible that
* a write could come in and move the tail page
* and write to the next page. That is fine
* because we just shorten what is on this page.
*/
old_index += write_mask;
new_index += write_mask;
index = local_cmpxchg(&bpage->write, old_index, new_index);
if (index == old_index)
return 1;
......@@ -1874,9 +2463,13 @@ EXPORT_SYMBOL_GPL(ring_buffer_write);
static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
struct buffer_page *reader = cpu_buffer->reader_page;
struct buffer_page *head = cpu_buffer->head_page;
struct buffer_page *head = rb_set_head_page(cpu_buffer);
struct buffer_page *commit = cpu_buffer->commit_page;
/* In case of error, head will be NULL */
if (unlikely(!head))
return 1;
return reader->read == rb_page_commit(reader) &&
(commit == reader ||
(commit == head &&
......@@ -1967,7 +2560,7 @@ unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
return 0;
cpu_buffer = buffer->buffers[cpu];
ret = (local_read(&cpu_buffer->entries) - cpu_buffer->overrun)
ret = (local_read(&cpu_buffer->entries) - local_read(&cpu_buffer->overrun))
- cpu_buffer->read;
return ret;
......@@ -1988,32 +2581,12 @@ unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
return 0;
cpu_buffer = buffer->buffers[cpu];
ret = cpu_buffer->overrun;
ret = local_read(&cpu_buffer->overrun);
return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
/**
* ring_buffer_nmi_dropped_cpu - get the number of nmis that were dropped
* @buffer: The ring buffer
* @cpu: The per CPU buffer to get the number of overruns from
*/
unsigned long ring_buffer_nmi_dropped_cpu(struct ring_buffer *buffer, int cpu)
{
struct ring_buffer_per_cpu *cpu_buffer;
unsigned long ret;
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return 0;
cpu_buffer = buffer->buffers[cpu];
ret = cpu_buffer->nmi_dropped;
return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_nmi_dropped_cpu);
/**
* ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits
* @buffer: The ring buffer
......@@ -2029,7 +2602,7 @@ ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
return 0;
cpu_buffer = buffer->buffers[cpu];
ret = cpu_buffer->commit_overrun;
ret = local_read(&cpu_buffer->commit_overrun);
return ret;
}
......@@ -2052,7 +2625,7 @@ unsigned long ring_buffer_entries(struct ring_buffer *buffer)
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
entries += (local_read(&cpu_buffer->entries) -
cpu_buffer->overrun) - cpu_buffer->read;
local_read(&cpu_buffer->overrun)) - cpu_buffer->read;
}
return entries;
......@@ -2075,7 +2648,7 @@ unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
/* if you care about this being correct, lock the buffer */
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
overruns += cpu_buffer->overrun;
overruns += local_read(&cpu_buffer->overrun);
}
return overruns;
......@@ -2088,8 +2661,10 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
/* Iterator usage is expected to have record disabled */
if (list_empty(&cpu_buffer->reader_page->list)) {
iter->head_page = cpu_buffer->head_page;
iter->head = cpu_buffer->head_page->read;
iter->head_page = rb_set_head_page(cpu_buffer);
if (unlikely(!iter->head_page))
return;
iter->head = iter->head_page->read;
} else {
iter->head_page = cpu_buffer->reader_page;
iter->head = cpu_buffer->reader_page->read;
......@@ -2206,6 +2781,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
struct buffer_page *reader = NULL;
unsigned long flags;
int nr_loops = 0;
int ret;
local_irq_save(flags);
__raw_spin_lock(&cpu_buffer->lock);
......@@ -2239,30 +2815,56 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
goto out;
/*
* Splice the empty reader page into the list around the head.
* Reset the reader page to size zero.
*/
local_set(&cpu_buffer->reader_page->write, 0);
local_set(&cpu_buffer->reader_page->entries, 0);
local_set(&cpu_buffer->reader_page->page->commit, 0);
reader = cpu_buffer->head_page;
spin:
/*
* Splice the empty reader page into the list around the head.
*/
reader = rb_set_head_page(cpu_buffer);
cpu_buffer->reader_page->list.next = reader->list.next;
cpu_buffer->reader_page->list.prev = reader->list.prev;
local_set(&cpu_buffer->reader_page->write, 0);
local_set(&cpu_buffer->reader_page->entries, 0);
local_set(&cpu_buffer->reader_page->page->commit, 0);
/*
* cpu_buffer->pages just needs to point to the buffer, it
* has no specific buffer page to point to. Lets move it out
* of our way so we don't accidently swap it.
*/
cpu_buffer->pages = reader->list.prev;
/* Make the reader page now replace the head */
reader->list.prev->next = &cpu_buffer->reader_page->list;
reader->list.next->prev = &cpu_buffer->reader_page->list;
/* The reader page will be pointing to the new head */
rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
/*
* Here's the tricky part.
*
* We need to move the pointer past the header page.
* But we can only do that if a writer is not currently
* moving it. The page before the header page has the
* flag bit '1' set if it is pointing to the page we want.
* but if the writer is in the process of moving it
* than it will be '2' or already moved '0'.
*/
ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
/*
* If the tail is on the reader, then we must set the head
* to the inserted page, otherwise we set it one before.
* If we did not convert it, then we must try again.
*/
cpu_buffer->head_page = cpu_buffer->reader_page;
if (!ret)
goto spin;
if (cpu_buffer->commit_page != reader)
rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
/*
* Yeah! We succeeded in replacing the page.
*
* Now make the new head point back to the reader page.
*/
reader->list.next->prev = &cpu_buffer->reader_page->list;
rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
/* Finally update the reader page to the new head */
cpu_buffer->reader_page = reader;
......@@ -2718,8 +3320,10 @@ EXPORT_SYMBOL_GPL(ring_buffer_size);
static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
rb_head_page_deactivate(cpu_buffer);
cpu_buffer->head_page
= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
= list_entry(cpu_buffer->pages, struct buffer_page, list);
local_set(&cpu_buffer->head_page->write, 0);
local_set(&cpu_buffer->head_page->entries, 0);
local_set(&cpu_buffer->head_page->page->commit, 0);
......@@ -2735,16 +3339,17 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
local_set(&cpu_buffer->reader_page->page->commit, 0);
cpu_buffer->reader_page->read = 0;
cpu_buffer->nmi_dropped = 0;
cpu_buffer->commit_overrun = 0;
cpu_buffer->overrun = 0;
cpu_buffer->read = 0;
local_set(&cpu_buffer->commit_overrun, 0);
local_set(&cpu_buffer->overrun, 0);
local_set(&cpu_buffer->entries, 0);
local_set(&cpu_buffer->committing, 0);
local_set(&cpu_buffer->commits, 0);
cpu_buffer->read = 0;
cpu_buffer->write_stamp = 0;
cpu_buffer->read_stamp = 0;
rb_head_page_activate(cpu_buffer);
}
/**
......@@ -3092,7 +3697,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,
read = 0;
} else {
/* update the entry counter */
cpu_buffer->read += local_read(&reader->entries);
cpu_buffer->read += rb_page_entries(reader);
/* swap the pages */
rb_init_page(bpage);
......
......@@ -3630,9 +3630,6 @@ tracing_stats_read(struct file *filp, char __user *ubuf,
cnt = ring_buffer_commit_overrun_cpu(tr->buffer, cpu);
trace_seq_printf(s, "commit overrun: %ld\n", cnt);
cnt = ring_buffer_nmi_dropped_cpu(tr->buffer, cpu);
trace_seq_printf(s, "nmi dropped: %ld\n", cnt);
count = simple_read_from_buffer(ubuf, count, ppos, s->buffer, s->len);
kfree(s);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment