Commit 028efc96 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 828077d9
...@@ -20,6 +20,7 @@ package neo ...@@ -20,6 +20,7 @@ package neo
import ( import (
"context" "context"
"fmt"
"net/url" "net/url"
"../zodb" "../zodb"
...@@ -27,24 +28,44 @@ import ( ...@@ -27,24 +28,44 @@ import (
type Client struct { type Client struct {
storLink *NodeLink // link to storage node storLink *NodeLink // link to storage node
storConn *Conn // XXX main connection to storage
} }
var _ zodb.IStorage = (*Client)(nil) var _ zodb.IStorage = (*Client)(nil)
//func Open(...) (*Client, error) {
//}
func (c *Client) StorageName() string { func (c *Client) StorageName() string {
return "neo" // TODO more specific return "neo" // TODO more specific
} }
func (c *Client) Close() error { func (c *Client) Close() error {
panic("TODO") // NOTE this will abort all currently in-flght IO and close all connections over storLink
err := c.storLink.Close()
// XXX also wait for some goroutines to finish ?
return err
} }
func (c *Client) LastTid() zodb.Tid { func (c *Client) LastTid() (zodb.Tid, error) {
panic("TODO") // XXX open new conn for this particular req/reply ?
err := EncodeAndSend(c.storConn, &LastTransaction{})
if err != nil {
return 0, err // XXX err context
}
reply, err := RecvAndDecode(c.storConn)
if err != nil {
return 0, err // XXX err context
}
switch reply := reply.(type) {
case *Error:
return 0, reply // XXX err context
default:
// XXX more error context ?
return 0, fmt.Errorf("protocol error: unexpected reply: %T", reply)
case *AnswerLastTransaction:
return reply.Tid, nil
}
} }
func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
...@@ -66,9 +87,16 @@ func openClientByURL(u *url.URL) (zodb.IStorage, error) { ...@@ -66,9 +87,16 @@ func openClientByURL(u *url.URL) (zodb.IStorage, error) {
return nil, err return nil, err
} }
return &Client{storLink}, nil conn := storLink.NewConn()
// TODO identify ourselves via conn
return &Client{storLink, conn}, nil
} }
//func Open(...) (*Client, error) {
//}
func init() { func init() {
zodb.RegisterStorage("neo", openClientByURL) zodb.RegisterStorage("neo", openClientByURL)
} }
...@@ -285,7 +285,7 @@ func (p *Error) NEOEncodedLen() int { ...@@ -285,7 +285,7 @@ func (p *Error) NEOEncodedLen() int {
} }
func (p *Error) NEOEncode(data []byte) { func (p *Error) NEOEncode(data []byte) {
binary.BigEndian.PutUint32(data[0:], p.Code) binary.BigEndian.PutUint32(data[0:], uint32(p.Code))
{ {
l := uint32(len(p.Message)) l := uint32(len(p.Message))
binary.BigEndian.PutUint32(data[4:], l) binary.BigEndian.PutUint32(data[4:], l)
...@@ -300,7 +300,7 @@ func (p *Error) NEODecode(data []byte) (int, error) { ...@@ -300,7 +300,7 @@ func (p *Error) NEODecode(data []byte) (int, error) {
if uint32(len(data)) < 8 { if uint32(len(data)) < 8 {
goto overflow goto overflow
} }
p.Code = binary.BigEndian.Uint32(data[0:]) p.Code = ErrorCode(binary.BigEndian.Uint32(data[0:]))
{ {
l := binary.BigEndian.Uint32(data[4:]) l := binary.BigEndian.Uint32(data[4:])
data = data[8:] data = data[8:]
......
//go:generate stringer -output proto-str2.go -type ErrorCode proto.go
package neo
// XXX or better translate to some other errors ?
// XXX here - not in proto.go - because else stringer will be confused
func (e *Error) Error() string {
s := e.Code.String()
if e.Message != "" {
s += ": " + e.Message
}
return s
}
// Code generated by "stringer -output proto-str2.go -type ErrorCode proto.go"; DO NOT EDIT.
package neo
import "fmt"
const _ErrorCode_name = "ACKNOT_READYOID_NOT_FOUNDTID_NOT_FOUNDOID_DOES_NOT_EXISTPROTOCOL_ERRORBROKEN_NODEREPLICATION_ERRORCHECKING_ERRORBACKEND_NOT_IMPLEMENTEDNON_READABLE_CELLREAD_ONLY_ACCESSINCOMPLETE_TRANSACTION"
var _ErrorCode_index = [...]uint8{0, 3, 12, 25, 38, 56, 70, 81, 98, 112, 135, 152, 168, 190}
func (i ErrorCode) String() string {
if i >= ErrorCode(len(_ErrorCode_index)-1) {
return fmt.Sprintf("ErrorCode(%d)", i)
}
return _ErrorCode_name[_ErrorCode_index[i]:_ErrorCode_index[i+1]]
}
...@@ -23,7 +23,8 @@ const ( ...@@ -23,7 +23,8 @@ const (
RESPONSE_MASK = 0x8000 RESPONSE_MASK = 0x8000
) )
type ErrorCode int //type ErrorCode int
type ErrorCode uint32
const ( const (
ACK ErrorCode = iota ACK ErrorCode = iota
NOT_READY NOT_READY
...@@ -217,8 +218,8 @@ type Notify struct { ...@@ -217,8 +218,8 @@ type Notify struct {
// any other message, even if such a message does not expect a reply // any other message, even if such a message does not expect a reply
// usually. Any -> Any. // usually. Any -> Any.
type Error struct { type Error struct {
Code uint32 // PNumber //Code uint32 // PNumber
//Code ErrorCode // PNumber Code ErrorCode // PNumber
Message string Message string
} }
......
...@@ -170,7 +170,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -170,7 +170,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
data, tid, err := stor.zstor.Load(xid) data, tid, err := stor.zstor.Load(xid)
if err != nil { if err != nil {
// TODO translate err to NEO protocol error codes // TODO translate err to NEO protocol error codes
reply = &Error{Code: 0, Message: err.Error()} reply = &Error{Code: 0, Message: err.Error()} // XXX Code
} else { } else {
reply = &AnswerGetObject{ reply = &AnswerGetObject{
Oid: xid.Oid, Oid: xid.Oid,
...@@ -188,11 +188,22 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -188,11 +188,22 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
EncodeAndSend(conn, reply) // XXX err EncodeAndSend(conn, reply) // XXX err
case *LastTransaction: case *LastTransaction:
lastTid := stor.zstor.LastTid() var reply NEOEncoder
EncodeAndSend(conn, &AnswerLastTransaction{lastTid}) // XXX err
lastTid, err := stor.zstor.LastTid()
if err != nil {
reply = &Error{Code:0, Message: err.Error()}
} else {
reply = &AnswerLastTransaction{lastTid}
}
EncodeAndSend(conn, reply) // XXX err
//case *ObjectHistory: //case *ObjectHistory:
//case *StoreObject: //case *StoreObject:
default:
panic("unexpected packet") // XXX
} }
//req.Put(...) //req.Put(...)
...@@ -204,7 +215,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -204,7 +215,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
const storageSummary = "run NEO storage node" const storageSummary = "run NEO storage node"
// TODO options: // TODO options:
// cluster, masterv, bind ... // cluster, masterv ...
func storageUsage(w io.Writer) { func storageUsage(w io.Writer) {
fmt.Fprintf(w, fmt.Fprintf(w,
......
...@@ -717,10 +717,10 @@ func Open(path string) (*FileStorage, error) { ...@@ -717,10 +717,10 @@ func Open(path string) (*FileStorage, error) {
} }
func (fs *FileStorage) LastTid() zodb.Tid { func (fs *FileStorage) LastTid() (zodb.Tid, error) {
// XXX check we have transactions at all // XXX check we have transactions at all
// XXX what to return then? // XXX what to return then?
return fs.txnhMax.Tid return fs.txnhMax.Tid, nil // XXX error always nil ?
} }
// ErrXidLoad is returned when there is an error while loading xid // ErrXidLoad is returned when there is an error while loading xid
......
...@@ -136,8 +136,7 @@ type IStorage interface { ...@@ -136,8 +136,7 @@ type IStorage interface {
// LastTid returns the id of the last committed transaction. // LastTid returns the id of the last committed transaction.
// if not transactions have been committed yet, LastTid returns Tid zero value // if not transactions have been committed yet, LastTid returns Tid zero value
// XXX ^^^ ok ? LastTid() (Tid, error)
LastTid() Tid // XXX -> Tid, ok ? ,err ?
// LoadSerial and LoadBefore generalized into 1 Load (see Xid for details) // LoadSerial and LoadBefore generalized into 1 Load (see Xid for details)
// TODO data []byte -> something allocated from slab ? // TODO data []byte -> something allocated from slab ?
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package zodbtools package zodbtools
//go:generate sh -c "python2 -m zodbtools.zodb dump ../../../storage/fs1/testdata/1.fs >testdata/1.zdump.pyok" //go:generate sh -c "python2 -m zodbtools.zodb dump ../../zodb/storage/fs1/testdata/1.fs >testdata/1.zdump.pyok"
import ( import (
"bytes" "bytes"
......
...@@ -37,7 +37,7 @@ var infov = []struct {name string; getParam paramFunc} { ...@@ -37,7 +37,7 @@ var infov = []struct {name string; getParam paramFunc} {
{"name", func(stor zodb.IStorage) (string, error) { return stor.StorageName(), nil }}, {"name", func(stor zodb.IStorage) (string, error) { return stor.StorageName(), nil }},
// TODO reenable size // TODO reenable size
// {"size", func(stor zodb.IStorage) (string, error) { return stor.StorageSize(), nil }}, // {"size", func(stor zodb.IStorage) (string, error) { return stor.StorageSize(), nil }},
{"last_tid", func(stor zodb.IStorage) (string, error) {return stor.LastTid().String(), nil }}, {"last_tid", func(stor zodb.IStorage) (string, error) {tid, err := stor.LastTid(); return tid.String(), err }},
} }
// {} parameter_name -> get_parameter(stor) // {} parameter_name -> get_parameter(stor)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment