Commit ef4fc1b4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8693a253
...@@ -260,7 +260,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -260,7 +260,7 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
at = head at = head
} }
db.mu.Lock() db.mu.Lock() // XXX no - back to resync
defer db.mu.Unlock() defer db.mu.Unlock()
conn, err := db.open(ctx, at, opt.NoPool) conn, err := db.open(ctx, at, opt.NoPool)
...@@ -320,6 +320,7 @@ retry: ...@@ -320,6 +320,7 @@ retry:
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX double unlock
return nil, ctx.Err() return nil, ctx.Err()
case <-δready: case <-δready:
...@@ -358,9 +359,6 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) { ...@@ -358,9 +359,6 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) {
if !conn.noPool { if !conn.noPool {
panic("Conn.Resync: connection was opened without NoPool flag") panic("Conn.Resync: connection was opened without NoPool flag")
} }
if conn.txn != nil {
panic("Conn.Resync: previous transaction is not yet complete")
}
if at == 0 { if at == 0 {
panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)") panic("Conn.Resync: resync to at=0 (auto-mode is valid only for DB.Open)")
} }
...@@ -372,15 +370,25 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) { ...@@ -372,15 +370,25 @@ func (conn *Connection) Resync(txn transaction.Transaction, at Tid) {
} }
// resync serves Connection.resync and DB.Open . // resync serves Connection.resync and DB.Open .
//
// must be called with conn.db.mu locked.
func (conn *Connection) resync(txn transaction.Transaction, at Tid) { func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
// XXX conn.cache.Lock ? - yes (e.g. if user also checks it from outside, right?) if conn.txn != nil {
// XXX assert conn.txn == nil panic("Conn.resync: previous transaction is not yet complete")
}
δtail := db.δtail defer func() {
conn.at = at
conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn))
}()
// XXX conn.at == at - do nothing (even if out of δtail coverave) // conn.at == at - nothing to do (even if out of δtail coverage)
if conn.at == at {
return
}
δtail := db.δtail
δobj := make(map[Oid]struct{}) // what to invalidate
δall := false // if we have to invalidate all objects
// both conn.at and at are covered by δtail - we can invalidate selectively // both conn.at and at are covered by δtail - we can invalidate selectively
if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) && if (δtail.Tail() < conn.at && conn.at <= δtail.Head()) &&
...@@ -395,16 +403,23 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) { ...@@ -395,16 +403,23 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
for _, δ := range δv { for _, δ := range δv {
for _, oid := range δ.changev { for _, oid := range δ.changev {
obj := conn.cache.Get(oid) δobj[oid] = struct{}{}
if obj != nil {
obj.PInvalidate()
}
} }
} }
// some of conn.at or at is outside δtail coverage - invalidate all // some of conn.at or at is outside δtail coverage - invalidate all
// objects, but keep the objects present in live cache. // objects, but keep the objects present in live cache.
} else { } else {
δall = true
}
// XXX unlock db before txn.RegisterSync (it locks txn.mu)
db.mu.Unlock()
conn.cache.Lock()
defer conn.cache.Unlock()
if δall {
// XXX keep synced with LiveCache details // XXX keep synced with LiveCache details
// XXX -> conn.cache.forEach? // XXX -> conn.cache.forEach?
// XXX should we wait for db.stor.head to cover at? // XXX should we wait for db.stor.head to cover at?
...@@ -415,13 +430,21 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) { ...@@ -415,13 +430,21 @@ func (conn *Connection) resync(txn transaction.Transaction, at Tid) {
obj.PInvalidate() obj.PInvalidate()
} }
} }
} else {
for oid := range δobj {
obj := conn.cache.Get(oid)
if obj != nil {
obj.PInvalidate()
}
}
} }
// XXX unlock db before txn.RegisterSync (it locks txn.mu) // all done
return
conn.at = at // conn.at = at
conn.txn = txn // conn.txn = txn
txn.RegisterSync((*connTxnSync)(conn)) // txn.RegisterSync((*connTxnSync)(conn))
} }
// get returns connection from db pool most close to at. // get returns connection from db pool most close to at.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment