Commit dd81dc05 authored by Darrick J. Wong's avatar Darrick J. Wong

Merge tag 'xfs-cil-scale-5.20' of...

Merge tag 'xfs-cil-scale-5.20' of git://git.kernel.org/pub/scm/linux/kernel/git/dgc/linux-xfs into xfs-5.20-mergeA

xfs: improve CIL scalability

This series aims to improve the scalability of XFS transaction
commits on large CPU count machines. My 32p machine hits contention
limits in xlog_cil_commit() at about 700,000 transaction commits a
section. It hits this at 16 thread workloads, and 32 thread
workloads go no faster and just burn CPU on the CIL spinlocks.

This patchset gets rid of spinlocks and global serialisation points
in the xlog_cil_commit() path. It does this by moving to a
combination of per-cpu counters, unordered per-cpu lists and
post-ordered per-cpu lists.

This results in transaction commit rates exceeding 1.4 million
commits/s under unlink certain workloads, and while the log lock
contention is largely gone there is still significant lock
contention in the VFS (dentry cache, inode cache and security layers)
at >600,000 transactions/s that still limit scalability.

The changes to the CIL accounting and behaviour, combined with the
structural changes to xlog_write() in prior patchsets make the
per-cpu restructuring possible and sane. This allows us to move to
precalculated reservation requirements that allow for reservation
stealing to be accounted across multiple CPUs accurately.

That is, instead of trying to account for continuation log opheaders
on a "growth" basis, we pre-calculate how many iclogs we'll need to
write out a maximally sized CIL checkpoint and steal that reserveD
that space one commit at a time until the CIL has a full
reservation. If we ever run a commit when we are already at the hard
limit (because post-throttling) we simply take an extra reservation
from each commit that is run when over the limit. Hence we don't
need to do space usage math in the fast path and so never need to
sum the per-cpu counters in this fast path.

Similarly, per-cpu lists have the problem of ordering - we can't
remove an item from a per-cpu list if we want to move it forward in
the CIL. We solve this problem by using an atomic counter to give
every commit a sequence number that is copied into the log items in
that transaction. Hence relogging items just overwrites the sequence
number in the log item, and does not move it in the per-cpu lists.
Once we reaggregate the per-cpu lists back into a single list in the
CIL push work, we can run it through list-sort() and reorder it back
into a globally ordered list. This costs a bit of CPU time, but now
that the CIL can run multiple works and pipelines properly, this is
not a limiting factor for performance. It does increase fsync
latency when the CIL is full, but workloads issuing large numbers of
fsync()s or sync transactions end up with very small CILs and so the
latency impact or sorting is not measurable for such workloads.

OVerall, this pushes the transaction commit bottleneck out to the
lockless reservation grant head updates. These atomic updates don't
start to be a limiting fact until > 1.5 million transactions/s are
being run, at which point the accounting functions start to show up
in profiles as the highest CPU users. Still, this series doubles
transaction throughput without increasing CPU usage before we get
to that cacheline contention breakdown point...
`
Signed-off-by: default avatarDave Chinner <dchinner@redhat.com>
Signed-off-by: default avatarDarrick J. Wong <djwong@kernel.org>

* tag 'xfs-cil-scale-5.20' of git://git.kernel.org/pub/scm/linux/kernel/git/dgc/linux-xfs:
  xfs: expanding delayed logging design with background material
  xfs: xlog_sync() manually adjusts grant head space
  xfs: avoid cil push lock if possible
  xfs: move CIL ordering to the logvec chain
  xfs: convert log vector chain to use list heads
  xfs: convert CIL to unordered per cpu lists
  xfs: Add order IDs to log items in CIL
  xfs: convert CIL busy extents to per-cpu
  xfs: track CIL ticket reservation in percpu structure
  xfs: implement percpu cil space used calculation
  xfs: introduce per-cpu CIL tracking structure
  xfs: rework per-iclog header CIL reservation
  xfs: lift init CIL reservation out of xc_cil_lock
  xfs: use the CIL space used counter for emptiness checks
parents 88084a3d 51a117ed
.. SPDX-License-Identifier: GPL-2.0 .. SPDX-License-Identifier: GPL-2.0
========================== ==================
XFS Delayed Logging Design XFS Logging Design
========================== ==================
Introduction to Re-logging in XFS Preamble
================================= ========
XFS logging is a combination of logical and physical logging. Some objects, This document describes the design and algorithms that the XFS journalling
such as inodes and dquots, are logged in logical format where the details subsystem is based on. This document describes the design and algorithms that
logged are made up of the changes to in-core structures rather than on-disk the XFS journalling subsystem is based on so that readers may familiarize
structures. Other objects - typically buffers - have their physical changes themselves with the general concepts of how transaction processing in XFS works.
logged. The reason for these differences is to reduce the amount of log space
required for objects that are frequently logged. Some parts of inodes are more We begin with an overview of transactions in XFS, followed by describing how
frequently logged than others, and inodes are typically more frequently logged transaction reservations are structured and accounted, and then move into how we
than any other object (except maybe the superblock buffer) so keeping the guarantee forwards progress for long running transactions with finite initial
amount of metadata logged low is of prime importance. reservations bounds. At this point we need to explain how relogging works. With
the basic concepts covered, the design of the delayed logging mechanism is
The reason that this is such a concern is that XFS allows multiple separate documented.
modifications to a single object to be carried in the log at any given time.
This allows the log to avoid needing to flush each change to disk before
recording a new change to the object. XFS does this via a method called Introduction
"re-logging". Conceptually, this is quite simple - all it requires is that any ============
new change to the object is recorded with a *new copy* of all the existing
changes in the new transaction that is written to the log. XFS uses Write Ahead Logging for ensuring changes to the filesystem metadata
are atomic and recoverable. For reasons of space and time efficiency, the
logging mechanisms are varied and complex, combining intents, logical and
physical logging mechanisms to provide the necessary recovery guarantees the
filesystem requires.
Some objects, such as inodes and dquots, are logged in logical format where the
details logged are made up of the changes to in-core structures rather than
on-disk structures. Other objects - typically buffers - have their physical
changes logged. Long running atomic modifications have individual changes
chained together by intents, ensuring that journal recovery can restart and
finish an operation that was only partially done when the system stopped
functioning.
The reason for these differences is to keep the amount of log space and CPU time
required to process objects being modified as small as possible and hence the
logging overhead as low as possible. Some items are very frequently modified,
and some parts of objects are more frequently modified than others, so keeping
the overhead of metadata logging low is of prime importance.
The method used to log an item or chain modifications together isn't
particularly important in the scope of this document. It suffices to know that
the method used for logging a particular object or chaining modifications
together are different and are dependent on the object and/or modification being
performed. The logging subsystem only cares that certain specific rules are
followed to guarantee forwards progress and prevent deadlocks.
Transactions in XFS
===================
XFS has two types of high level transactions, defined by the type of log space
reservation they take. These are known as "one shot" and "permanent"
transactions. Permanent transaction reservations can take reservations that span
commit boundaries, whilst "one shot" transactions are for a single atomic
modification.
The type and size of reservation must be matched to the modification taking
place. This means that permanent transactions can be used for one-shot
modifications, but one-shot reservations cannot be used for permanent
transactions.
In the code, a one-shot transaction pattern looks somewhat like this::
tp = xfs_trans_alloc(<reservation>)
<lock items>
<join item to transaction>
<do modification>
xfs_trans_commit(tp);
As items are modified in the transaction, the dirty regions in those items are
tracked via the transaction handle. Once the transaction is committed, all
resources joined to it are released, along with the remaining unused reservation
space that was taken at the transaction allocation time.
In contrast, a permanent transaction is made up of multiple linked individual
transactions, and the pattern looks like this::
tp = xfs_trans_alloc(<reservation>)
xfs_ilock(ip, XFS_ILOCK_EXCL)
loop {
xfs_trans_ijoin(tp, 0);
<do modification>
xfs_trans_log_inode(tp, ip);
xfs_trans_roll(&tp);
}
xfs_trans_commit(tp);
xfs_iunlock(ip, XFS_ILOCK_EXCL);
While this might look similar to a one-shot transaction, there is an important
difference: xfs_trans_roll() performs a specific operation that links two
transactions together::
ntp = xfs_trans_dup(tp);
xfs_trans_commit(tp);
xfs_log_reserve(ntp);
This results in a series of "rolling transactions" where the inode is locked
across the entire chain of transactions. Hence while this series of rolling
transactions is running, nothing else can read from or write to the inode and
this provides a mechanism for complex changes to appear atomic from an external
observer's point of view.
It is important to note that a series of rolling transactions in a permanent
transaction does not form an atomic change in the journal. While each
individual modification is atomic, the chain is *not atomic*. If we crash half
way through, then recovery will only replay up to the last transactional
modification the loop made that was committed to the journal.
This affects long running permanent transactions in that it is not possible to
predict how much of a long running operation will actually be recovered because
there is no guarantee of how much of the operation reached stale storage. Hence
if a long running operation requires multiple transactions to fully complete,
the high level operation must use intents and deferred operations to guarantee
recovery can complete the operation once the first transactions is persisted in
the on-disk journal.
Transactions are Asynchronous
=============================
In XFS, all high level transactions are asynchronous by default. This means that
xfs_trans_commit() does not guarantee that the modification has been committed
to stable storage when it returns. Hence when a system crashes, not all the
completed transactions will be replayed during recovery.
However, the logging subsystem does provide global ordering guarantees, such
that if a specific change is seen after recovery, all metadata modifications
that were committed prior to that change will also be seen.
For single shot operations that need to reach stable storage immediately, or
ensuring that a long running permanent transaction is fully committed once it is
complete, we can explicitly tag a transaction as synchronous. This will trigger
a "log force" to flush the outstanding committed transactions to stable storage
in the journal and wait for that to complete.
Synchronous transactions are rarely used, however, because they limit logging
throughput to the IO latency limitations of the underlying storage. Instead, we
tend to use log forces to ensure modifications are on stable storage only when
a user operation requires a synchronisation point to occur (e.g. fsync).
Transaction Reservations
========================
It has been mentioned a number of times now that the logging subsystem needs to
provide a forwards progress guarantee so that no modification ever stalls
because it can't be written to the journal due to a lack of space in the
journal. This is achieved by the transaction reservations that are made when
a transaction is first allocated. For permanent transactions, these reservations
are maintained as part of the transaction rolling mechanism.
A transaction reservation provides a guarantee that there is physical log space
available to write the modification into the journal before we start making
modifications to objects and items. As such, the reservation needs to be large
enough to take into account the amount of metadata that the change might need to
log in the worst case. This means that if we are modifying a btree in the
transaction, we have to reserve enough space to record a full leaf-to-root split
of the btree. As such, the reservations are quite complex because we have to
take into account all the hidden changes that might occur.
For example, a user data extent allocation involves allocating an extent from
free space, which modifies the free space trees. That's two btrees. Inserting
the extent into the inode's extent map might require a split of the extent map
btree, which requires another allocation that can modify the free space trees
again. Then we might have to update reverse mappings, which modifies yet
another btree which might require more space. And so on. Hence the amount of
metadata that a "simple" operation can modify can be quite large.
This "worst case" calculation provides us with the static "unit reservation"
for the transaction that is calculated at mount time. We must guarantee that the
log has this much space available before the transaction is allowed to proceed
so that when we come to write the dirty metadata into the log we don't run out
of log space half way through the write.
For one-shot transactions, a single unit space reservation is all that is
required for the transaction to proceed. For permanent transactions, however, we
also have a "log count" that affects the size of the reservation that is to be
made.
While a permanent transaction can get by with a single unit of space
reservation, it is somewhat inefficient to do this as it requires the
transaction rolling mechanism to re-reserve space on every transaction roll. We
know from the implementation of the permanent transactions how many transaction
rolls are likely for the common modifications that need to be made.
For example, and inode allocation is typically two transactions - one to
physically allocate a free inode chunk on disk, and another to allocate an inode
from an inode chunk that has free inodes in it. Hence for an inode allocation
transaction, we might set the reservation log count to a value of 2 to indicate
that the common/fast path transaction will commit two linked transactions in a
chain. Each time a permanent transaction rolls, it consumes an entire unit
reservation.
Hence when the permanent transaction is first allocated, the log space
reservation is increases from a single unit reservation to multiple unit
reservations. That multiple is defined by the reservation log count, and this
means we can roll the transaction multiple times before we have to re-reserve
log space when we roll the transaction. This ensures that the common
modifications we make only need to reserve log space once.
If the log count for a permanent transaction reaches zero, then it needs to
re-reserve physical space in the log. This is somewhat complex, and requires
an understanding of how the log accounts for space that has been reserved.
Log Space Accounting
====================
The position in the log is typically referred to as a Log Sequence Number (LSN).
The log is circular, so the positions in the log are defined by the combination
of a cycle number - the number of times the log has been overwritten - and the
offset into the log. A LSN carries the cycle in the upper 32 bits and the
offset in the lower 32 bits. The offset is in units of "basic blocks" (512
bytes). Hence we can do realtively simple LSN based math to keep track of
available space in the log.
Log space accounting is done via a pair of constructs called "grant heads". The
position of the grant heads is an absolute value, so the amount of space
available in the log is defined by the distance between the position of the
grant head and the current log tail. That is, how much space can be
reserved/consumed before the grant heads would fully wrap the log and overtake
the tail position.
The first grant head is the "reserve" head. This tracks the byte count of the
reservations currently held by active transactions. It is a purely in-memory
accounting of the space reservation and, as such, actually tracks byte offsets
into the log rather than basic blocks. Hence it technically isn't using LSNs to
represent the log position, but it is still treated like a split {cycle,offset}
tuple for the purposes of tracking reservation space.
The reserve grant head is used to accurately account for exact transaction
reservations amounts and the exact byte count that modifications actually make
and need to write into the log. The reserve head is used to prevent new
transactions from taking new reservations when the head reaches the current
tail. It will block new reservations in a FIFO queue and as the log tail moves
forward it will wake them in order once sufficient space is available. This FIFO
mechanism ensures no transaction is starved of resources when log space
shortages occur.
The other grant head is the "write" head. Unlike the reserve head, this grant
head contains an LSN and it tracks the physical space usage in the log. While
this might sound like it is accounting the same state as the reserve grant head
- and it mostly does track exactly the same location as the reserve grant head -
there are critical differences in behaviour between them that provides the
forwards progress guarantees that rolling permanent transactions require.
These differences when a permanent transaction is rolled and the internal "log
count" reaches zero and the initial set of unit reservations have been
exhausted. At this point, we still require a log space reservation to continue
the next transaction in the sequeunce, but we have none remaining. We cannot
sleep during the transaction commit process waiting for new log space to become
available, as we may end up on the end of the FIFO queue and the items we have
locked while we sleep could end up pinning the tail of the log before there is
enough free space in the log to fulfil all of the pending reservations and
then wake up transaction commit in progress.
To take a new reservation without sleeping requires us to be able to take a
reservation even if there is no reservation space currently available. That is,
we need to be able to *overcommit* the log reservation space. As has already
been detailed, we cannot overcommit physical log space. However, the reserve
grant head does not track physical space - it only accounts for the amount of
reservations we currently have outstanding. Hence if the reserve head passes
over the tail of the log all it means is that new reservations will be throttled
immediately and remain throttled until the log tail is moved forward far enough
to remove the overcommit and start taking new reservations. In other words, we
can overcommit the reserve head without violating the physical log head and tail
rules.
As a result, permanent transactions only "regrant" reservation space during
xfs_trans_commit() calls, while the physical log space reservation - tracked by
the write head - is then reserved separately by a call to xfs_log_reserve()
after the commit completes. Once the commit completes, we can sleep waiting for
physical log space to be reserved from the write grant head, but only if one
critical rule has been observed::
Code using permanent reservations must always log the items they hold
locked across each transaction they roll in the chain.
"Re-logging" the locked items on every transaction roll ensures that the items
attached to the transaction chain being rolled are always relocated to the
physical head of the log and so do not pin the tail of the log. If a locked item
pins the tail of the log when we sleep on the write reservation, then we will
deadlock the log as we cannot take the locks needed to write back that item and
move the tail of the log forwards to free up write grant space. Re-logging the
locked items avoids this deadlock and guarantees that the log reservation we are
making cannot self-deadlock.
If all rolling transactions obey this rule, then they can all make forwards
progress independently because nothing will block the progress of the log
tail moving forwards and hence ensuring that write grant space is always
(eventually) made available to permanent transactions no matter how many times
they roll.
Re-logging Explained
====================
XFS allows multiple separate modifications to a single object to be carried in
the log at any given time. This allows the log to avoid needing to flush each
change to disk before recording a new change to the object. XFS does this via a
method called "re-logging". Conceptually, this is quite simple - all it requires
is that any new change to the object is recorded with a *new copy* of all the
existing changes in the new transaction that is written to the log.
That is, if we have a sequence of changes A through to F, and the object was That is, if we have a sequence of changes A through to F, and the object was
written to disk after change D, we would see in the log the following series written to disk after change D, we would see in the log the following series
...@@ -42,16 +327,13 @@ transaction:: ...@@ -42,16 +327,13 @@ transaction::
In other words, each time an object is relogged, the new transaction contains In other words, each time an object is relogged, the new transaction contains
the aggregation of all the previous changes currently held only in the log. the aggregation of all the previous changes currently held only in the log.
This relogging technique also allows objects to be moved forward in the log so This relogging technique allows objects to be moved forward in the log so that
that an object being relogged does not prevent the tail of the log from ever an object being relogged does not prevent the tail of the log from ever moving
moving forward. This can be seen in the table above by the changing forward. This can be seen in the table above by the changing (increasing) LSN
(increasing) LSN of each subsequent transaction - the LSN is effectively a of each subsequent transaction, and it's the technique that allows us to
direct encoding of the location in the log of the transaction. implement long-running, multiple-commit permanent transactions.
This relogging is also used to implement long-running, multiple-commit A typical example of a rolling transaction is the removal of extents from an
transactions. These transaction are known as rolling transactions, and require
a special log reservation known as a permanent transaction reservation. A
typical example of a rolling transaction is the removal of extents from an
inode which can only be done at a rate of two extents per transaction because inode which can only be done at a rate of two extents per transaction because
of reservation size limitations. Hence a rolling extent removal transaction of reservation size limitations. Hence a rolling extent removal transaction
keeps relogging the inode and btree buffers as they get modified in each keeps relogging the inode and btree buffers as they get modified in each
...@@ -67,12 +349,13 @@ the log over and over again. Worse is the fact that objects tend to get ...@@ -67,12 +349,13 @@ the log over and over again. Worse is the fact that objects tend to get
dirtier as they get relogged, so each subsequent transaction is writing more dirtier as they get relogged, so each subsequent transaction is writing more
metadata into the log. metadata into the log.
Another feature of the XFS transaction subsystem is that most transactions are It should now also be obvious how relogging and asynchronous transactions go
asynchronous. That is, they don't commit to disk until either a log buffer is hand in hand. That is, transactions don't get written to the physical journal
filled (a log buffer can hold multiple transactions) or a synchronous operation until either a log buffer is filled (a log buffer can hold multiple
forces the log buffers holding the transactions to disk. This means that XFS is transactions) or a synchronous operation forces the log buffers holding the
doing aggregation of transactions in memory - batching them, if you like - to transactions to disk. This means that XFS is doing aggregation of transactions
minimise the impact of the log IO on transaction throughput. in memory - batching them, if you like - to minimise the impact of the log IO on
transaction throughput.
The limitation on asynchronous transaction throughput is the number and size of The limitation on asynchronous transaction throughput is the number and size of
log buffers made available by the log manager. By default there are 8 log log buffers made available by the log manager. By default there are 8 log
......
...@@ -57,7 +57,8 @@ xlog_grant_push_ail( ...@@ -57,7 +57,8 @@ xlog_grant_push_ail(
STATIC void STATIC void
xlog_sync( xlog_sync(
struct xlog *log, struct xlog *log,
struct xlog_in_core *iclog); struct xlog_in_core *iclog,
struct xlog_ticket *ticket);
#if defined(DEBUG) #if defined(DEBUG)
STATIC void STATIC void
xlog_verify_grant_tail( xlog_verify_grant_tail(
...@@ -567,7 +568,8 @@ xlog_state_shutdown_callbacks( ...@@ -567,7 +568,8 @@ xlog_state_shutdown_callbacks(
int int
xlog_state_release_iclog( xlog_state_release_iclog(
struct xlog *log, struct xlog *log,
struct xlog_in_core *iclog) struct xlog_in_core *iclog,
struct xlog_ticket *ticket)
{ {
xfs_lsn_t tail_lsn; xfs_lsn_t tail_lsn;
bool last_ref; bool last_ref;
...@@ -614,7 +616,7 @@ xlog_state_release_iclog( ...@@ -614,7 +616,7 @@ xlog_state_release_iclog(
trace_xlog_iclog_syncing(iclog, _RET_IP_); trace_xlog_iclog_syncing(iclog, _RET_IP_);
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
xlog_sync(log, iclog); xlog_sync(log, iclog, ticket);
spin_lock(&log->l_icloglock); spin_lock(&log->l_icloglock);
return 0; return 0;
} }
...@@ -881,7 +883,7 @@ xlog_force_iclog( ...@@ -881,7 +883,7 @@ xlog_force_iclog(
iclog->ic_flags |= XLOG_ICL_NEED_FLUSH | XLOG_ICL_NEED_FUA; iclog->ic_flags |= XLOG_ICL_NEED_FLUSH | XLOG_ICL_NEED_FUA;
if (iclog->ic_state == XLOG_STATE_ACTIVE) if (iclog->ic_state == XLOG_STATE_ACTIVE)
xlog_state_switch_iclogs(iclog->ic_log, iclog, 0); xlog_state_switch_iclogs(iclog->ic_log, iclog, 0);
return xlog_state_release_iclog(iclog->ic_log, iclog); return xlog_state_release_iclog(iclog->ic_log, iclog, NULL);
} }
/* /*
...@@ -944,6 +946,8 @@ xlog_write_unmount_record( ...@@ -944,6 +946,8 @@ xlog_write_unmount_record(
.lv_niovecs = 1, .lv_niovecs = 1,
.lv_iovecp = &reg, .lv_iovecp = &reg,
}; };
LIST_HEAD(lv_chain);
list_add(&vec.lv_list, &lv_chain);
BUILD_BUG_ON((sizeof(struct xlog_op_header) + BUILD_BUG_ON((sizeof(struct xlog_op_header) +
sizeof(struct xfs_unmount_log_format)) != sizeof(struct xfs_unmount_log_format)) !=
...@@ -952,7 +956,7 @@ xlog_write_unmount_record( ...@@ -952,7 +956,7 @@ xlog_write_unmount_record(
/* account for space used by record data */ /* account for space used by record data */
ticket->t_curr_res -= sizeof(unmount_rec); ticket->t_curr_res -= sizeof(unmount_rec);
return xlog_write(log, NULL, &vec, ticket, reg.i_len); return xlog_write(log, NULL, &lv_chain, ticket, reg.i_len);
} }
/* /*
...@@ -2025,7 +2029,8 @@ xlog_calc_iclog_size( ...@@ -2025,7 +2029,8 @@ xlog_calc_iclog_size(
STATIC void STATIC void
xlog_sync( xlog_sync(
struct xlog *log, struct xlog *log,
struct xlog_in_core *iclog) struct xlog_in_core *iclog,
struct xlog_ticket *ticket)
{ {
unsigned int count; /* byte count of bwrite */ unsigned int count; /* byte count of bwrite */
unsigned int roundoff; /* roundoff to BB or stripe */ unsigned int roundoff; /* roundoff to BB or stripe */
...@@ -2037,12 +2042,20 @@ xlog_sync( ...@@ -2037,12 +2042,20 @@ xlog_sync(
count = xlog_calc_iclog_size(log, iclog, &roundoff); count = xlog_calc_iclog_size(log, iclog, &roundoff);
/* move grant heads by roundoff in sync */ /*
xlog_grant_add_space(log, &log->l_reserve_head.grant, roundoff); * If we have a ticket, account for the roundoff via the ticket
xlog_grant_add_space(log, &log->l_write_head.grant, roundoff); * reservation to avoid touching the hot grant heads needlessly.
* Otherwise, we have to move grant heads directly.
*/
if (ticket) {
ticket->t_curr_res -= roundoff;
} else {
xlog_grant_add_space(log, &log->l_reserve_head.grant, roundoff);
xlog_grant_add_space(log, &log->l_write_head.grant, roundoff);
}
/* put cycle number in every block */ /* put cycle number in every block */
xlog_pack_data(log, iclog, roundoff); xlog_pack_data(log, iclog, roundoff);
/* real byte length */ /* real byte length */
size = iclog->ic_offset; size = iclog->ic_offset;
...@@ -2275,7 +2288,7 @@ xlog_write_get_more_iclog_space( ...@@ -2275,7 +2288,7 @@ xlog_write_get_more_iclog_space(
spin_lock(&log->l_icloglock); spin_lock(&log->l_icloglock);
ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC); ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC);
xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt); xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt);
error = xlog_state_release_iclog(log, iclog); error = xlog_state_release_iclog(log, iclog, ticket);
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
if (error) if (error)
return error; return error;
...@@ -2471,13 +2484,13 @@ int ...@@ -2471,13 +2484,13 @@ int
xlog_write( xlog_write(
struct xlog *log, struct xlog *log,
struct xfs_cil_ctx *ctx, struct xfs_cil_ctx *ctx,
struct xfs_log_vec *log_vector, struct list_head *lv_chain,
struct xlog_ticket *ticket, struct xlog_ticket *ticket,
uint32_t len) uint32_t len)
{ {
struct xlog_in_core *iclog = NULL; struct xlog_in_core *iclog = NULL;
struct xfs_log_vec *lv = log_vector; struct xfs_log_vec *lv;
uint32_t record_cnt = 0; uint32_t record_cnt = 0;
uint32_t data_cnt = 0; uint32_t data_cnt = 0;
int error = 0; int error = 0;
...@@ -2505,7 +2518,7 @@ xlog_write( ...@@ -2505,7 +2518,7 @@ xlog_write(
if (ctx) if (ctx)
xlog_cil_set_ctx_write_state(ctx, iclog); xlog_cil_set_ctx_write_state(ctx, iclog);
while (lv) { list_for_each_entry(lv, lv_chain, lv_list) {
/* /*
* If the entire log vec does not fit in the iclog, punt it to * If the entire log vec does not fit in the iclog, punt it to
* the partial copy loop which can handle this case. * the partial copy loop which can handle this case.
...@@ -2526,7 +2539,6 @@ xlog_write( ...@@ -2526,7 +2539,6 @@ xlog_write(
xlog_write_full(lv, ticket, iclog, &log_offset, xlog_write_full(lv, ticket, iclog, &log_offset,
&len, &record_cnt, &data_cnt); &len, &record_cnt, &data_cnt);
} }
lv = lv->lv_next;
} }
ASSERT(len == 0); ASSERT(len == 0);
...@@ -2538,7 +2550,7 @@ xlog_write( ...@@ -2538,7 +2550,7 @@ xlog_write(
*/ */
spin_lock(&log->l_icloglock); spin_lock(&log->l_icloglock);
xlog_state_finish_copy(log, iclog, record_cnt, 0); xlog_state_finish_copy(log, iclog, record_cnt, 0);
error = xlog_state_release_iclog(log, iclog); error = xlog_state_release_iclog(log, iclog, ticket);
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
return error; return error;
...@@ -2958,7 +2970,7 @@ xlog_state_get_iclog_space( ...@@ -2958,7 +2970,7 @@ xlog_state_get_iclog_space(
* reference to the iclog. * reference to the iclog.
*/ */
if (!atomic_add_unless(&iclog->ic_refcnt, -1, 1)) if (!atomic_add_unless(&iclog->ic_refcnt, -1, 1))
error = xlog_state_release_iclog(log, iclog); error = xlog_state_release_iclog(log, iclog, ticket);
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
if (error) if (error)
return error; return error;
...@@ -3406,7 +3418,8 @@ xfs_log_ticket_get( ...@@ -3406,7 +3418,8 @@ xfs_log_ticket_get(
static int static int
xlog_calc_unit_res( xlog_calc_unit_res(
struct xlog *log, struct xlog *log,
int unit_bytes) int unit_bytes,
int *niclogs)
{ {
int iclog_space; int iclog_space;
uint num_headers; uint num_headers;
...@@ -3486,6 +3499,8 @@ xlog_calc_unit_res( ...@@ -3486,6 +3499,8 @@ xlog_calc_unit_res(
/* roundoff padding for transaction data and one for commit record */ /* roundoff padding for transaction data and one for commit record */
unit_bytes += 2 * log->l_iclog_roundoff; unit_bytes += 2 * log->l_iclog_roundoff;
if (niclogs)
*niclogs = num_headers;
return unit_bytes; return unit_bytes;
} }
...@@ -3494,7 +3509,7 @@ xfs_log_calc_unit_res( ...@@ -3494,7 +3509,7 @@ xfs_log_calc_unit_res(
struct xfs_mount *mp, struct xfs_mount *mp,
int unit_bytes) int unit_bytes)
{ {
return xlog_calc_unit_res(mp->m_log, unit_bytes); return xlog_calc_unit_res(mp->m_log, unit_bytes, NULL);
} }
/* /*
...@@ -3512,7 +3527,7 @@ xlog_ticket_alloc( ...@@ -3512,7 +3527,7 @@ xlog_ticket_alloc(
tic = kmem_cache_zalloc(xfs_log_ticket_cache, GFP_NOFS | __GFP_NOFAIL); tic = kmem_cache_zalloc(xfs_log_ticket_cache, GFP_NOFS | __GFP_NOFAIL);
unit_res = xlog_calc_unit_res(log, unit_bytes); unit_res = xlog_calc_unit_res(log, unit_bytes, &tic->t_iclog_hdrs);
atomic_set(&tic->t_ref, 1); atomic_set(&tic->t_ref, 1);
tic->t_task = current; tic->t_task = current;
......
...@@ -9,7 +9,8 @@ ...@@ -9,7 +9,8 @@
struct xfs_cil_ctx; struct xfs_cil_ctx;
struct xfs_log_vec { struct xfs_log_vec {
struct xfs_log_vec *lv_next; /* next lv in build list */ struct list_head lv_list; /* CIL lv chain ptrs */
uint32_t lv_order_id; /* chain ordering info */
int lv_niovecs; /* number of iovecs in lv */ int lv_niovecs; /* number of iovecs in lv */
struct xfs_log_iovec *lv_iovecp; /* iovec array */ struct xfs_log_iovec *lv_iovecp; /* iovec array */
struct xfs_log_item *lv_item; /* owner */ struct xfs_log_item *lv_item; /* owner */
......
...@@ -44,9 +44,20 @@ xlog_cil_ticket_alloc( ...@@ -44,9 +44,20 @@ xlog_cil_ticket_alloc(
* transaction overhead reservation from the first transaction commit. * transaction overhead reservation from the first transaction commit.
*/ */
tic->t_curr_res = 0; tic->t_curr_res = 0;
tic->t_iclog_hdrs = 0;
return tic; return tic;
} }
static inline void
xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil)
{
struct xlog *log = cil->xc_log;
atomic_set(&cil->xc_iclog_hdrs,
(XLOG_CIL_BLOCKING_SPACE_LIMIT(log) /
(log->l_iclog_size - log->l_iclog_hsize)));
}
/* /*
* Check if the current log item was first committed in this sequence. * Check if the current log item was first committed in this sequence.
* We can't rely on just the log item being in the CIL, we have to check * We can't rely on just the log item being in the CIL, we have to check
...@@ -61,7 +72,7 @@ xlog_item_in_current_chkpt( ...@@ -61,7 +72,7 @@ xlog_item_in_current_chkpt(
struct xfs_cil *cil, struct xfs_cil *cil,
struct xfs_log_item *lip) struct xfs_log_item *lip)
{ {
if (list_empty(&lip->li_cil)) if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
return false; return false;
/* /*
...@@ -93,15 +104,88 @@ xlog_cil_ctx_alloc(void) ...@@ -93,15 +104,88 @@ xlog_cil_ctx_alloc(void)
ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS); ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
INIT_LIST_HEAD(&ctx->committing); INIT_LIST_HEAD(&ctx->committing);
INIT_LIST_HEAD(&ctx->busy_extents); INIT_LIST_HEAD(&ctx->busy_extents);
INIT_LIST_HEAD(&ctx->log_items);
INIT_LIST_HEAD(&ctx->lv_chain);
INIT_WORK(&ctx->push_work, xlog_cil_push_work); INIT_WORK(&ctx->push_work, xlog_cil_push_work);
return ctx; return ctx;
} }
/*
* Aggregate the CIL per cpu structures into global counts, lists, etc and
* clear the percpu state ready for the next context to use. This is called
* from the push code with the context lock held exclusively, hence nothing else
* will be accessing or modifying the per-cpu counters.
*/
static void
xlog_cil_push_pcp_aggregate(
struct xfs_cil *cil,
struct xfs_cil_ctx *ctx)
{
struct xlog_cil_pcp *cilpcp;
int cpu;
for_each_online_cpu(cpu) {
cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
ctx->ticket->t_curr_res += cilpcp->space_reserved;
cilpcp->space_reserved = 0;
if (!list_empty(&cilpcp->busy_extents)) {
list_splice_init(&cilpcp->busy_extents,
&ctx->busy_extents);
}
if (!list_empty(&cilpcp->log_items))
list_splice_init(&cilpcp->log_items, &ctx->log_items);
/*
* We're in the middle of switching cil contexts. Reset the
* counter we use to detect when the current context is nearing
* full.
*/
cilpcp->space_used = 0;
}
}
/*
* Aggregate the CIL per-cpu space used counters into the global atomic value.
* This is called when the per-cpu counter aggregation will first pass the soft
* limit threshold so we can switch to atomic counter aggregation for accurate
* detection of hard limit traversal.
*/
static void
xlog_cil_insert_pcp_aggregate(
struct xfs_cil *cil,
struct xfs_cil_ctx *ctx)
{
struct xlog_cil_pcp *cilpcp;
int cpu;
int count = 0;
/* Trigger atomic updates then aggregate only for the first caller */
if (!test_and_clear_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags))
return;
for_each_online_cpu(cpu) {
int old, prev;
cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
do {
old = cilpcp->space_used;
prev = cmpxchg(&cilpcp->space_used, old, 0);
} while (old != prev);
count += old;
}
atomic_add(count, &ctx->space_used);
}
static void static void
xlog_cil_ctx_switch( xlog_cil_ctx_switch(
struct xfs_cil *cil, struct xfs_cil *cil,
struct xfs_cil_ctx *ctx) struct xfs_cil_ctx *ctx)
{ {
xlog_cil_set_iclog_hdr_count(cil);
set_bit(XLOG_CIL_EMPTY, &cil->xc_flags);
set_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags);
ctx->sequence = ++cil->xc_current_sequence; ctx->sequence = ++cil->xc_current_sequence;
ctx->cil = cil; ctx->cil = cil;
cil->xc_ctx = ctx; cil->xc_ctx = ctx;
...@@ -123,6 +207,7 @@ xlog_cil_init_post_recovery( ...@@ -123,6 +207,7 @@ xlog_cil_init_post_recovery(
{ {
log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log); log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
log->l_cilp->xc_ctx->sequence = 1; log->l_cilp->xc_ctx->sequence = 1;
xlog_cil_set_iclog_hdr_count(log->l_cilp);
} }
static inline int static inline int
...@@ -254,6 +339,7 @@ xlog_cil_alloc_shadow_bufs( ...@@ -254,6 +339,7 @@ xlog_cil_alloc_shadow_bufs(
memset(lv, 0, xlog_cil_iovec_space(niovecs)); memset(lv, 0, xlog_cil_iovec_space(niovecs));
INIT_LIST_HEAD(&lv->lv_list);
lv->lv_item = lip; lv->lv_item = lip;
lv->lv_size = buf_size; lv->lv_size = buf_size;
if (ordered) if (ordered)
...@@ -269,7 +355,6 @@ xlog_cil_alloc_shadow_bufs( ...@@ -269,7 +355,6 @@ xlog_cil_alloc_shadow_bufs(
else else
lv->lv_buf_len = 0; lv->lv_buf_len = 0;
lv->lv_bytes = 0; lv->lv_bytes = 0;
lv->lv_next = NULL;
} }
/* Ensure the lv is set up according to ->iop_size */ /* Ensure the lv is set up according to ->iop_size */
...@@ -396,7 +481,6 @@ xlog_cil_insert_format_items( ...@@ -396,7 +481,6 @@ xlog_cil_insert_format_items(
if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) { if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
/* same or smaller, optimise common overwrite case */ /* same or smaller, optimise common overwrite case */
lv = lip->li_lv; lv = lip->li_lv;
lv->lv_next = NULL;
if (ordered) if (ordered)
goto insert; goto insert;
...@@ -433,6 +517,23 @@ xlog_cil_insert_format_items( ...@@ -433,6 +517,23 @@ xlog_cil_insert_format_items(
} }
} }
/*
* The use of lockless waitqueue_active() requires that the caller has
* serialised itself against the wakeup call in xlog_cil_push_work(). That
* can be done by either holding the push lock or the context lock.
*/
static inline bool
xlog_cil_over_hard_limit(
struct xlog *log,
int32_t space_used)
{
if (waitqueue_active(&log->l_cilp->xc_push_wait))
return true;
if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log))
return true;
return false;
}
/* /*
* Insert the log items into the CIL and calculate the difference in space * Insert the log items into the CIL and calculate the difference in space
* consumed by the item. Add the space to the checkpoint ticket and calculate * consumed by the item. Add the space to the checkpoint ticket and calculate
...@@ -450,8 +551,10 @@ xlog_cil_insert_items( ...@@ -450,8 +551,10 @@ xlog_cil_insert_items(
struct xfs_cil_ctx *ctx = cil->xc_ctx; struct xfs_cil_ctx *ctx = cil->xc_ctx;
struct xfs_log_item *lip; struct xfs_log_item *lip;
int len = 0; int len = 0;
int iclog_space;
int iovhdr_res = 0, split_res = 0, ctx_res = 0; int iovhdr_res = 0, split_res = 0, ctx_res = 0;
int space_used;
int order;
struct xlog_cil_pcp *cilpcp;
ASSERT(tp); ASSERT(tp);
...@@ -461,93 +564,135 @@ xlog_cil_insert_items( ...@@ -461,93 +564,135 @@ xlog_cil_insert_items(
*/ */
xlog_cil_insert_format_items(log, tp, &len); xlog_cil_insert_format_items(log, tp, &len);
spin_lock(&cil->xc_cil_lock); /*
* Subtract the space released by intent cancelation from the space we
* consumed so that we remove it from the CIL space and add it back to
* the current transaction reservation context.
*/
len -= released_space;
/* attach the transaction to the CIL if it has any busy extents */ /*
if (!list_empty(&tp->t_busy)) * Grab the per-cpu pointer for the CIL before we start any accounting.
list_splice_init(&tp->t_busy, &ctx->busy_extents); * That ensures that we are running with pre-emption disabled and so we
* can't be scheduled away between split sample/update operations that
* are done without outside locking to serialise them.
*/
cilpcp = get_cpu_ptr(cil->xc_pcp);
/* /*
* Now transfer enough transaction reservation to the context ticket * We need to take the CIL checkpoint unit reservation on the first
* for the checkpoint. The context ticket is special - the unit * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't
* reservation has to grow as well as the current reservation as we * unnecessarily do an atomic op in the fast path here. We can clear the
* steal from tickets so we can correctly determine the space used * XLOG_CIL_EMPTY bit as we are under the xc_ctx_lock here and that
* during the transaction commit. * needs to be held exclusively to reset the XLOG_CIL_EMPTY bit.
*/ */
if (ctx->ticket->t_curr_res == 0) { if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) &&
test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
ctx_res = ctx->ticket->t_unit_res; ctx_res = ctx->ticket->t_unit_res;
ctx->ticket->t_curr_res = ctx_res;
tp->t_ticket->t_curr_res -= ctx_res;
}
/* do we need space for more log record headers? */ /*
iclog_space = log->l_iclog_size - log->l_iclog_hsize; * Check if we need to steal iclog headers. atomic_read() is not a
if (len > 0 && (ctx->space_used / iclog_space != * locked atomic operation, so we can check the value before we do any
(ctx->space_used + len) / iclog_space)) { * real atomic ops in the fast path. If we've already taken the CIL unit
split_res = (len + iclog_space - 1) / iclog_space; * reservation from this commit, we've already got one iclog header
/* need to take into account split region headers, too */ * space reserved so we have to account for that otherwise we risk
split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header); * overrunning the reservation on this ticket.
ctx->ticket->t_unit_res += split_res; *
ctx->ticket->t_curr_res += split_res; * If the CIL is already at the hard limit, we might need more header
tp->t_ticket->t_curr_res -= split_res; * space that originally reserved. So steal more header space from every
ASSERT(tp->t_ticket->t_curr_res >= len); * commit that occurs once we are over the hard limit to ensure the CIL
* push won't run out of reservation space.
*
* This can steal more than we need, but that's OK.
*
* The cil->xc_ctx_lock provides the serialisation necessary for safely
* calling xlog_cil_over_hard_limit() in this context.
*/
space_used = atomic_read(&ctx->space_used) + cilpcp->space_used + len;
if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
xlog_cil_over_hard_limit(log, space_used)) {
split_res = log->l_iclog_hsize +
sizeof(struct xlog_op_header);
if (ctx_res)
ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1);
else
ctx_res = split_res * tp->t_ticket->t_iclog_hdrs;
atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
} }
tp->t_ticket->t_curr_res -= len; cilpcp->space_reserved += ctx_res;
tp->t_ticket->t_curr_res += released_space;
ctx->space_used += len;
ctx->space_used -= released_space;
/* /*
* If we've overrun the reservation, dump the tx details before we move * Accurately account when over the soft limit, otherwise fold the
* the log items. Shutdown is imminent... * percpu count into the global count if over the per-cpu threshold.
*/ */
if (WARN_ON(tp->t_ticket->t_curr_res < 0)) { if (!test_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags)) {
xfs_warn(log->l_mp, "Transaction log reservation overrun:"); atomic_add(len, &ctx->space_used);
xfs_warn(log->l_mp, } else if (cilpcp->space_used + len >
" log items: %d bytes (iov hdrs: %d bytes)", (XLOG_CIL_SPACE_LIMIT(log) / num_online_cpus())) {
len, iovhdr_res); space_used = atomic_add_return(cilpcp->space_used + len,
xfs_warn(log->l_mp, " split region headers: %d bytes", &ctx->space_used);
split_res); cilpcp->space_used = 0;
xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
xlog_print_trans(tp); /*
* If we just transitioned over the soft limit, we need to
* transition to the global atomic counter.
*/
if (space_used >= XLOG_CIL_SPACE_LIMIT(log))
xlog_cil_insert_pcp_aggregate(cil, ctx);
} else {
cilpcp->space_used += len;
} }
/* attach the transaction to the CIL if it has any busy extents */
if (!list_empty(&tp->t_busy))
list_splice_init(&tp->t_busy, &cilpcp->busy_extents);
/* /*
* Now (re-)position everything modified at the tail of the CIL. * Now update the order of everything modified in the transaction
* and insert items into the CIL if they aren't already there.
* We do this here so we only need to take the CIL lock once during * We do this here so we only need to take the CIL lock once during
* the transaction commit. * the transaction commit.
*/ */
order = atomic_inc_return(&ctx->order_id);
list_for_each_entry(lip, &tp->t_items, li_trans) { list_for_each_entry(lip, &tp->t_items, li_trans) {
/* Skip items which aren't dirty in this transaction. */ /* Skip items which aren't dirty in this transaction. */
if (!test_bit(XFS_LI_DIRTY, &lip->li_flags)) if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
continue; continue;
/* lip->li_order_id = order;
* Only move the item if it isn't already at the tail. This is if (!list_empty(&lip->li_cil))
* to prevent a transient list_empty() state when reinserting continue;
* an item that is already the only item in the CIL. list_add_tail(&lip->li_cil, &cilpcp->log_items);
*/
if (!list_is_last(&lip->li_cil, &cil->xc_cil))
list_move_tail(&lip->li_cil, &cil->xc_cil);
} }
put_cpu_ptr(cilpcp);
spin_unlock(&cil->xc_cil_lock); /*
* If we've overrun the reservation, dump the tx details before we move
if (tp->t_ticket->t_curr_res < 0) * the log items. Shutdown is imminent...
*/
tp->t_ticket->t_curr_res -= ctx_res + len;
if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
xfs_warn(log->l_mp, "Transaction log reservation overrun:");
xfs_warn(log->l_mp,
" log items: %d bytes (iov hdrs: %d bytes)",
len, iovhdr_res);
xfs_warn(log->l_mp, " split region headers: %d bytes",
split_res);
xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
xlog_print_trans(tp);
xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR); xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
}
} }
static void static void
xlog_cil_free_logvec( xlog_cil_free_logvec(
struct xfs_log_vec *log_vector) struct list_head *lv_chain)
{ {
struct xfs_log_vec *lv; struct xfs_log_vec *lv;
for (lv = log_vector; lv; ) { while (!list_empty(lv_chain)) {
struct xfs_log_vec *next = lv->lv_next; lv = list_first_entry(lv_chain, struct xfs_log_vec, lv_list);
list_del_init(&lv->lv_list);
kmem_free(lv); kmem_free(lv);
lv = next;
} }
} }
...@@ -647,7 +792,7 @@ xlog_cil_committed( ...@@ -647,7 +792,7 @@ xlog_cil_committed(
spin_unlock(&ctx->cil->xc_push_lock); spin_unlock(&ctx->cil->xc_push_lock);
} }
xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain, xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, &ctx->lv_chain,
ctx->start_lsn, abort); ctx->start_lsn, abort);
xfs_extent_busy_sort(&ctx->busy_extents); xfs_extent_busy_sort(&ctx->busy_extents);
...@@ -658,7 +803,7 @@ xlog_cil_committed( ...@@ -658,7 +803,7 @@ xlog_cil_committed(
list_del(&ctx->committing); list_del(&ctx->committing);
spin_unlock(&ctx->cil->xc_push_lock); spin_unlock(&ctx->cil->xc_push_lock);
xlog_cil_free_logvec(ctx->lv_chain); xlog_cil_free_logvec(&ctx->lv_chain);
if (!list_empty(&ctx->busy_extents)) if (!list_empty(&ctx->busy_extents))
xlog_discard_busy_extents(mp, ctx); xlog_discard_busy_extents(mp, ctx);
...@@ -817,7 +962,6 @@ xlog_cil_order_write( ...@@ -817,7 +962,6 @@ xlog_cil_order_write(
static int static int
xlog_cil_write_chain( xlog_cil_write_chain(
struct xfs_cil_ctx *ctx, struct xfs_cil_ctx *ctx,
struct xfs_log_vec *chain,
uint32_t chain_len) uint32_t chain_len)
{ {
struct xlog *log = ctx->cil->xc_log; struct xlog *log = ctx->cil->xc_log;
...@@ -826,7 +970,7 @@ xlog_cil_write_chain( ...@@ -826,7 +970,7 @@ xlog_cil_write_chain(
error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD); error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
if (error) if (error)
return error; return error;
return xlog_write(log, ctx, chain, ctx->ticket, chain_len); return xlog_write(log, ctx, &ctx->lv_chain, ctx->ticket, chain_len);
} }
/* /*
...@@ -855,6 +999,8 @@ xlog_cil_write_commit_record( ...@@ -855,6 +999,8 @@ xlog_cil_write_commit_record(
.lv_iovecp = &reg, .lv_iovecp = &reg,
}; };
int error; int error;
LIST_HEAD(lv_chain);
list_add(&vec.lv_list, &lv_chain);
if (xlog_is_shutdown(log)) if (xlog_is_shutdown(log))
return -EIO; return -EIO;
...@@ -865,7 +1011,7 @@ xlog_cil_write_commit_record( ...@@ -865,7 +1011,7 @@ xlog_cil_write_commit_record(
/* account for space used by record data */ /* account for space used by record data */
ctx->ticket->t_curr_res -= reg.i_len; ctx->ticket->t_curr_res -= reg.i_len;
error = xlog_write(log, ctx, &vec, ctx->ticket, reg.i_len); error = xlog_write(log, ctx, &lv_chain, ctx->ticket, reg.i_len);
if (error) if (error)
xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR); xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
return error; return error;
...@@ -931,11 +1077,30 @@ xlog_cil_build_trans_hdr( ...@@ -931,11 +1077,30 @@ xlog_cil_build_trans_hdr(
lvhdr->lv_niovecs = 2; lvhdr->lv_niovecs = 2;
lvhdr->lv_iovecp = &hdr->lhdr[0]; lvhdr->lv_iovecp = &hdr->lhdr[0];
lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len; lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
lvhdr->lv_next = ctx->lv_chain;
tic->t_curr_res -= lvhdr->lv_bytes; tic->t_curr_res -= lvhdr->lv_bytes;
} }
/*
* CIL item reordering compare function. We want to order in ascending ID order,
* but we want to leave items with the same ID in the order they were added to
* the list. This is important for operations like reflink where we log 4 order
* dependent intents in a single transaction when we overwrite an existing
* shared extent with a new shared extent. i.e. BUI(unmap), CUI(drop),
* CUI (inc), BUI(remap)...
*/
static int
xlog_cil_order_cmp(
void *priv,
const struct list_head *a,
const struct list_head *b)
{
struct xfs_log_vec *l1 = container_of(a, struct xfs_log_vec, lv_list);
struct xfs_log_vec *l2 = container_of(b, struct xfs_log_vec, lv_list);
return l1->lv_order_id > l2->lv_order_id;
}
/* /*
* Pull all the log vectors off the items in the CIL, and remove the items from * Pull all the log vectors off the items in the CIL, and remove the items from
* the CIL. We don't need the CIL lock here because it's only needed on the * the CIL. We don't need the CIL lock here because it's only needed on the
...@@ -947,18 +1112,16 @@ xlog_cil_build_trans_hdr( ...@@ -947,18 +1112,16 @@ xlog_cil_build_trans_hdr(
*/ */
static void static void
xlog_cil_build_lv_chain( xlog_cil_build_lv_chain(
struct xfs_cil *cil,
struct xfs_cil_ctx *ctx, struct xfs_cil_ctx *ctx,
struct list_head *whiteouts, struct list_head *whiteouts,
uint32_t *num_iovecs, uint32_t *num_iovecs,
uint32_t *num_bytes) uint32_t *num_bytes)
{ {
struct xfs_log_vec *lv = NULL; while (!list_empty(&ctx->log_items)) {
while (!list_empty(&cil->xc_cil)) {
struct xfs_log_item *item; struct xfs_log_item *item;
struct xfs_log_vec *lv;
item = list_first_entry(&cil->xc_cil, item = list_first_entry(&ctx->log_items,
struct xfs_log_item, li_cil); struct xfs_log_item, li_cil);
if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) { if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) {
...@@ -967,18 +1130,18 @@ xlog_cil_build_lv_chain( ...@@ -967,18 +1130,18 @@ xlog_cil_build_lv_chain(
continue; continue;
} }
list_del_init(&item->li_cil);
if (!ctx->lv_chain)
ctx->lv_chain = item->li_lv;
else
lv->lv_next = item->li_lv;
lv = item->li_lv; lv = item->li_lv;
item->li_lv = NULL; lv->lv_order_id = item->li_order_id;
*num_iovecs += lv->lv_niovecs;
/* we don't write ordered log vectors */ /* we don't write ordered log vectors */
if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
*num_bytes += lv->lv_bytes; *num_bytes += lv->lv_bytes;
*num_iovecs += lv->lv_niovecs;
list_add_tail(&lv->lv_list, &ctx->lv_chain);
list_del_init(&item->li_cil);
item->li_order_id = 0;
item->li_lv = NULL;
} }
} }
...@@ -1022,10 +1185,11 @@ xlog_cil_push_work( ...@@ -1022,10 +1185,11 @@ xlog_cil_push_work(
int num_bytes = 0; int num_bytes = 0;
int error = 0; int error = 0;
struct xlog_cil_trans_hdr thdr; struct xlog_cil_trans_hdr thdr;
struct xfs_log_vec lvhdr = { NULL }; struct xfs_log_vec lvhdr = {};
xfs_csn_t push_seq; xfs_csn_t push_seq;
bool push_commit_stable; bool push_commit_stable;
LIST_HEAD (whiteouts); LIST_HEAD (whiteouts);
struct xlog_ticket *ticket;
new_ctx = xlog_cil_ctx_alloc(); new_ctx = xlog_cil_ctx_alloc();
new_ctx->ticket = xlog_cil_ticket_alloc(log); new_ctx->ticket = xlog_cil_ticket_alloc(log);
...@@ -1049,12 +1213,14 @@ xlog_cil_push_work( ...@@ -1049,12 +1213,14 @@ xlog_cil_push_work(
if (waitqueue_active(&cil->xc_push_wait)) if (waitqueue_active(&cil->xc_push_wait))
wake_up_all(&cil->xc_push_wait); wake_up_all(&cil->xc_push_wait);
xlog_cil_push_pcp_aggregate(cil, ctx);
/* /*
* Check if we've anything to push. If there is nothing, then we don't * Check if we've anything to push. If there is nothing, then we don't
* move on to a new sequence number and so we have to be able to push * move on to a new sequence number and so we have to be able to push
* this sequence again later. * this sequence again later.
*/ */
if (list_empty(&cil->xc_cil)) { if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
cil->xc_push_seq = 0; cil->xc_push_seq = 0;
spin_unlock(&cil->xc_push_lock); spin_unlock(&cil->xc_push_lock);
goto out_skip; goto out_skip;
...@@ -1094,7 +1260,7 @@ xlog_cil_push_work( ...@@ -1094,7 +1260,7 @@ xlog_cil_push_work(
list_add(&ctx->committing, &cil->xc_committing); list_add(&ctx->committing, &cil->xc_committing);
spin_unlock(&cil->xc_push_lock); spin_unlock(&cil->xc_push_lock);
xlog_cil_build_lv_chain(cil, ctx, &whiteouts, &num_iovecs, &num_bytes); xlog_cil_build_lv_chain(ctx, &whiteouts, &num_iovecs, &num_bytes);
/* /*
* Switch the contexts so we can drop the context lock and move out * Switch the contexts so we can drop the context lock and move out
...@@ -1126,15 +1292,31 @@ xlog_cil_push_work( ...@@ -1126,15 +1292,31 @@ xlog_cil_push_work(
spin_unlock(&cil->xc_push_lock); spin_unlock(&cil->xc_push_lock);
up_write(&cil->xc_ctx_lock); up_write(&cil->xc_ctx_lock);
/*
* Sort the log vector chain before we add the transaction headers.
* This ensures we always have the transaction headers at the start
* of the chain.
*/
list_sort(NULL, &ctx->lv_chain, xlog_cil_order_cmp);
/* /*
* Build a checkpoint transaction header and write it to the log to * Build a checkpoint transaction header and write it to the log to
* begin the transaction. We need to account for the space used by the * begin the transaction. We need to account for the space used by the
* transaction header here as it is not accounted for in xlog_write(). * transaction header here as it is not accounted for in xlog_write().
* Add the lvhdr to the head of the lv chain we pass to xlog_write() so
* it gets written into the iclog first.
*/ */
xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs); xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
num_bytes += lvhdr.lv_bytes; num_bytes += lvhdr.lv_bytes;
list_add(&lvhdr.lv_list, &ctx->lv_chain);
error = xlog_cil_write_chain(ctx, &lvhdr, num_bytes); /*
* Take the lvhdr back off the lv_chain immediately after calling
* xlog_cil_write_chain() as it should not be passed to log IO
* completion.
*/
error = xlog_cil_write_chain(ctx, num_bytes);
list_del(&lvhdr.lv_list);
if (error) if (error)
goto out_abort_free_ticket; goto out_abort_free_ticket;
...@@ -1142,7 +1324,14 @@ xlog_cil_push_work( ...@@ -1142,7 +1324,14 @@ xlog_cil_push_work(
if (error) if (error)
goto out_abort_free_ticket; goto out_abort_free_ticket;
xfs_log_ticket_ungrant(log, ctx->ticket); /*
* Grab the ticket from the ctx so we can ungrant it after releasing the
* commit_iclog. The ctx may be freed by the time we return from
* releasing the commit_iclog (i.e. checkpoint has been completed and
* callback run) so we can't reference the ctx after the call to
* xlog_state_release_iclog().
*/
ticket = ctx->ticket;
/* /*
* If the checkpoint spans multiple iclogs, wait for all previous iclogs * If the checkpoint spans multiple iclogs, wait for all previous iclogs
...@@ -1192,12 +1381,14 @@ xlog_cil_push_work( ...@@ -1192,12 +1381,14 @@ xlog_cil_push_work(
if (push_commit_stable && if (push_commit_stable &&
ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE) ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
xlog_state_switch_iclogs(log, ctx->commit_iclog, 0); xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
xlog_state_release_iclog(log, ctx->commit_iclog); ticket = ctx->ticket;
xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
/* Not safe to reference ctx now! */ /* Not safe to reference ctx now! */
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
xlog_cil_cleanup_whiteouts(&whiteouts); xlog_cil_cleanup_whiteouts(&whiteouts);
xfs_log_ticket_ungrant(log, ticket);
return; return;
out_skip: out_skip:
...@@ -1207,17 +1398,19 @@ xlog_cil_push_work( ...@@ -1207,17 +1398,19 @@ xlog_cil_push_work(
return; return;
out_abort_free_ticket: out_abort_free_ticket:
xfs_log_ticket_ungrant(log, ctx->ticket);
ASSERT(xlog_is_shutdown(log)); ASSERT(xlog_is_shutdown(log));
xlog_cil_cleanup_whiteouts(&whiteouts); xlog_cil_cleanup_whiteouts(&whiteouts);
if (!ctx->commit_iclog) { if (!ctx->commit_iclog) {
xfs_log_ticket_ungrant(log, ctx->ticket);
xlog_cil_committed(ctx); xlog_cil_committed(ctx);
return; return;
} }
spin_lock(&log->l_icloglock); spin_lock(&log->l_icloglock);
xlog_state_release_iclog(log, ctx->commit_iclog); ticket = ctx->ticket;
xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
/* Not safe to reference ctx now! */ /* Not safe to reference ctx now! */
spin_unlock(&log->l_icloglock); spin_unlock(&log->l_icloglock);
xfs_log_ticket_ungrant(log, ticket);
} }
/* /*
...@@ -1232,18 +1425,27 @@ xlog_cil_push_background( ...@@ -1232,18 +1425,27 @@ xlog_cil_push_background(
struct xlog *log) __releases(cil->xc_ctx_lock) struct xlog *log) __releases(cil->xc_ctx_lock)
{ {
struct xfs_cil *cil = log->l_cilp; struct xfs_cil *cil = log->l_cilp;
int space_used = atomic_read(&cil->xc_ctx->space_used);
/* /*
* The cil won't be empty because we are called while holding the * The cil won't be empty because we are called while holding the
* context lock so whatever we added to the CIL will still be there * context lock so whatever we added to the CIL will still be there.
*/ */
ASSERT(!list_empty(&cil->xc_cil)); ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
/* /*
* Don't do a background push if we haven't used up all the * We are done if:
* space available yet. * - we haven't used up all the space available yet; or
* - we've already queued up a push; and
* - we're not over the hard limit; and
* - nothing has been over the hard limit.
*
* If so, we don't need to take the push lock as there's nothing to do.
*/ */
if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) { if (space_used < XLOG_CIL_SPACE_LIMIT(log) ||
(cil->xc_push_seq == cil->xc_current_sequence &&
space_used < XLOG_CIL_BLOCKING_SPACE_LIMIT(log) &&
!waitqueue_active(&cil->xc_push_wait))) {
up_read(&cil->xc_ctx_lock); up_read(&cil->xc_ctx_lock);
return; return;
} }
...@@ -1270,12 +1472,11 @@ xlog_cil_push_background( ...@@ -1270,12 +1472,11 @@ xlog_cil_push_background(
* dipping back down under the hard limit. * dipping back down under the hard limit.
* *
* The ctx->xc_push_lock provides the serialisation necessary for safely * The ctx->xc_push_lock provides the serialisation necessary for safely
* using the lockless waitqueue_active() check in this context. * calling xlog_cil_over_hard_limit() in this context.
*/ */
if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) || if (xlog_cil_over_hard_limit(log, space_used)) {
waitqueue_active(&cil->xc_push_wait)) {
trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket); trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
ASSERT(cil->xc_ctx->space_used < log->l_logsize); ASSERT(space_used < log->l_logsize);
xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock); xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
return; return;
} }
...@@ -1334,7 +1535,8 @@ xlog_cil_push_now( ...@@ -1334,7 +1535,8 @@ xlog_cil_push_now(
* If the CIL is empty or we've already pushed the sequence then * If the CIL is empty or we've already pushed the sequence then
* there's no more work that we need to do. * there's no more work that we need to do.
*/ */
if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) { if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) ||
push_seq <= cil->xc_push_seq) {
spin_unlock(&cil->xc_push_lock); spin_unlock(&cil->xc_push_lock);
return; return;
} }
...@@ -1352,7 +1554,7 @@ xlog_cil_empty( ...@@ -1352,7 +1554,7 @@ xlog_cil_empty(
bool empty = false; bool empty = false;
spin_lock(&cil->xc_push_lock); spin_lock(&cil->xc_push_lock);
if (list_empty(&cil->xc_cil)) if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
empty = true; empty = true;
spin_unlock(&cil->xc_push_lock); spin_unlock(&cil->xc_push_lock);
return empty; return empty;
...@@ -1483,7 +1685,7 @@ xlog_cil_flush( ...@@ -1483,7 +1685,7 @@ xlog_cil_flush(
* If the CIL is empty, make sure that any previous checkpoint that may * If the CIL is empty, make sure that any previous checkpoint that may
* still be in an active iclog is pushed to stable storage. * still be in an active iclog is pushed to stable storage.
*/ */
if (list_empty(&log->l_cilp->xc_cil)) if (test_bit(XLOG_CIL_EMPTY, &log->l_cilp->xc_flags))
xfs_log_force(log->l_mp, 0); xfs_log_force(log->l_mp, 0);
} }
...@@ -1568,7 +1770,7 @@ xlog_cil_force_seq( ...@@ -1568,7 +1770,7 @@ xlog_cil_force_seq(
* we would have found the context on the committing list. * we would have found the context on the committing list.
*/ */
if (sequence == cil->xc_current_sequence && if (sequence == cil->xc_current_sequence &&
!list_empty(&cil->xc_cil)) { !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
spin_unlock(&cil->xc_push_lock); spin_unlock(&cil->xc_push_lock);
goto restart; goto restart;
} }
...@@ -1588,15 +1790,49 @@ xlog_cil_force_seq( ...@@ -1588,15 +1790,49 @@ xlog_cil_force_seq(
return 0; return 0;
} }
/*
* Move dead percpu state to the relevant CIL context structures.
*
* We have to lock the CIL context here to ensure that nothing is modifying
* the percpu state, either addition or removal. Both of these are done under
* the CIL context lock, so grabbing that exclusively here will ensure we can
* safely drain the cilpcp for the CPU that is dying.
*/
void
xlog_cil_pcp_dead(
struct xlog *log,
unsigned int cpu)
{
struct xfs_cil *cil = log->l_cilp;
struct xlog_cil_pcp *cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
struct xfs_cil_ctx *ctx;
down_write(&cil->xc_ctx_lock);
ctx = cil->xc_ctx;
if (ctx->ticket)
ctx->ticket->t_curr_res += cilpcp->space_reserved;
cilpcp->space_reserved = 0;
if (!list_empty(&cilpcp->log_items))
list_splice_init(&cilpcp->log_items, &ctx->log_items);
if (!list_empty(&cilpcp->busy_extents))
list_splice_init(&cilpcp->busy_extents, &ctx->busy_extents);
atomic_add(cilpcp->space_used, &ctx->space_used);
cilpcp->space_used = 0;
up_write(&cil->xc_ctx_lock);
}
/* /*
* Perform initial CIL structure initialisation. * Perform initial CIL structure initialisation.
*/ */
int int
xlog_cil_init( xlog_cil_init(
struct xlog *log) struct xlog *log)
{ {
struct xfs_cil *cil; struct xfs_cil *cil;
struct xfs_cil_ctx *ctx; struct xfs_cil_ctx *ctx;
struct xlog_cil_pcp *cilpcp;
int cpu;
cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL); cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
if (!cil) if (!cil)
...@@ -1611,22 +1847,31 @@ xlog_cil_init( ...@@ -1611,22 +1847,31 @@ xlog_cil_init(
if (!cil->xc_push_wq) if (!cil->xc_push_wq)
goto out_destroy_cil; goto out_destroy_cil;
INIT_LIST_HEAD(&cil->xc_cil); cil->xc_log = log;
cil->xc_pcp = alloc_percpu(struct xlog_cil_pcp);
if (!cil->xc_pcp)
goto out_destroy_wq;
for_each_possible_cpu(cpu) {
cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
INIT_LIST_HEAD(&cilpcp->busy_extents);
INIT_LIST_HEAD(&cilpcp->log_items);
}
INIT_LIST_HEAD(&cil->xc_committing); INIT_LIST_HEAD(&cil->xc_committing);
spin_lock_init(&cil->xc_cil_lock);
spin_lock_init(&cil->xc_push_lock); spin_lock_init(&cil->xc_push_lock);
init_waitqueue_head(&cil->xc_push_wait); init_waitqueue_head(&cil->xc_push_wait);
init_rwsem(&cil->xc_ctx_lock); init_rwsem(&cil->xc_ctx_lock);
init_waitqueue_head(&cil->xc_start_wait); init_waitqueue_head(&cil->xc_start_wait);
init_waitqueue_head(&cil->xc_commit_wait); init_waitqueue_head(&cil->xc_commit_wait);
cil->xc_log = log;
log->l_cilp = cil; log->l_cilp = cil;
ctx = xlog_cil_ctx_alloc(); ctx = xlog_cil_ctx_alloc();
xlog_cil_ctx_switch(cil, ctx); xlog_cil_ctx_switch(cil, ctx);
return 0; return 0;
out_destroy_wq:
destroy_workqueue(cil->xc_push_wq);
out_destroy_cil: out_destroy_cil:
kmem_free(cil); kmem_free(cil);
return -ENOMEM; return -ENOMEM;
...@@ -1636,14 +1881,17 @@ void ...@@ -1636,14 +1881,17 @@ void
xlog_cil_destroy( xlog_cil_destroy(
struct xlog *log) struct xlog *log)
{ {
if (log->l_cilp->xc_ctx) { struct xfs_cil *cil = log->l_cilp;
if (log->l_cilp->xc_ctx->ticket)
xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket); if (cil->xc_ctx) {
kmem_free(log->l_cilp->xc_ctx); if (cil->xc_ctx->ticket)
xfs_log_ticket_put(cil->xc_ctx->ticket);
kmem_free(cil->xc_ctx);
} }
ASSERT(list_empty(&log->l_cilp->xc_cil)); ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
destroy_workqueue(log->l_cilp->xc_push_wq); free_percpu(cil->xc_pcp);
kmem_free(log->l_cilp); destroy_workqueue(cil->xc_push_wq);
kmem_free(cil);
} }
...@@ -143,15 +143,16 @@ enum xlog_iclog_state { ...@@ -143,15 +143,16 @@ enum xlog_iclog_state {
#define XLOG_COVER_OPS 5 #define XLOG_COVER_OPS 5
typedef struct xlog_ticket { typedef struct xlog_ticket {
struct list_head t_queue; /* reserve/write queue */ struct list_head t_queue; /* reserve/write queue */
struct task_struct *t_task; /* task that owns this ticket */ struct task_struct *t_task; /* task that owns this ticket */
xlog_tid_t t_tid; /* transaction identifier : 4 */ xlog_tid_t t_tid; /* transaction identifier */
atomic_t t_ref; /* ticket reference count : 4 */ atomic_t t_ref; /* ticket reference count */
int t_curr_res; /* current reservation in bytes : 4 */ int t_curr_res; /* current reservation */
int t_unit_res; /* unit reservation in bytes : 4 */ int t_unit_res; /* unit reservation */
char t_ocnt; /* original count : 1 */ char t_ocnt; /* original unit count */
char t_cnt; /* current count : 1 */ char t_cnt; /* current unit count */
uint8_t t_flags; /* properties of reservation : 1 */ uint8_t t_flags; /* properties of reservation */
int t_iclog_hdrs; /* iclog hdrs in t_curr_res */
} xlog_ticket_t; } xlog_ticket_t;
/* /*
...@@ -221,13 +222,25 @@ struct xfs_cil_ctx { ...@@ -221,13 +222,25 @@ struct xfs_cil_ctx {
xfs_lsn_t commit_lsn; /* chkpt commit record lsn */ xfs_lsn_t commit_lsn; /* chkpt commit record lsn */
struct xlog_in_core *commit_iclog; struct xlog_in_core *commit_iclog;
struct xlog_ticket *ticket; /* chkpt ticket */ struct xlog_ticket *ticket; /* chkpt ticket */
int space_used; /* aggregate size of regions */ atomic_t space_used; /* aggregate size of regions */
struct list_head busy_extents; /* busy extents in chkpt */ struct list_head busy_extents; /* busy extents in chkpt */
struct xfs_log_vec *lv_chain; /* logvecs being pushed */ struct list_head log_items; /* log items in chkpt */
struct list_head lv_chain; /* logvecs being pushed */
struct list_head iclog_entry; struct list_head iclog_entry;
struct list_head committing; /* ctx committing list */ struct list_head committing; /* ctx committing list */
struct work_struct discard_endio_work; struct work_struct discard_endio_work;
struct work_struct push_work; struct work_struct push_work;
atomic_t order_id;
};
/*
* Per-cpu CIL tracking items
*/
struct xlog_cil_pcp {
int32_t space_used;
uint32_t space_reserved;
struct list_head busy_extents;
struct list_head log_items;
}; };
/* /*
...@@ -248,8 +261,8 @@ struct xfs_cil_ctx { ...@@ -248,8 +261,8 @@ struct xfs_cil_ctx {
*/ */
struct xfs_cil { struct xfs_cil {
struct xlog *xc_log; struct xlog *xc_log;
struct list_head xc_cil; unsigned long xc_flags;
spinlock_t xc_cil_lock; atomic_t xc_iclog_hdrs;
struct workqueue_struct *xc_push_wq; struct workqueue_struct *xc_push_wq;
struct rw_semaphore xc_ctx_lock ____cacheline_aligned_in_smp; struct rw_semaphore xc_ctx_lock ____cacheline_aligned_in_smp;
...@@ -263,8 +276,17 @@ struct xfs_cil { ...@@ -263,8 +276,17 @@ struct xfs_cil {
wait_queue_head_t xc_start_wait; wait_queue_head_t xc_start_wait;
xfs_csn_t xc_current_sequence; xfs_csn_t xc_current_sequence;
wait_queue_head_t xc_push_wait; /* background push throttle */ wait_queue_head_t xc_push_wait; /* background push throttle */
void __percpu *xc_pcp; /* percpu CIL structures */
#ifdef CONFIG_HOTPLUG_CPU
struct list_head xc_pcp_list;
#endif
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
/* xc_flags bit values */
#define XLOG_CIL_EMPTY 1
#define XLOG_CIL_PCP_SPACE 2
/* /*
* The amount of log space we allow the CIL to aggregate is difficult to size. * The amount of log space we allow the CIL to aggregate is difficult to size.
* Whatever we choose, we have to make sure we can get a reservation for the * Whatever we choose, we have to make sure we can get a reservation for the
...@@ -486,14 +508,15 @@ struct xlog_ticket *xlog_ticket_alloc(struct xlog *log, int unit_bytes, ...@@ -486,14 +508,15 @@ struct xlog_ticket *xlog_ticket_alloc(struct xlog *log, int unit_bytes,
void xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket); void xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket);
void xlog_print_trans(struct xfs_trans *); void xlog_print_trans(struct xfs_trans *);
int xlog_write(struct xlog *log, struct xfs_cil_ctx *ctx, int xlog_write(struct xlog *log, struct xfs_cil_ctx *ctx,
struct xfs_log_vec *log_vector, struct xlog_ticket *tic, struct list_head *lv_chain, struct xlog_ticket *tic,
uint32_t len); uint32_t len);
void xfs_log_ticket_ungrant(struct xlog *log, struct xlog_ticket *ticket); void xfs_log_ticket_ungrant(struct xlog *log, struct xlog_ticket *ticket);
void xfs_log_ticket_regrant(struct xlog *log, struct xlog_ticket *ticket); void xfs_log_ticket_regrant(struct xlog *log, struct xlog_ticket *ticket);
void xlog_state_switch_iclogs(struct xlog *log, struct xlog_in_core *iclog, void xlog_state_switch_iclogs(struct xlog *log, struct xlog_in_core *iclog,
int eventual_size); int eventual_size);
int xlog_state_release_iclog(struct xlog *log, struct xlog_in_core *iclog); int xlog_state_release_iclog(struct xlog *log, struct xlog_in_core *iclog,
struct xlog_ticket *ticket);
/* /*
* When we crack an atomic LSN, we sample it first so that the value will not * When we crack an atomic LSN, we sample it first so that the value will not
...@@ -682,4 +705,9 @@ xlog_kvmalloc( ...@@ -682,4 +705,9 @@ xlog_kvmalloc(
return p; return p;
} }
/*
* CIL CPU dead notifier
*/
void xlog_cil_pcp_dead(struct xlog *log, unsigned int cpu);
#endif /* __XFS_LOG_PRIV_H__ */ #endif /* __XFS_LOG_PRIV_H__ */
...@@ -2213,6 +2213,7 @@ xfs_cpu_dead( ...@@ -2213,6 +2213,7 @@ xfs_cpu_dead(
list_for_each_entry_safe(mp, n, &xfs_mount_list, m_mount_list) { list_for_each_entry_safe(mp, n, &xfs_mount_list, m_mount_list) {
spin_unlock(&xfs_mount_list_lock); spin_unlock(&xfs_mount_list_lock);
xfs_inodegc_cpu_dead(mp, cpu); xfs_inodegc_cpu_dead(mp, cpu);
xlog_cil_pcp_dead(mp->m_log, cpu);
spin_lock(&xfs_mount_list_lock); spin_lock(&xfs_mount_list_lock);
} }
spin_unlock(&xfs_mount_list_lock); spin_unlock(&xfs_mount_list_lock);
......
...@@ -760,7 +760,7 @@ xfs_log_item_batch_insert( ...@@ -760,7 +760,7 @@ xfs_log_item_batch_insert(
void void
xfs_trans_committed_bulk( xfs_trans_committed_bulk(
struct xfs_ail *ailp, struct xfs_ail *ailp,
struct xfs_log_vec *log_vector, struct list_head *lv_chain,
xfs_lsn_t commit_lsn, xfs_lsn_t commit_lsn,
bool aborted) bool aborted)
{ {
...@@ -775,7 +775,7 @@ xfs_trans_committed_bulk( ...@@ -775,7 +775,7 @@ xfs_trans_committed_bulk(
spin_unlock(&ailp->ail_lock); spin_unlock(&ailp->ail_lock);
/* unpin all the log items */ /* unpin all the log items */
for (lv = log_vector; lv; lv = lv->lv_next ) { list_for_each_entry(lv, lv_chain, lv_list) {
struct xfs_log_item *lip = lv->lv_item; struct xfs_log_item *lip = lv->lv_item;
xfs_lsn_t item_lsn; xfs_lsn_t item_lsn;
......
...@@ -45,6 +45,7 @@ struct xfs_log_item { ...@@ -45,6 +45,7 @@ struct xfs_log_item {
struct xfs_log_vec *li_lv; /* active log vector */ struct xfs_log_vec *li_lv; /* active log vector */
struct xfs_log_vec *li_lv_shadow; /* standby vector */ struct xfs_log_vec *li_lv_shadow; /* standby vector */
xfs_csn_t li_seq; /* CIL commit seq */ xfs_csn_t li_seq; /* CIL commit seq */
uint32_t li_order_id; /* CIL commit order */
}; };
/* /*
......
...@@ -19,7 +19,8 @@ void xfs_trans_add_item(struct xfs_trans *, struct xfs_log_item *); ...@@ -19,7 +19,8 @@ void xfs_trans_add_item(struct xfs_trans *, struct xfs_log_item *);
void xfs_trans_del_item(struct xfs_log_item *); void xfs_trans_del_item(struct xfs_log_item *);
void xfs_trans_unreserve_and_mod_sb(struct xfs_trans *tp); void xfs_trans_unreserve_and_mod_sb(struct xfs_trans *tp);
void xfs_trans_committed_bulk(struct xfs_ail *ailp, struct xfs_log_vec *lv, void xfs_trans_committed_bulk(struct xfs_ail *ailp,
struct list_head *lv_chain,
xfs_lsn_t commit_lsn, bool aborted); xfs_lsn_t commit_lsn, bool aborted);
/* /*
* AIL traversal cursor. * AIL traversal cursor.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment