Commit 34fb46aa authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ef4fc1b4
......@@ -260,24 +260,25 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at = head
}
db.mu.Lock() // XXX no - back to resync
defer db.mu.Unlock()
db.mu.Lock()
// unlock is either in db.open -> err, or in resyncAndDBUnlock
conn, err := db.open(ctx, at, opt.NoPool)
conn, err := db.openOrDBUnlock(ctx, at, opt.NoPool)
if err != nil {
return nil, err
}
conn.resync(txn, at)
conn.resyncAndDBUnlock(txn, at)
return conn, nil
}
// open is internal worker for Open.
// openOrDBUnlock is internal worker for Open.
//
// it returns either new connection, or connection from the pool.
// returned connection does not neccessarily have .at=at, and have to go through .resync().
//
// must be called with db.mu locked.
func (db *DB) open(ctx context.Context, at Tid, noPool bool) (*Connection, error) {
// db.mu is unlocked on error.
func (db *DB) openOrDBUnlock(ctx context.Context, at Tid, noPool bool) (*Connection, error) {
// NoPool connection - create anew
if noPool {
conn := newConnection(db, at)
......@@ -320,7 +321,7 @@ retry:
select {
case <-ctx.Done():
// XXX double unlock
// leave db.mu unlocked
return nil, ctx.Err()
case <-δready:
......@@ -364,17 +365,21 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) {
}
conn.db.mu.Lock()
defer conn.db.mu.Unlock()
conn.resync(txn, at)
conn.resyncAndDBUnlock(txn, at)
}
// resync serves Connection.resync and DB.Open .
func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
// resyncAndDBUnlock serves Connection.Resync and DB.Open .
//
// must be called with conn.db locked and unlocks it at the end.
func (conn *Connection) resyncAndDBUnlock(txn transaction.Transaction, at Tid) {
db := conn.db
if conn.txn != nil {
db.mu.Unlock()
panic("Conn.resync: previous transaction is not yet complete")
}
// upon exit, with all locks released, register conn to txn.
defer func() {
conn.at = at
conn.txn = txn
......@@ -383,11 +388,13 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
// conn.at == at - nothing to do (even if out of δtail coverage)
if conn.at == at {
db.mu.Unlock()
return
}
// conn.at != at - have to invalidate objects in live cache.
δtail := db.δtail
δobj := make(map[Oid]struct{}) // what to invalidate
δobj := make(map[Oid]struct{}) // set(oid) - what to invalidate
δall := false // if we have to invalidate all objects
// both conn.at and at are covered by δtail - we can invalidate selectively
......@@ -413,7 +420,7 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
δall = true
}
// XXX unlock db before txn.RegisterSync (it locks txn.mu)
// unlock db before locking cache and txn
db.mu.Unlock()
conn.cache.Lock()
......@@ -423,7 +430,7 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
// XXX keep synced with LiveCache details
// XXX -> conn.cache.forEach?
// XXX should we wait for db.stor.head to cover at?
// or leave this wait till load time?
// or leave this wait till .Load() time?
for _, wobj := range conn.cache.objtab {
obj, _ := wobj.Get().(IPersistent)
if obj != nil {
......@@ -441,10 +448,6 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
// all done
return
// conn.at = at
// conn.txn = txn
// txn.RegisterSync((*connTxnSync)(conn))
}
// get returns connection from db pool most close to at.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment