Commit 5aae345c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 472d16cf
......@@ -91,26 +91,37 @@ type FileStorage struct {
// protects updates to index and to txnh{Min,Max} - in other words
// to everything that depends on what current last transaction is.
// mu also protects downErr.
mu sync.RWMutex
index *Index // oid -> data record position in transaction which last changed oid
txnhMin TxnHeader // transaction headers for min/max transactions committed
txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty)
downErr error // !nil when the storage is no longer operational
// driver client <- watcher: database commits.
watchq chan<- zodb.WatchEvent
down chan struct{} // closed when FileStorage is no longer operational
downOnce sync.Once
down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close()
}
// IStorageDriver
var _ zodb.IStorageDriver = (*FileStorage)(nil)
// zerr turns err into zodb.OpError about fs.op(args)
func (fs *FileStorage) zerr(op string, args interface{}, err error) *zodb.OpError {
return &zodb.OpError{URL: fs.URL(), Op: op, Args: args, Err: err}
}
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.downErr != nil {
return zodb.InvalidTid, fs.zerr("last_tid", nil, fs.downErr)
}
return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty
}
......@@ -118,6 +129,10 @@ func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.downErr != nil {
return zodb.InvalidOid, fs.zerr("last_oid", nil, fs.downErr)
}
lastOid, _ := fs.index.Last() // returns zero-value, if empty
return lastOid, nil
}
......@@ -157,7 +172,7 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, seri
func (fs *FileStorage) Load_XXXWithNextSerialXXX(_ context.Context, xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error) {
buf, serial, nextSerial, err = fs.load(xid)
if err != nil {
err = &zodb.OpError{URL: fs.URL(), Op: "load", Args: xid, Err: err}
err = fs.zerr("load", xid, err)
}
return buf, serial, nextSerial, err
}
......@@ -167,6 +182,10 @@ func (fs *FileStorage) Load_XXXWithNextSerialXXX(_ context.Context, xid zodb.Xid
func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error) {
// lookup in index position of oid data record within latest transaction which changed this oid
fs.mu.RLock()
if err := fs.downErr; err != nil {
fs.mu.RUnlock()
return nil, 0, 0, err
}
dataPos, ok := fs.index.Get(xid.Oid)
fs.mu.RUnlock()
if !ok {
......@@ -333,6 +352,12 @@ func (e *iterStartError) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIt
func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) {
fs.mu.RLock()
// no longer operational
if err := fs.downErr; err != nil {
fs.mu.RUnlock()
return TxnHeader{}, err
}
// check for empty database
if fs.txnhMin.Len == 0 {
// empty database - no such record
......@@ -419,14 +444,9 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
return ziter
case err != nil:
return &iterStartError{&zodb.OpError{
URL: fs.URL(),
Op: "iter",
// XXX (?) add TidRange type which prints as
// "tidmin..tidmax" with omitting ends if it is either 0 or ∞
Args: []zodb.Tid{tidMin, tidMax},
Err: err,
}}
return &iterStartError{fs.zerr("iter", []zodb.Tid{tidMin, tidMax}, err)}
}
// setup iter from what findTxnRecord found
......@@ -462,16 +482,22 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
}
}
// XXX fs.watchErr = err (-> fail other operations)?
if err != nil {
log.Print(err)
}
// if watcher failed with e.g. IO error we no longer know what is real
// last_tid and which objects were modified in the IO error region.
// -> storage operations have to fail from now on.
fs.shutdown(err)
if fs.watchq != nil {
close(fs.watchq)
}
}
// _watcher serves watcher and returns either when fs is closed (ok), or when
// it hits any kind of non-recoverable error.
func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
f := fs.file
idx := fs.index
......@@ -507,7 +533,23 @@ mainloop:
// recheck the file periodically.
}
// XXX dequeue everything from w?
// we will be advancing through the file as much as we can.
// drain everything what is currently left in fs watcher queue.
drain:
for {
select {
case err := <-w.Errors:
if err != fsnotify.ErrEventOverflow {
// unexpected error -> shutdown
return err
}
case <-w.Events:
default:
break drain
}
}
}
first = false
......@@ -620,19 +662,26 @@ mainloop:
// --- open + rebuild index ---
func (fs *FileStorage) shutdown() {
// shutdown marks storage as no longer operational with specified reason.
//
// only the first call takes the effect.
func (fs *FileStorage) shutdown(reason error) {
fs.downOnce.Do(func() {
fs.errClose = fs.file.Close()
close(fs.down)
fs.mu.Lock()
defer fs.mu.Unlock()
fs.downErr = fmt.Errorf("not operational due: %s", reason)
})
}
func (fs *FileStorage) Close() error {
fs.shutdown()
fs.shutdown(fmt.Errorf("closed"))
// XXX wait for watcher?
if fs.errClose != nil {
return &zodb.OpError{URL: fs.URL(), Op: "close", Args: nil, Err: fs.errClose}
return fs.zerr("close", nil, fs.errClose)
}
// TODO if opened !ro -> .saveIndex()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment