Commit a98c9d24 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3398c0af
......@@ -25,6 +25,7 @@ import (
"compress/zlib"
"context"
"crypto/sha1"
"fmt"
"io"
"math/rand"
"net/url"
......@@ -77,7 +78,7 @@ func (c *Client) Close() error {
// return err
}
func (c *Client) LastTid() (zodb.Tid, error) {
func (c *Client) LastTid(ctx context.Context) (zodb.Tid, error) {
panic("TODO")
/*
c.Mlink // XXX check we are connected
......@@ -98,17 +99,17 @@ func (c *Client) LastTid() (zodb.Tid, error) {
*/
}
func (c *Client) LastOid() (zodb.Oid, error) {
func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
// XXX there is no LastOid in NEO/py
panic("TODO")
}
// decompress decompresses data according to zlib encoding.
//
// out buffer, if there is enough capacity, is used for decompression destionation.
// out buffer, if there is enough capacity, is used for decompression destination.
// if out has not not enough capacity a new buffer is allocated and used.
//
// return: destination buffer with full decompressed data.
// return: destination buffer with full decompressed data or error.
func decompress(in []byte, out []byte) ([]byte, error) {
bin := bytes.NewReader(in)
zr, err := zlib.NewReader(bin)
......@@ -126,23 +127,33 @@ func decompress(in []byte, out []byte) ([]byte, error) {
return bout.Bytes(), nil
}
func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
// XXX check pt is operational first? -> no if there is no data - we'll
// just won't find ready cell
//
// XXX or better still check first M told us ok to go? (ClusterState=RUNNING)
//if c.node.ClusterState != ClusterRunning {
// return nil, 0, &Error{NOT_READY, "cluster not operational"}
//}
cellv := c.node.PartTab.Get(xid.Oid)
// XXX cellv = filter(cellv, UP_TO_DATE)
if len(cellv) == 0 {
return nil, 0, fmt.Errorf("no storages alive for oid %v", xid.Oid) // XXX err ctx
}
cell := cellv[rand.Intn(len(cellv))]
stor := c.node.NodeTab.Get(cell.NodeUUID)
if stor == nil {
panic(0) // XXX
return nil, 0, fmt.Errorf("storage %v not yet known", cell.NodeUUID) // XXX err ctx
}
// XXX check stor.State == RUNNING
// XXX check stor.State == RUNNING -> in link
Sconn, err := stor.Conn()
Sconn := stor.Conn // XXX temp stub
//Sconn, err := stor.Conn()
if err != nil {
panic(0) // XXX
return nil, 0, err // XXX err ctx
}
defer lclose(Sconn)
defer lclose(ctx, Sconn)
req := neo.GetObject{Oid: xid.Oid}
if xid.TidBefore {
......@@ -160,12 +171,12 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
}
checksum := sha1.Sum(data)
if checksum != reply.Checksum {
if checksum != resp.Checksum {
// XXX data corrupt
}
data := resp.Data
if reply.Compression {
data = resp.Data
if resp.Compression {
data, err = decompress(resp.Data, make([]byte, 0, len(resp.Data)))
if err != nil {
// XXX data corrupt
......
package client
import (
"context"
"io"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
)
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
// XXX dup in neo,server
func lclose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
}
......@@ -82,9 +82,6 @@ type NodeTable struct {
}
// // special error indicating dial is currently in progress
// var errDialInprogress = errors.New("dialing...")
// even if dialing a peer failed, we'll attempt redial after this timeout
const δtRedial = 3 * time.Second
......
......@@ -334,8 +334,8 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
// TODO AskUnfinishedTransactions
case *neo.LastIDs:
lastTid, zerr1 := stor.zstor.LastTid()
lastOid, zerr2 := stor.zstor.LastOid()
lastTid, zerr1 := stor.zstor.LastTid(ctx)
lastOid, zerr2 := stor.zstor.LastOid(ctx)
if zerr := xerr.First(zerr1, zerr2); zerr != nil {
return zerr // XXX send the error to M
}
......@@ -488,10 +488,11 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
for {
err := stor.serveClient1(ctx, conn)
if err != nil {
return err
log.Infof(ctx, "%v: %v", conn, err)
return
}
lclose(conn)
lclose(ctx, conn)
// keep on going in the same goroutine to avoid goroutine creation overhead
// TODO Accept += timeout, go away if inactive
......@@ -545,7 +546,7 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
}
// serveClient1 serves 1 request from a client
func (stor *Storage) serveClient1(conn *neo.Conn) error {
func (stor *Storage) serveClient1(ctx context.Context, conn *neo.Conn) error {
req, err := conn.Recv()
if err != nil {
return err // XXX log / err / send error before closing
......
......@@ -30,7 +30,7 @@ import (
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
// XXX dup in neo
// XXX dup in neo, client
func lclose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
......
......@@ -9,7 +9,7 @@ import (
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
// XXX dup in server
// XXX dup in server, client
func lclose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
......
......@@ -24,6 +24,7 @@ package storage
//go:generate sh -c "go run ../../xcommon/tracing/cmd/gotrace/{gotrace,util}.go ."
import (
"context"
"fmt"
"sort"
"sync"
......@@ -107,7 +108,7 @@ type revCacheEntry struct {
// StorLoader represents loading part of a storage.
// XXX -> zodb?
type StorLoader interface {
Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error)
Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error)
}
// lock order: Cache.mu > oidCacheEntry
......@@ -148,7 +149,7 @@ func (c *Cache) SetSizeMax(sizeMax int) {
// Load loads data from database via cache.
//
// If data is already in cache - cached content is returned.
func (c *Cache) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
rce, rceNew := c.lookupRCE(xid)
// rce is already in cache - use it
......@@ -162,7 +163,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
} else {
// XXX use connection poll
// XXX or it should be cared by loader?
c.loadRCE(rce, xid.Oid)
c.loadRCE(ctx, rce, xid.Oid)
}
if rce.err != nil {
......@@ -184,7 +185,7 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
// If data is not yet in cache loading for it is started in the background.
// Prefetch is not blocking operation and does not wait for loading, if any was
// started, to complete.
func (c *Cache) Prefetch(xid zodb.Xid) {
func (c *Cache) Prefetch(ctx context.Context, xid zodb.Xid) {
rce, rceNew := c.lookupRCE(xid)
// !rceNew -> no need to adjust LRU - it will be adjusted by further actual data Load
......@@ -193,7 +194,7 @@ func (c *Cache) Prefetch(xid zodb.Xid) {
// spawn loading in the background if rce was not yet loaded
if rceNew {
// XXX use connection poll
go c.loadRCE(rce, xid.Oid)
go c.loadRCE(ctx, rce, xid.Oid)
}
}
......@@ -290,15 +291,16 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
//
// rce must be new just created by lookupRCE() with returned rceNew=true.
// loading completion is signalled by closing rce.ready.
func (c *Cache) loadRCE(rce *revCacheEntry, oid zodb.Oid) {
func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
oce := rce.parent
data, serial, err := c.loader.Load(zodb.Xid{
data, serial, err := c.loader.Load(ctx, zodb.Xid{
Oid: oid,
XTid: zodb.XTid{Tid: rce.before, TidBefore: true},
})
// normalize data/serial if it was error
if err != nil {
// XXX err == canceled? -> ?
data = nil
serial = 0
}
......
......@@ -21,6 +21,7 @@ package storage
import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
......@@ -49,7 +50,7 @@ type tOidData struct {
err error // e.g. io error
}
func (stor *tStorage) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
//fmt.Printf("> load(%v)\n", xid)
//defer func() { fmt.Printf("< %v, %v, %v\n", data, serial, err) }()
tid := xid.Tid
......@@ -142,11 +143,12 @@ func TestCache(t *testing.T) {
}
c := NewCache(tstor, 100 /* > Σ all data */)
ctx := context.Background()
checkLoad := func(xid zodb.Xid, data []byte, serial zodb.Tid, err error) {
t.Helper()
bad := &bytes.Buffer{}
d, s, e := c.Load(xid)
d, s, e := c.Load(ctx, xid)
if !reflect.DeepEqual(data, d) {
fmt.Fprintf(bad, "data:\n%s\n", pretty.Compare(data, d))
}
......@@ -335,7 +337,7 @@ func TestCache(t *testing.T) {
// (<14 also becomes ready and takes oce lock first, merging <12 and <14 into <16.
// <16 did not yet took oce lock so c.size is temporarily reduced and
// <16 is not yet on LRU list)
c.loadRCE(rce1_b14, 1)
c.loadRCE(ctx, rce1_b14, 1)
checkRCE(rce1_b14, 14, 10, world, nil)
checkRCE(rce1_b16, 16, 10, world, nil)
checkRCE(rce1_b12, 12, 10, world, nil)
......@@ -344,7 +346,7 @@ func TestCache(t *testing.T) {
// (<16 takes oce lock and updates c.size and LRU list)
rce1_b16.ready = make(chan struct{}) // so loadRCE could run
c.loadRCE(rce1_b16, 1)
c.loadRCE(ctx, rce1_b16, 1)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16)
checkMRU(12, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
......@@ -364,7 +366,7 @@ func TestCache(t *testing.T) {
checkMRU(12, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4) // no <17 and <18 yet
// (<18 loads and takes oce lock first - merge <17 with <18)
c.loadRCE(rce1_b18, 1)
c.loadRCE(ctx, rce1_b18, 1)
checkRCE(rce1_b18, 18, 16, zz, nil)
checkRCE(rce1_b17, 17, 16, zz, nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b18)
......@@ -437,7 +439,7 @@ func TestCache(t *testing.T) {
// <9 must be separate from <8 and <10 because it is IO error there
rce1_b9, new9 := c.lookupRCE(xidlt(1,9))
ok1(new9)
c.loadRCE(rce1_b9, 1)
c.loadRCE(ctx, rce1_b9, 1)
checkRCE(rce1_b9, 9, 0, nil, ioerr)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b10, rce1_b16, rce1_b20, rce1_b22)
checkMRU(17, rce1_b9, rce1_b22, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
......@@ -507,7 +509,7 @@ func TestCache(t *testing.T) {
checkMRU(15, rce1_b16, rce1_b7, rce1_b9, rce1_b22)
// reload <20 -> <22 should be evicted
go c.Load(xidlt(1,20))
go c.Load(ctx, xidlt(1,20))
tc.Expect(gcstart, gcfinish)
// - evicted <22 (lru.1, www, size=3)
......@@ -520,7 +522,7 @@ func TestCache(t *testing.T) {
checkMRU(14, rce1_b20_2, rce1_b16, rce1_b7, rce1_b9)
// load big <78 -> several rce must be evicted
go c.Load(xidlt(1,78))
go c.Load(ctx, xidlt(1,78))
tc.Expect(gcstart, gcfinish)
// - evicted <9 (lru.1, ioerr, size=0)
......@@ -548,7 +550,7 @@ func TestCache(t *testing.T) {
checkMRU(0)
// XXX verify caching vs ctx cancel
// XXX verify db inconsistency checks
// XXX verify loading with before > cache.before
}
......
......@@ -198,14 +198,14 @@ func (fs *FileStorage) Close() error {
}
func (fs *FileStorage) LastTid() (zodb.Tid, error) {
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
// XXX check we have transactions at all
// XXX what to return if not?
// XXX must be under lock
return fs.txnhMax.Tid, nil // XXX error always nil ?
}
func (fs *FileStorage) LastOid() (zodb.Oid, error) {
func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
// XXX check we have objects at all?
// XXX what to return if not?
// XXX must be under lock
......@@ -224,7 +224,7 @@ func (e *ErrXidLoad) Error() string {
return fmt.Sprintf("loading %v: %v", e.Xid, e.Err)
}
func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// lookup in index position of oid data record within latest transaction who changed this oid
dataPos, ok := fs.index.Get(xid.Oid)
if !ok {
......
......@@ -75,7 +75,7 @@ type oidLoadedOk struct {
// checkLoad verifies that fs.Load(xid) returns expected result
func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect oidLoadedOk) {
data, tid, err := fs.Load(xid)
data, tid, err := fs.Load(context.Background(), xid)
if err != nil {
t.Errorf("load %v: %v", xid, err)
}
......
......@@ -23,6 +23,7 @@
package zodb
import (
"context"
"fmt"
)
......@@ -142,12 +143,12 @@ type IStorage interface {
// LastTid returns the id of the last committed transaction.
// if no transactions have been committed yet, LastTid returns Tid zero value
LastTid() (Tid, error)
LastTid(ctx context.Context) (Tid, error)
// LastOid returns highest object id of objects committed to storage.
// if there is no data committed yet, LastOid returns Oid zero value
// XXX ZODB/py does not define this in IStorage
LastOid() (Oid, error)
LastOid(ctx context.Context) (Oid, error)
// LoadSerial and LoadBefore generalized into 1 Load (see Xid for details)
//
......@@ -156,9 +157,9 @@ type IStorage interface {
// XXX currently deleted data is returned as data=nil -- is it ok?
// TODO specify error when data not found -> ErrOidMissing | ErrXidMissing
// TODO data []byte -> something allocated from slab ?
Load(xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ?
Load(ctx context.Context, xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ?
// Prefetch(xid Xid) (no error)
// Prefetch(ctx, xid Xid) (no error)
// Store(oid Oid, serial Tid, data []byte, txn ITransaction) error
// XXX Restore ?
......@@ -171,7 +172,7 @@ type IStorage interface {
// XXX allow iteration both ways (forward & backward)
// XXX text
Iterate(tidMin, tidMax Tid) IStorageIterator // XXX , error ?
Iterate(tidMin, tidMax Tid) IStorageIterator // XXX ctx , error ?
}
type IStorageIterator interface {
......@@ -180,12 +181,12 @@ type IStorageIterator interface {
// 2. iterator over transaction data records.
// transaction metadata stays valid until next call to NextTxn().
// end of iteration is indicated with io.EOF
NextTxn() (*TxnInfo, IStorageRecordIterator, error)
NextTxn() (*TxnInfo, IStorageRecordIterator, error) // XXX ctx
}
type IStorageRecordIterator interface { // XXX naming -> IRecordIterator
// NextData yields information about next storage data record.
// returned data stays valid until next call to NextData().
// end of iteration is indicated with io.EOF
NextData() (*StorageRecordInformation, error)
NextData() (*StorageRecordInformation, error) // XXX ctx
}
......@@ -33,8 +33,8 @@ import (
// Catobj dumps content of one ZODB object
// The object is printed in raw form without any headers (see Dumpobj)
func Catobj(w io.Writer, stor zodb.IStorage, xid zodb.Xid) error {
data, _, err := stor.Load(xid)
func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid) error {
data, _, err := stor.Load(ctx, xid)
if err != nil {
return err
}
......@@ -44,10 +44,10 @@ func Catobj(w io.Writer, stor zodb.IStorage, xid zodb.Xid) error {
}
// Dumpobj dumps content of one ZODB object with zodbdump-like header
func Dumpobj(w io.Writer, stor zodb.IStorage, xid zodb.Xid, hashOnly bool) error {
func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid, hashOnly bool) error {
var objInfo zodb.StorageRecordInformation
data, tid, err := stor.Load(xid)
data, tid, err := stor.Load(ctx, xid)
if err != nil {
return err
}
......@@ -118,7 +118,9 @@ func catobjMain(argv []string) {
Fatal("only 1 object allowed with -raw")
}
stor, err := zodb.OpenStorageURL(context.Background(), storUrl) // TODO read-only
ctx := context.Background()
stor, err := zodb.OpenStorageURL(ctx, storUrl) // TODO read-only
if err != nil {
Fatal(err)
}
......@@ -126,9 +128,9 @@ func catobjMain(argv []string) {
catobj := func(xid zodb.Xid) error {
if raw {
return Catobj(os.Stdout, stor, xid)
return Catobj(ctx, os.Stdout, stor, xid)
} else {
return Dumpobj(os.Stdout, stor, xid, hashOnly)
return Dumpobj(ctx, os.Stdout, stor, xid, hashOnly)
}
}
......
......@@ -32,14 +32,19 @@ import (
)
// paramFunc is a function to retrieve 1 storage parameter
type paramFunc func(stor zodb.IStorage) (string, error)
type paramFunc func(ctx context.Context, stor zodb.IStorage) (string, error)
var infov = []struct {name string; getParam paramFunc} {
// XXX e.g. stor.LastTid() should return err itself
{"name", func(stor zodb.IStorage) (string, error) { return stor.StorageName(), nil }},
{"name", func(ctx context.Context, stor zodb.IStorage) (string, error) {
return stor.StorageName(), nil
}},
// TODO reenable size
// {"size", func(stor zodb.IStorage) (string, error) { return stor.StorageSize(), nil }},
{"last_tid", func(stor zodb.IStorage) (string, error) {tid, err := stor.LastTid(); return tid.String(), err }},
{"last_tid", func(ctx context.Context, stor zodb.IStorage) (string, error) {
tid, err := stor.LastTid(ctx)
return tid.String(), err
}},
}
// {} parameter_name -> get_parameter(stor)
......@@ -52,7 +57,7 @@ func init() {
}
// Info prints general information about a ZODB storage
func Info(w io.Writer, stor zodb.IStorage, parameterv []string) error {
func Info(ctx context.Context, w io.Writer, stor zodb.IStorage, parameterv []string) error {
wantnames := false
if len(parameterv) == 0 {
for _, info := range infov {
......@@ -71,7 +76,7 @@ func Info(w io.Writer, stor zodb.IStorage, parameterv []string) error {
if wantnames {
out += parameter + "="
}
value, err := getParam(stor)
value, err := getParam(ctx, stor)
if err != nil {
return fmt.Errorf("getting %s: %v", parameter, err)
}
......@@ -115,12 +120,14 @@ func infoMain(argv []string) {
}
storUrl := argv[0]
stor, err := zodb.OpenStorageURL(context.Background(), storUrl) // TODO read-only
ctx := context.Background()
stor, err := zodb.OpenStorageURL(ctx, storUrl) // TODO read-only
if err != nil {
Fatal(err)
}
err = Info(os.Stdout, stor, argv[1:])
err = Info(ctx, os.Stdout, stor, argv[1:])
if err != nil {
Fatal(err)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment