Commit 6991ec36 authored by Kirill Smelkov's avatar Kirill Smelkov

X split storage backend from neo.Storage

parent fc2f4a1a
...@@ -508,7 +508,8 @@ func TestMasterStorage(t *testing.T) { ...@@ -508,7 +508,8 @@ func TestMasterStorage(t *testing.T) {
// cluster nodes // cluster nodes
M := NewMaster("abc1", ":1", Mhost) M := NewMaster("abc1", ":1", Mhost)
zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor) zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zback)
C := newClient("abc1", "m:1", Chost) C := newClient("abc1", "m:1", Chost)
// let tracer know how to map state addresses to node names // let tracer know how to map state addresses to node names
...@@ -919,7 +920,7 @@ func (d tdispatch1) Dispatch(event interface{}) { ...@@ -919,7 +920,7 @@ func (d tdispatch1) Dispatch(event interface{}) {
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) { func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
// create test cluster <- XXX factor to utility func // create test cluster <- XXX factor to utility func
zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs") zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
...@@ -957,7 +958,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -957,7 +958,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
ev.Ack() ev.Ack()
// now after we know Maddr create S & C and start S serving // now after we know Maddr create S & C and start S serving
S := NewStorage("abc1", Maddr, "", Snet, zstor) S := NewStorage("abc1", Maddr, "", Snet, zback)
C := NewClient("abc1", Maddr, Cnet) C := NewClient("abc1", Maddr, Cnet)
wg.Go(func() error { wg.Go(func() error {
...@@ -975,7 +976,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -975,7 +976,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax} xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
buf1, serial1, err := zstor.Load(ctx, xid1) buf1, serial1, _, err := zback.Load(ctx, xid1)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
......
...@@ -32,7 +32,7 @@ import ( ...@@ -32,7 +32,7 @@ import (
"lab.nexedi.com/kirr/go123/prog" "lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" "lab.nexedi.com/kirr/neo/go/neo/storage/fs1"
) )
const storageSummary = "run storage node" const storageSummary = "run storage node"
...@@ -86,14 +86,14 @@ func storageMain(argv []string) { ...@@ -86,14 +86,14 @@ func storageMain(argv []string) {
// XXX hack to use existing zodb storage for data // XXX hack to use existing zodb storage for data
zstor, err := fs1.Open(context.Background(), argv[0]) zback, err := fs1.Open(context.Background(), argv[0])
if err != nil { if err != nil {
prog.Fatal(err) prog.Fatal(err)
} }
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ? net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storSrv := neo.NewStorage(*cluster, master, *bind, net, zstor) storSrv := neo.NewStorage(*cluster, master, *bind, net, zback)
ctx := context.Background() ctx := context.Background()
/* /*
......
...@@ -23,7 +23,8 @@ import ( ...@@ -23,7 +23,8 @@ import (
"context" "context"
"math" "math"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" zfs1 "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
bfs1 "lab.nexedi.com/kirr/neo/go/neo/storage/fs1"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
) )
...@@ -37,10 +38,16 @@ func gox(wg interface { Go(func() error) }, xf func()) { ...@@ -37,10 +38,16 @@ func gox(wg interface { Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf)) wg.Go(exc.Funcx(xf))
} }
func xfs1stor(path string) *fs1.FileStorage { func xfs1stor(path string) *zfs1.FileStorage {
zstor, err := fs1.Open(bg, path) stor, err := zfs1.Open(bg, path)
exc.Raiseif(err) exc.Raiseif(err)
return zstor return stor
}
func xfs1back(path string) *bfs1.FS1Backend {
back, err := bfs1.Open(bg, path)
exc.Raiseif(err)
return back
} }
var bg = context.Background() var bg = context.Background()
......
// Copyright (C) 2016-2017 Nexedi SA and Contributors. // Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -31,17 +31,36 @@ import ( ...@@ -31,17 +31,36 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task" "lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext" "lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
"lab.nexedi.com/kirr/neo/go/xcommon/xio" "lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
) )
// StorageBackend is the interface for actual storage service that is used by Storage node.
type StorageBackend interface {
// LastTid should return the id of the last committed transaction.
//
// XXX same as in zodb.IStorageDriver
// XXX +viewAt ?
LastTid(ctx context.Context) (zodb.Tid, error)
// LastOid should return the max object id stored.
LastOid(ctx context.Context) (zodb.Oid, error)
// Load, similarly to zodb.IStorageDriver.Load should load object data addressed by xid.
// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
// XXX +viewAt ?
Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial, nextSerial zodb.Tid, err error)
}
// Storage is NEO node that keeps data and provides read/write access to it via network. // Storage is NEO node that keeps data and provides read/write access to it via network.
//
// Storage implements only NEO protocol logic with data being persisted via provided StorageBackend.
type Storage struct { type Storage struct {
node *NodeApp node *NodeApp
...@@ -51,33 +70,19 @@ type Storage struct { ...@@ -51,33 +70,19 @@ type Storage struct {
opMu sync.Mutex opMu sync.Mutex
opCtx context.Context opCtx context.Context
// TODO storage layout: back StorageBackend
// meta/
// data/
// 1 inbox/ (commit queues)
// 2 ? (data.fs)
// 3 packed/ (deltified objects)
//
// XXX we currently depend on extra functionality FS provides over
// plain zodb.IStorage (e.g. loading with nextSerial) and even if
// nextSerial will be gone in the future, we will probably depend on
// particular layout more and more -> directly work with fs1 & friends.
//
// TODO -> abstract into backend interfaces so various backands are
// possible (e.g. +SQL)
zstor *fs1.FileStorage // underlying ZODB storage
//nodeCome chan nodeCome // node connected //nodeCome chan nodeCome // node connected
} }
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr. // NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr.
// //
// The storage uses zstor as underlying backend for storing data. // The storage uses back as underlying backend for storing data.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor *fs1.FileStorage) *Storage { func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back StorageBackend) *Storage {
stor := &Storage{ stor := &Storage{
node: NewNodeApp(net, proto.STORAGE, clusterName, masterAddr, serveAddr), node: NewNodeApp(net, proto.STORAGE, clusterName, masterAddr, serveAddr),
zstor: zstor, back: back,
} }
// operational context is initially done (no service should be provided) // operational context is initially done (no service should be provided)
...@@ -295,8 +300,8 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro ...@@ -295,8 +300,8 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
// TODO AskUnfinishedTransactions // TODO AskUnfinishedTransactions
case *proto.LastIDs: case *proto.LastIDs:
lastTid, zerr1 := stor.zstor.LastTid(ctx) lastTid, zerr1 := stor.back.LastTid(ctx)
lastOid, zerr2 := stor.zstor.LastOid(ctx) lastOid, zerr2 := stor.back.LastOid(ctx)
if zerr := xerr.First(zerr1, zerr2); zerr != nil { if zerr := xerr.First(zerr1, zerr2); zerr != nil {
return zerr // XXX send the error to M return zerr // XXX send the error to M
} }
...@@ -539,7 +544,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot ...@@ -539,7 +544,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
} }
// FIXME kill nextSerial support after neo/py cache does not depend on next_serial // FIXME kill nextSerial support after neo/py cache does not depend on next_serial
buf, serial, nextSerial, err := stor.zstor.Load_XXXWithNextSerialXXX(ctx, xid) buf, serial, nextSerial, err := stor.back.Load(ctx, xid)
if err != nil { if err != nil {
// translate err to NEO protocol error codes // translate err to NEO protocol error codes
e := err.(*zodb.OpError) // XXX move this to ErrEncode? e := err.(*zodb.OpError) // XXX move this to ErrEncode?
...@@ -576,7 +581,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot ...@@ -576,7 +581,7 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
} }
case *proto.LastTransaction: case *proto.LastTransaction:
lastTid, err := stor.zstor.LastTid(ctx) lastTid, err := stor.back.LastTid(ctx)
if err != nil { if err != nil {
return proto.ErrEncode(err) return proto.ErrEncode(err)
} }
......
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package fs1 provides NEO storage backend based on ZODB FileStorage.
package fs1
import (
"context"
"lab.nexedi.com/kirr/go123/mem"
//"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
)
type FS1Backend struct {
// TODO storage layout:
// meta/
// data/
// 1 inbox/ (commit queues)
// 2 ? (data.fs)
// 3 packed/ (deltified objects)
//
// XXX we currently depend on extra functionality FS provides over
// plain zodb.IStorage (e.g. loading with nextSerial) and even if
// nextSerial will be gone in the future, we will probably depend on
// particular layout more and more -> directly work with fs1 & friends.
//
// TODO -> abstract into backend interfaces so various backands are
// possible (e.g. +SQL)
zstor *fs1.FileStorage // underlying ZODB storage
}
// XXX disabled not to create import cycle with neo(test)
// XXX -> backend registry?
//var _ neo.StorageBackend = (*FS1Backend)(nil)
func Open(ctx context.Context, path string) (*FS1Backend, error) {
zstor, err := fs1.Open(ctx, path)
if err != nil {
return nil, err
}
return &FS1Backend{zstor: zstor}, nil
}
func (f *FS1Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
return f.zstor.LastTid(ctx)
}
func (f *FS1Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
return f.zstor.LastOid(ctx)
}
func (f *FS1Backend) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, zodb.Tid, error) {
// FIXME kill nextSerial support after neo/py cache does not depend on next_serial
return f.zstor.Load_XXXWithNextSerialXXX(ctx, xid)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment