Commit d9cfa3c0 authored by Kirill Smelkov's avatar Kirill Smelkov

X Move xbufio out of fs1

parent 0c867eca
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
"../../../../storage/fs1" "../../../../storage/fs1"
"../../../../xcommon/xbufio"
"../../../../xcommon/xfmt" "../../../../xcommon/xfmt"
"../../../../xcommon/xslice" "../../../../xcommon/xslice"
) )
...@@ -70,7 +71,7 @@ func fsTail(w io.Writer, path string, ntxn int) (err error) { ...@@ -70,7 +71,7 @@ func fsTail(w io.Writer, path string, ntxn int) (err error) {
data := []byte{} data := []byte{}
// use sequential IO buffer // use sequential IO buffer
fSeq := fs1.NewSeqBufReader(f) fSeq := xbufio.NewSeqReaderAt(f)
// start iterating at tail. // start iterating at tail.
// this should get EOF but read txnh.LenPrev ok. // this should get EOF but read txnh.LenPrev ok.
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"os" "os"
"../../zodb" "../../zodb"
"../../xcommon/xbufio"
"../../xcommon/xslice" "../../xcommon/xslice"
) )
...@@ -797,7 +798,7 @@ const ( ...@@ -797,7 +798,7 @@ const (
// txnIter is iterator over transaction records // txnIter is iterator over transaction records
type txnIter struct { type txnIter struct {
fsSeq *SeqBufReader fsSeq *xbufio.SeqReaderAt
Txnh TxnHeader // current transaction information Txnh TxnHeader // current transaction information
TidStop zodb.Tid // iterate up to tid <= tidStop | tid >= tidStop depending on .dir TidStop zodb.Tid // iterate up to tid <= tidStop | tid >= tidStop depending on .dir
...@@ -807,7 +808,7 @@ type txnIter struct { ...@@ -807,7 +808,7 @@ type txnIter struct {
// dataIter is iterator over data records inside one transaction // dataIter is iterator over data records inside one transaction
type dataIter struct { type dataIter struct {
fsSeq *SeqBufReader fsSeq *xbufio.SeqReaderAt
Txnh *TxnHeader // header of transaction we are iterating inside Txnh *TxnHeader // header of transaction we are iterating inside
Datah DataHeader Datah DataHeader
...@@ -921,7 +922,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -921,7 +922,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
Iter := iterator{} Iter := iterator{}
// when iterating use IO optimized for sequential access // when iterating use IO optimized for sequential access
fsSeq := NewSeqBufReader(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
Iter.txnIter.fsSeq = fsSeq Iter.txnIter.fsSeq = fsSeq
Iter.dataIter.fsSeq = fsSeq Iter.dataIter.fsSeq = fsSeq
Iter.dataIter.Txnh = &Iter.txnIter.Txnh Iter.dataIter.Txnh = &Iter.txnIter.Txnh
......
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017 Nexedi SA and Contributors. XXX -> GPLv3
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -10,9 +10,8 @@ ...@@ -10,9 +10,8 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// XXX move -> xbufio // Package xbufio provides addons to std bufio package.
package xbufio
package fs1
import ( import (
"io" "io"
...@@ -20,16 +19,14 @@ import ( ...@@ -20,16 +19,14 @@ import (
//"log" //"log"
) )
// SeqBufReader implements buffering for a io.ReaderAt optimized for sequential access // SeqReaderAt implements buffering for a io.ReaderAt optimized for sequential access
// Both forward, backward and interleaved forward/backward access patterns are supported // Both forward, backward and interleaved forward/backward access patterns are supported
// //
// NOTE SeqBufReader is not safe to use from multiple goroutines concurrently. // NOTE SeqReaderAt is not safe to use from multiple goroutines concurrently.
// Strictly speaking this goes against io.ReaderAt interface but sequential // Strictly speaking this goes against io.ReaderAt interface but sequential
// workloads usually means sequential processing. It would be a pity to // workloads usually mean sequential processing. It would be a pity to
// add mutex for nothing. // add mutex for nothing.
// type SeqReaderAt struct {
// XXX -> xbufio.SeqReader
type SeqBufReader struct {
// buffer for data at pos. cap(buf) - whole buffer capacity // buffer for data at pos. cap(buf) - whole buffer capacity
buf []byte buf []byte
pos int64 pos int64
...@@ -47,12 +44,12 @@ type SeqBufReader struct { ...@@ -47,12 +44,12 @@ type SeqBufReader struct {
// TODO text about syscall / memcpy etc // TODO text about syscall / memcpy etc
const defaultSeqBufSize = 8192 // XXX retune - must be <= size(L1d) / 2 const defaultSeqBufSize = 8192 // XXX retune - must be <= size(L1d) / 2
func NewSeqBufReader(r io.ReaderAt) *SeqBufReader { func NewSeqReaderAt(r io.ReaderAt) *SeqReaderAt {
return NewSeqBufReaderSize(r, defaultSeqBufSize) return NewSeqReaderAtSize(r, defaultSeqBufSize)
} }
func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader { func NewSeqReaderAtSize(r io.ReaderAt, size int) *SeqReaderAt {
sb := &SeqBufReader{r: r, buf: make([]byte, 0, size)} // all positions are zero initially sb := &SeqReaderAt{r: r, buf: make([]byte, 0, size)} // all positions are zero initially
return sb return sb
} }
...@@ -63,7 +60,7 @@ func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader { ...@@ -63,7 +60,7 @@ func NewSeqBufReaderSize(r io.ReaderAt, size int) *SeqBufReader {
// } // }
// debug helper for sb.r.ReadAt // debug helper for sb.r.ReadAt
func (sb *SeqBufReader) ioReadAt(p []byte, pos int64) (int, error) { func (sb *SeqReaderAt) ioReadAt(p []byte, pos int64) (int, error) {
/* /*
verb := "read" verb := "read"
if len(p) > cap(sb.buf) { if len(p) > cap(sb.buf) {
...@@ -75,7 +72,7 @@ func (sb *SeqBufReader) ioReadAt(p []byte, pos int64) (int, error) { ...@@ -75,7 +72,7 @@ func (sb *SeqBufReader) ioReadAt(p []byte, pos int64) (int, error) {
return sb.r.ReadAt(p, pos) return sb.r.ReadAt(p, pos)
} }
func (sb *SeqBufReader) ReadAt(p []byte, pos int64) (int, error) { func (sb *SeqReaderAt) ReadAt(p []byte, pos int64) (int, error) {
//log.Printf("access\t[%v, %v)\t#%v\t@%+d", pos, pos + len64(p), len(p), pos - sb.posLastAccess) //log.Printf("access\t[%v, %v)\t#%v\t@%+d", pos, pos + len64(p), len(p), pos - sb.posLastAccess)
// read-in last access positions and update them in *sb with current ones for next read // read-in last access positions and update them in *sb with current ones for next read
......
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017 Nexedi SA and Contributors. XXX -> GPLv3
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -10,9 +10,7 @@ ...@@ -10,9 +10,7 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// XXX move -> xbufio package xbufio
package fs1
import ( import (
"bytes" "bytes"
...@@ -209,9 +207,9 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} { ...@@ -209,9 +207,9 @@ var xSeqBufTestv = []struct {pos int64; Len int; bufPos int64; bufLen int} {
} }
func TestSeqBufReader(t *testing.T) { func TestSeqReaderAt(t *testing.T) {
r := &XReader{} r := &XReader{}
rb := NewSeqBufReaderSize(r, 10) // with 10 it is easier to do/check math for a human rb := NewSeqReaderAtSize(r, 10) // with 10 it is easier to do/check math for a human
for _, tt := range xSeqBufTestv { for _, tt := range xSeqBufTestv {
pOk := make([]byte, tt.Len) pOk := make([]byte, tt.Len)
...@@ -236,9 +234,9 @@ func TestSeqBufReader(t *testing.T) { ...@@ -236,9 +234,9 @@ func TestSeqBufReader(t *testing.T) {
} }
// this is benchmark for how thin wrapper is, not for logic inside it // this is benchmark for how thin wrapper is, not for logic inside it
func BenchmarkSeqBufReader(b *testing.B) { func BenchmarkSeqReaderAt(b *testing.B) {
r := &XReader{} r := &XReader{}
rb := NewSeqBufReaderSize(r, 10) // same as in TestSeqBufReader rb := NewSeqReaderAtSize(r, 10) // same as in TestSeqReaderAt
buf := make([]byte, 128 /* > all .Len in xSeqBufTestv */) buf := make([]byte, 128 /* > all .Len in xSeqBufTestv */)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment