Commit 5404250a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent dacbb6cf
...@@ -16,9 +16,14 @@ ...@@ -16,9 +16,14 @@
package fs1 package fs1
import ( import (
"bufio"
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"io" "io"
"net"
"os"
"strconv"
"../../zodb" "../../zodb"
"./fsb" "./fsb"
...@@ -73,65 +78,67 @@ func (fsi *fsIndex) Save(topPos int64, w io.Writer) error { ...@@ -73,65 +78,67 @@ func (fsi *fsIndex) Save(topPos int64, w io.Writer) error {
p := pickle.NewEncoder(w) p := pickle.NewEncoder(w)
err := p.Encode(topPos) err := p.Encode(topPos)
if err != nil {
goto out
}
var oidb [8]byte {
var posb [8]byte if err != nil {
var oidPrefixCur zodb.Oid // current oid[0:6] with [6:8] = 00 goto out
oidBuf := []byte{} // current oid[6:8]oid[6:8]... }
posBuf := []byte{} // current pos[0:6]pos[0:6]...
var t [2]interface{} // tuple for (oid, fsBucket.toString())
var oidb [8]byte
var posb [8]byte
var oidPrefixCur zodb.Oid // current oid[0:6] with [6:8] = 00
oidBuf := []byte{} // current oid[6:8]oid[6:8]...
posBuf := []byte{} // current pos[0:6]pos[0:6]...
var t [2]interface{} // tuple for (oid, fsBucket.toString())
e, err := fsi.SeekFirst() e, err := fsi.SeekFirst()
if err == io.EOF { // always only io.EOF indicating an empty btree if err == io.EOF { // always only io.EOF indicating an empty btree
goto skip goto skip
} }
for { for {
oid, pos, errStop := e.Next() oid, pos, errStop := e.Next()
oidPrefix := oid & oidPrefixMask oidPrefix := oid & oidPrefixMask
if oidPrefix != oidPrefixCur {
// emit (oid[:6], oid[6:8]oid[6:8]...pos[0:6]pos[0:6]...)
binary.BigEndian.PutUint64(oidb[:], uint64(oid))
t[0] = oidb[0:6]
t[1] = bytes.Join([][]byte{oidBuf, posBuf}, nil)
err = p.Encode(t)
if err != nil {
goto out
}
oidPrefixCur = oidPrefix
oidBuf = oidBuf[:0]
posBuf = posBuf[:0]
}
if oidPrefix != oidPrefixCur { if errStop != nil {
// emit (oid[:6], oid[6:8]oid[6:8]...pos[0:6]pos[0:6]...) break
binary.BigEndian.PutUint64(oidb[:], uint64(oid))
t[0] = oidb[0:6]
t[1] = bytes.Join([][]byte{oidBuf, posBuf}, nil)
err = p.Encode(t)
if err != nil {
goto out
} }
oidPrefixCur = oidPrefix binary.BigEndian.PutUint64(oidb[:], uint64(oid))
oidBuf = oidBuf[:0] binary.BigEndian.PutUint64(posb[:], uint64(pos))
posBuf = posBuf[:0]
}
if errStop != nil { // XXX check pos does not overflow 6 bytes
break oidBuf = append(oidBuf, oidb[6:8]...)
posBuf = append(posBuf, posb[0:6]...)
} }
binary.BigEndian.PutUint64(oidb[:], uint64(oid)) e.Close()
binary.BigEndian.PutUint64(posb[:], uint64(pos))
// XXX check pos does not overflow 6 bytes skip:
oidBuf = append(oidBuf, oidb[6:8]...) err = p.Encode(pickle.None{})
posBuf = append(posBuf, posb[0:6]...)
} }
e.Close()
skip:
err = p.Encode(pickle.None{})
out: out:
if err == nil { if err == nil {
return err return err
} }
if _, ok := err.(pikle.TypeError); ok { if _, ok := err.(*pickle.TypeError); ok {
panic(err) // all our types are expected to be supported by pickle panic(err) // all our types are expected to be supported by pickle
} }
...@@ -140,99 +147,173 @@ out: ...@@ -140,99 +147,173 @@ out:
return &IndexIOError{"save", err} return &IndexIOError{"save", err}
} }
// IndexLoadError is the errortype returned by index load routines
type IndexLoadError struct {
Filename string
Pos int64
Err error
}
func (e *IndexLoadError) Error() string {
s := e.Filename
if s != "" {
s += ": "
}
s += "pickle @" + strconv.FormatInt(e.Pos, 10) + ": "
s += e.Err.Error()
return s
}
// LoadIndex loads index from a reader // LoadIndex loads index from a reader
func LoadIndex(r io.Reader) (topPos int64, fsi *fsIndex, err error) { func LoadIndex(r io.Reader) (topPos int64, fsi *fsIndex, err error) {
p := pickle.NewDecoder(r) xr := NewBufReader(r)
// by passing bufio.Reader directly we make sure it won't create one internally
// if we can know file position we can show it in error context p := pickle.NewDecoder(xr.Reader)
rseek, _ := r.(io.Seeker) var picklePos int64
var rpos int64
decode := func() (interface{}, error) { {
if rseek != nil { picklePos = xr.InputOffset()
rpos = rseek.Seek(...) // XXX not ok as p buffers r internally xtopPos, err := p.Decode()
if err != nil {
goto out
}
var ok bool
topPos, ok = xtopPos.(int64)
if !ok {
err = fmt.Errorf("topPos is %T (expected int64)", xtopPos)
goto out
} }
}
xtopPos, err := p.Decode()
if err != nil {
// TODO err
}
topPos, ok := xtopPos.(int64)
if !ok {
// TODO err
}
fsi = &fsIndex{} // TODO cmpFunc ... fsi = &fsIndex{} // TODO cmpFunc ...
var oidb [8]byte var oidb [8]byte
var posb [8]byte var posb [8]byte
loop: loop:
for { for {
xv, err := p.Decode() // load/decode next entry
if err != nil { picklePos = xr.InputOffset()
// TODO err xv, err := p.Decode()
} if err != nil {
switch xv.(type) { goto out
default: }
// TODO err
break
case pickle.None: switch xv.(type) {
break loop default:
err = fmt.Errorf("invalid entry: type %T", xv)
goto out
case []interface{}: case pickle.None:
// so far ok break loop
}
v := xv.([]interface{}) case []interface{}:
if len(v) != 2 { // so far ok
// TODO err }
}
xoidPrefixStr := v[0] // unpack entry tuple -> oidPrefix, fsBucket
oidPrefixStr, ok := xoidPrefixStr.(string) v := xv.([]interface{})
if !ok || len(oidPrefixStr) != 6 { if len(v) != 2 {
// TODO err = fmt.Errorf("invalid entry: len = %i", len(v))
} goto out
copy(oidb[:], oidPrefixStr) }
oidPrefix := zodb.Oid(binary.BigEndian.Uint64(oidb[:]))
xkvStr := v[1] // decode oidPrefix
kvStr, ok := xkvStr.(string) xoidPrefixStr := v[0]
if !ok || len(kvStr) % 8 != 0 { oidPrefixStr, ok := xoidPrefixStr.(string)
// TODO if !ok {
} err = fmt.Errorf("invalid oidPrefix: type %T", xoidPrefixStr)
goto out
}
if l := len(oidPrefixStr); l != 6 {
err = fmt.Errorf("invalid oidPrefix: len = %i", l)
goto out
}
copy(oidb[:], oidPrefixStr)
oidPrefix := zodb.Oid(binary.BigEndian.Uint64(oidb[:]))
// check fsBucket
xkvStr := v[1]
kvStr, ok := xkvStr.(string)
if !ok {
err = fmt.Errorf("invalid fsBucket: type %T", xkvStr)
goto out
}
if l := len(kvStr); l % 8 != 0 {
err = fmt.Errorf("invalid fsBucket: len = %i", l)
goto out
}
kvBuf := mem.Bytes(kvStr) // load btree from fsBucket entries
kvBuf := mem.Bytes(kvStr)
n := len(kvBuf) / 8 n := len(kvBuf) / 8
oidBuf := kvBuf[:n*2] oidBuf := kvBuf[:n*2]
posBuf := kvBuf[n*2:] posBuf := kvBuf[n*2:]
for i:=0; i<n; i++ { for i:=0; i<n; i++ {
oid := zodb.Oid(binary.BigEndian.Uint16(oidBuf[i*2:])) oid := zodb.Oid(binary.BigEndian.Uint16(oidBuf[i*2:]))
oid |= oidPrefix oid |= oidPrefix
copy(posb[2:], posBuf[i*6:]) copy(posb[2:], posBuf[i*6:])
tid := zodb.Tid(binary.BigEndian.Uint64(posb[:])) pos := int64(binary.BigEndian.Uint64(posb[:]))
fsi.Set(oid, tid) fsi.Set(oid, pos)
}
} }
} }
return topPos, fsi, nil
out: out:
if err == nil { if err == nil {
return topPos, fsi, err return topPos, fsi, err
} }
rname := IOName(r) return 0, nil, &IndexLoadError{IOName(r), picklePos, err}
}
// CountReader is an io.Reader that count total bytes read.
type CountReader struct {
io.Reader
nread int64
}
func (r *CountReader) Read(p []byte) (int, error) {
n, err := r.Reader.Read(p)
r.nread += int64(n)
return n, err
}
// InputOffset returns current position in input stream
func (r *CountReader) InputOffset() int64 {
return r.nread
}
// BufReader is a bufio.Reader + bell & whistles
type BufReader struct {
*bufio.Reader
cr *CountReader
}
func NewBufReader(r io.Reader) *BufReader {
// idempotent(BufReader)
if r, ok := r.(*BufReader); ok {
return r
}
// idempotent(CountReader)
cr, ok := r.(*CountReader)
if !ok {
cr = &CountReader{r, 0}
}
// same for file name
rname, _ := r.(interface{ Name() string })
return &BufReader{bufio.NewReader(cr), cr}
} }
// InputOffset returns current position in input stream
func (r *BufReader) InputOffset() int64 {
return r.cr.InputOffset() - int64(r.Reader.Buffered())
}
// IOName returns a "filename" associated with io.Reader, io.Writer, net.Conn, ... // IOName returns a "filename" associated with io.Reader, io.Writer, net.Conn, ...
// if name cannot be deterined - "" is returned. // if name cannot be deterined - "" is returned.
...@@ -245,14 +326,14 @@ func IOName(f interface {}) string { ...@@ -245,14 +326,14 @@ func IOName(f interface {}) string {
case net.Conn: case net.Conn:
// XXX not including LocalAddr is ok? // XXX not including LocalAddr is ok?
return f.RemoteAddr.String() return f.RemoteAddr().String()
case *io.LimitedReader: case *io.LimitedReader:
return IOName(f.R) return IOName(f.R)
case *io.PipeReader: case *io.PipeReader:
fallthrough return "pipe"
case *io.PipeWriter: case *io.PipeWriter:
return "pipe" return "pipe"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment