Commit 49434236 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 04346414
......@@ -101,6 +101,10 @@ type FileStorage struct {
// driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event
// sync(s) waiting for feedback from watcher
syncMu sync.Mutex
syncv []chan zodb.Tid
down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close()
......@@ -115,20 +119,6 @@ func (fs *FileStorage) zerr(op string, args interface{}, err error) *zodb.OpErro
return &zodb.OpError{URL: fs.URL(), Op: op, Args: args, Err: err}
}
func (fs *FileStorage) Sync(_ context.Context) (zodb.Tid, error) {
// FIXME: it currently does not do full sync to check data state as of Sync call time
// XXX -> move closer to watcher
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.downErr != nil {
return zodb.InvalidTid, fs.zerr("sync", nil, fs.downErr)
}
return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty
}
func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
......@@ -538,8 +528,15 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error)
defer tick.Stop()
var t0partial time.Time
first := true
var syncv []chan zodb.Tid
mainloop:
for {
// notify Sync(s) that queued before previous stat + advance
for _, sync := range syncv {
sync <- fs.txnhMax.Tid // TODO +lock after commit is implemented
}
syncv = nil // just in case
if !first {
traceWatch("select ...")
select {
......@@ -587,6 +584,12 @@ mainloop:
}
first = false
// remember queued Sync(s) that we should notify after stat + advance
fs.syncMu.Lock()
syncv = fs.syncv
fs.syncv = nil
fs.syncMu.Unlock()
// check f size, to see whether there could be any updates.
fi, err := f.Stat()
if err != nil {
......@@ -706,6 +709,56 @@ mainloop:
}
}
// Sync implements zodb.IStorageDriver.
func (fs *FileStorage) Sync(ctx context.Context) (head zodb.Tid, err error) {
defer func() {
if err != nil {
err = fs.zerr("sync", nil, err)
}
}()
// check file size; if it is the same there was no new commits.
fs.mu.RLock()
topPos := fs.index.TopPos
head = fs.txnhMax.Tid
fs.mu.RUnlock()
fi, err := fs.file.Stat()
if err != nil {
return zodb.InvalidTid, err
}
fsize := fi.Size()
switch {
case fsize == topPos:
return head, nil // same as before
case fsize < topPos:
// XXX add pack support?
return zodb.InvalidTid, fmt.Errorf("file truncated (%d -> %d)", topPos, fsize)
}
// the file has more data than covered by current topPos. However that
// might be in-progress transaction that will be aborted. Ask watcher
// to give us feedback after it goes through one iteration to:
// - stat the file once again, and
// - advance as much as it can.
syncq := make(chan zodb.Tid, 1)
fs.syncMu.Lock()
fs.syncv = append(fs.syncv, syncq)
fs.syncMu.Unlock()
select {
case <-fs.down:
return zodb.InvalidTid, fs.downErr
case <-ctx.Done():
return zodb.InvalidTid, ctx.Err()
case head := <-syncq:
return head, nil
}
}
// --- open + rebuild index ---
// shutdown marks storage as no longer operational with specified reason.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment