Commit 0b792ca5 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5d6a5cf4
...@@ -282,6 +282,8 @@ type hwaiter struct { ...@@ -282,6 +282,8 @@ type hwaiter struct {
// XXX -> waitHead? // XXX -> waitHead?
// XXX -> waitHeadOrDBUnlock? // XXX -> waitHeadOrDBUnlock?
func (db *DB) headWait(ctx context.Context, at Tid) (err error) { func (db *DB) headWait(ctx context.Context, at Tid) (err error) {
// XXX check if db is already down -> error even if at is under coverage?
if at <= db.δtail.Head() { if at <= db.δtail.Head() {
return nil // we already have the coverage return nil // we already have the coverage
} }
...@@ -334,7 +336,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -334,7 +336,11 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
}() }()
// XXX check db is aready down/closed // check db is already down/closed
// XXX -> headWait?
if ready(db.down) {
return nil, db.downErr
}
// find out db state we should open at // find out db state we should open at
at := opt.At at := opt.At
...@@ -367,11 +373,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -367,11 +373,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
} }
conn := db.open(at, opt.NoPool) conn := db.open(at, opt.NoPool)
err = conn.resyncAndDBUnlock(ctx, at) conn.resyncAndDBUnlock(ctx, at)
if err != nil {
// XXX release conn?
return nil, err
}
return conn, nil return conn, nil
} }
...@@ -447,25 +449,32 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error { ...@@ -447,25 +449,32 @@ func (conn *Connection) Resync(ctx context.Context, at Tid) error {
panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)") panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)")
} }
// XXX err ctx? ("resync @at -> @at")
db := conn.db db := conn.db
// XXX check db is already down/closed
// XXX or -> headWait?
db.mu.Lock() db.mu.Lock()
// wait for db.Head ≥ at // wait for db.Head ≥ at
err = db.headWait(ctx, at) err := db.headWait(ctx, at)
if err != nil { if err != nil {
return err return err
} }
return conn.resyncAndDBUnlock(ctx, at) conn.resyncAndDBUnlock(ctx, at)
return nil
} }
// resyncAndDBUnlock serves Connection.Resync and DB.Open . // resyncAndDBUnlock serves Connection.Resync and DB.Open .
// //
// Must be called with at ≤ conn.db.Head . // Must be called with at ≤ conn.db.Head .
// Must be called with conn.db locked and unlocks it at the end. // Must be called with conn.db locked and unlocks it at the end.
func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err error) { func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) {
db := conn.db db := conn.db
txn := transaction.Current(ctx) // XXX no unlock here txn := transaction.Current(ctx) // XXX no unlock here if !txn
if conn.txn != nil { if conn.txn != nil {
db.mu.Unlock() db.mu.Unlock()
...@@ -478,11 +487,8 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro ...@@ -478,11 +487,8 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro
panic(fmt.Sprintf("resync: at (%s) > head (%s)", at, head)) panic(fmt.Sprintf("resync: at (%s) > head (%s)", at, head))
} }
// XXX err ctx
// upon exit, with all locks released, register conn to txn. // upon exit, with all locks released, register conn to txn.
defer func() { defer func() {
if err != nil { return }
conn.at = at conn.at = at
conn.txn = txn conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn)) txn.RegisterSync((*connTxnSync)(conn))
...@@ -491,10 +497,10 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro ...@@ -491,10 +497,10 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro
// conn.at == at - nothing to do (even if out of δtail coverage) // conn.at == at - nothing to do (even if out of δtail coverage)
if conn.at == at { if conn.at == at {
db.mu.Unlock() db.mu.Unlock()
return nil return
} }
// XXX -> DB.δobj(at1, at2) // XXX -> DB.δobj(at1, at2) ?
// conn.at != at - have to invalidate objects in live cache. // conn.at != at - have to invalidate objects in live cache.
δtail := db.δtail δtail := db.δtail
...@@ -551,7 +557,7 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro ...@@ -551,7 +557,7 @@ func (conn *Connection) resyncAndDBUnlock(ctx context.Context, at Tid) (err erro
} }
// all done // all done
return nil return
} }
// get returns connection from db pool most close to at with conn.at ∈ [atMin, at]. // get returns connection from db pool most close to at with conn.at ∈ [atMin, at].
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment