Commit bae8fda6 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb: Connection.Resync

Add low-level option to resync Connection to another database state with
preserving its live cache.

Wendelin.core needs this for its main ZODB connection because it needs
to carefully propagate ZODB invalidations into file invalidations and
semantically relies on objects staying in ZODB live cache for this to
work correctly:

https://lab.nexedi.com/kirr/wendelin.core/blob/455a54425c/wcfs/wcfs.go#L245
parent 918455e7
......@@ -46,10 +46,11 @@ import (
// Use DB.Open to open a connection.
type Connection struct {
db *DB // Connection is part of this DB
txn transaction.Transaction // opened under this txn; nil if idle in DB pool.
txn transaction.Transaction // opened under this txn; nil after transaction ends.
at Tid // current view of database; stable inside a transaction.
cache LiveCache // cache of connection's in-RAM objects
noPool bool // connection is not returned to db.pool
}
// LiveCache keeps registry of live in-RAM objects for a Connection.
......
......@@ -166,6 +166,14 @@ func (db *DB) Close() error {
type ConnOptions struct {
At Tid // if !0, open Connection bound to `at` view of database; not latest.
NoSync bool // don't sync with storage to get its last tid.
// don't put connection back into DB pool after transaction ends.
//
// This is low-level option that allows to inspect/use connection's
// LiveCache after transaction finishes, and to manually resync the
// connection onto another database view. See Connection.Resync for
// details.
NoPool bool
}
// String represents connection options in human-readable form.
......@@ -187,6 +195,12 @@ func (opt *ConnOptions) String() string {
}
s += "sync"
s += ", "
if opt.NoPool {
s += "!"
}
s += "pool"
s += ")"
return s
}
......@@ -317,7 +331,9 @@ func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
//
// Open must be called under transaction.
// Opened connection must be used only under the same transaction and only
// until that transaction is complete.
// until that transaction is complete(*).
//
// (*) unless NoPool option is used.
func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err error) {
defer func() {
if err == nil {
......@@ -361,7 +377,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
}
// open(at)
conn := db.open(at)
conn := db.open(at, opt.NoPool)
conn.resync(ctx, at)
return conn, nil
}
......@@ -373,19 +389,26 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
//
// Must be called with at ≤ db.Head .
// Must be called with db.mu released.
func (db *DB) open(at Tid) *Connection {
func (db *DB) open(at Tid, noPool bool) *Connection {
db.mu.Lock()
defer db.mu.Unlock()
δtail := db.δtail
//fmt.Printf("db.open @%s\t; δtail (%s, %s]\n", at, δtail.Tail(), δtail.Head())
//fmt.Printf("db.open @%s nopool=%v\t; δtail (%s, %s]\n", at, noPool, δtail.Tail(), δtail.Head())
// at should be ≤ head (caller waited for it before invoking us)
if head := δtail.Head(); at > head {
panic(fmt.Sprintf("open: at (%s) > head (%s)", at, head))
}
// NoPool connection - create one anew
if noPool {
conn := newConnection(db, at)
conn.noPool = true
return conn
}
// check if we already have an exact match
conn := db.get(at, at)
if conn != nil {
......@@ -412,7 +435,47 @@ func (db *DB) open(at Tid) *Connection {
return conn
}
// resync serves DB.Open .
// Resync resyncs the connection onto different database view and transaction.
//
// Connection's objects pinned in live cache are guaranteed to stay in live
// cache, even if maybe in ghost state (e.g. if they have to be invalidated due
// to database changes).
//
// Resync can be used many times.
//
// Resync must be used only under the following conditions:
//
// - the connection was initially opened with NoPool flag;
// - previous transaction, under which connection was previously
// opened/resynced, must be already complete;
// - contrary to DB.Open, at cannot be 0.
//
// Note: new at can be both higher and lower than previous connection at.
//
// Note: if new at is already covered by DB.Head Resync will be non-blocking
// operation. However if at is > current DB.Head Resync, similarly to DB.Open,
// will block waiting for DB.Head to become ≥ at.
func (conn *Connection) Resync(ctx context.Context, at Tid) (err error) {
if !conn.noPool {
panic("Conn.Resync: connection was opened without NoPool flag")
}
if at == 0 {
panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)")
}
defer xerr.Contextf(&err, "resync @%s -> @%s", conn.at, at)
// wait for db.Head ≥ at
err = conn.db.headWait(ctx, at)
if err != nil {
return err
}
conn.resync(ctx, at)
return nil
}
// resync serves Connection.Resync and DB.Open .
//
// Must be called with at ≤ conn.db.Head .
// Must be called with conn.db released.
......@@ -611,5 +674,7 @@ func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) {
// mark the connection as no longer being live
conn.txn = nil
if !conn.noPool {
conn.db.put(conn)
}
}
......@@ -286,6 +286,25 @@ func (t *tPersistentDB) checkObj(obj *MyObject, oid Oid, serial Tid, state Objec
}
}
// Resync resyncs t to new transaction @at.
func (t *tPersistentDB) Resync(at Tid) {
t.Helper()
db := t.conn.db
txn, ctx := transaction.New(context.Background())
err := t.conn.Resync(ctx, at)
if err != nil {
t.Fatalf("resync %s -> %s", at, err)
}
t.txn = txn
t.ctx = ctx
assert.Same(t, t.conn.db, db)
assert.Same(t, t.conn.txn, t.txn)
assert.Equal(t, t.conn.At(), at)
}
// Abort aborts t's connection and verifies it becomes !live.
func (t *tPersistentDB) Abort() {
t.Helper()
......@@ -475,6 +494,108 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
t.Abort()
t2.Abort()
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// open new connection in nopool mode to verify resync
t4 := testopen(&ConnOptions{NoPool: true})
t = t4
assert.Equal(t.conn.At(), at2)
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// pin obj2 into live cache, similarly to conn1
rzcache := t.conn.Cache()
rzcache.Lock()
rzcache.SetControl(&zcacheControl{[]Oid{_obj2.oid}})
rzcache.Unlock()
// it should see latest data
robj1 := t.Get(101)
robj2 := t.Get(102)
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, InvalidTid, GHOST, 0)
t.PActivate(robj1)
t.PActivate(robj2)
t.checkObj(robj1, 101, at1, UPTODATE, 1, "hello")
t.checkObj(robj2, 102, at2, UPTODATE, 1, "kitty")
// obj2 stays in live cache
robj1.PDeactivate()
robj2.PDeactivate()
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, at2, UPTODATE, 0, "kitty")
// txn4 completes, but its conn stays out of db pool
t.Abort()
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// Resync ↓ (at2 -> at1; within δtail coverage)
t.Resync(at1)
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// obj2 should be invalidated
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, InvalidTid, GHOST, 0)
// obj2 data should be old
t.PActivate(robj1)
t.PActivate(robj2)
t.checkObj(robj1, 101, at1, UPTODATE, 1, "hello")
t.checkObj(robj2, 102, at1, UPTODATE, 1, "world")
robj1.PDeactivate()
robj2.PDeactivate()
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, at1, UPTODATE, 0, "world")
// Resync ↑ (at1 -> at2; within δtail coverage)
t.Abort()
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
t.Resync(at2)
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// obj2 should be invalidated
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, InvalidTid, GHOST, 0)
t.PActivate(robj1)
t.PActivate(robj2)
t.checkObj(robj1, 101, at1, UPTODATE, 1, "hello")
t.checkObj(robj2, 102, at2, UPTODATE, 1, "kitty")
robj1.PDeactivate()
robj2.PDeactivate()
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, at2, UPTODATE, 0, "kitty")
// Resync ↓ (at1 -> at0; to outside δtail coverage)
t.Abort()
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
t.Resync(at0)
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// obj2 should be invalidated
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, InvalidTid, GHOST, 0)
t.PActivate(robj1)
t.PActivate(robj2)
t.checkObj(robj1, 101, at0, UPTODATE, 1, "init")
t.checkObj(robj2, 102, at0, UPTODATE, 1, "db")
robj1.PDeactivate()
robj2.PDeactivate()
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, at0, UPTODATE, 0, "db")
// Resync ↑ (at0 -> at2; from outside δtail coverage)
t.Abort()
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
t.Resync(at2)
assert.Equal(db.pool, []*Connection{t1.conn, t2.conn})
// obj2 should be invalidated
t.checkObj(robj1, 101, InvalidTid, GHOST, 0)
t.checkObj(robj2, 102, InvalidTid, GHOST, 0)
}
// TODO Map & List tests.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment