Commit 6f377311 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2d12146a
...@@ -124,7 +124,7 @@ func NewDB(stor IStorage) *DB { ...@@ -124,7 +124,7 @@ func NewDB(stor IStorage) *DB {
watchq := make(chan CommitEvent) watchq := make(chan CommitEvent)
at0 := stor.AddWatch(watchq) at0 := stor.AddWatch(watchq)
db.δtail = NewΔTail(at0) // init δtail to (at0, at0] db.δtail = NewΔTail(at0) // init to (at0, at0]
go db.watcher(watchq) go db.watcher(watchq)
// XXX DelWatch? in db.Close() ? // XXX DelWatch? in db.Close() ?
...@@ -243,36 +243,29 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -243,36 +243,29 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// find out db state we should open at // find out db state we should open at
at := opt.At at := opt.At
if at == 0 { if at == 0 {
head := Tid(0)
if opt.NoSync { if opt.NoSync {
db.mu.Lock() db.mu.Lock()
head = db.δtail.Head() // = 0 if δtail was not yet initialized with first event at = db.δtail.Head()
db.mu.Unlock() db.mu.Unlock()
} } else {
// !NoSync or δtail !initialized
// sync storage for lastTid // sync storage for lastTid
if head == 0 {
var err error var err error
// XXX stor.LastTid returns last_tid storage itself // XXX stor.LastTid returns last_tid storage itself
// received on server, not last_tid on server. // received on server, not last_tid on server.
// -> add stor.Sync() ? // -> add stor.Sync() ?
head, err = db.stor.LastTid(ctx) at, err = db.stor.LastTid(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
at = head
} }
// proceed to open(at) // proceed to open(at)
db.mu.Lock() // unlocked in *DBUnlock db.mu.Lock() // unlocked in *DBUnlock
/* /*
err := db.needHeadOrDBUnlock(ctx, at) // XXX what if δtail !init yet? err := db.needHeadOrDBUnlock(ctx, at)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -313,23 +306,15 @@ retry: ...@@ -313,23 +306,15 @@ retry:
// no exact match - let's try to find nearest // no exact match - let's try to find nearest
δtail := db.δtail δtail := db.δtail
δhead := db.δtail.Head()
// too far in the past, and we know there is no exact match // too far in the past, and we know there is no exact match
// -> new historic connection. // -> new historic connection.
if at <= db.δtail.Tail() { if at <= δtail.Tail() {
return newConnection(db, at), nil
}
// δtail !initialized yet
if db.δtail.Head() == 0 {
// XXX δtail could be not yet initialized, but e.g. last_tid changed
// -> we have to wait for δtail not to loose just-released live cache
return newConnection(db, at), nil return newConnection(db, at), nil
} }
// we have some δtail coverage, but at is ahead of that. // we have some δtail coverage, but at is ahead of that.
if at > δhead { if at > δtail.Head() {
// wait till δtail.head is up to date covering ≥ at, // wait till δtail.head is up to date covering ≥ at,
// and retry the loop (δtail.tail might go over at while we are waiting) // and retry the loop (δtail.tail might go over at while we are waiting)
δready := make(chan struct{}) δready := make(chan struct{})
...@@ -348,8 +333,6 @@ retry: ...@@ -348,8 +333,6 @@ retry:
} }
} }
// XXX note: vvv at start δtail.Tail is not covering first committed txn
// at ∈ (δtail, δhead] ; try to get nearby idle connection or make a new one // at ∈ (δtail, δhead] ; try to get nearby idle connection or make a new one
conn = db.get(δtail.Tail(), at) conn = db.get(δtail.Tail(), at)
if conn == nil { if conn == nil {
......
...@@ -451,6 +451,7 @@ type Watcher interface { ...@@ -451,6 +451,7 @@ type Watcher interface {
// //
// Multiple AddWatch calls with the same watchq register watchq only once. XXX // Multiple AddWatch calls with the same watchq register watchq only once. XXX
// //
// XXX ↑ guaranteed
// XXX watchq closed when stor.watchq closed? // XXX watchq closed when stor.watchq closed?
AddWatch(watchq chan<- CommitEvent) (at0 Tid) AddWatch(watchq chan<- CommitEvent) (at0 Tid)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment