Commit 18b22971 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 51e5b8b2
...@@ -23,7 +23,6 @@ package zeo ...@@ -23,7 +23,6 @@ package zeo
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"net/url" "net/url"
"strings" "strings"
"sync" "sync"
...@@ -113,28 +112,31 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte ...@@ -113,28 +112,31 @@ func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIte
// invalidateTransaction receives invalidations from server // invalidateTransaction receives invalidations from server
func (z *zeo) invalidateTransaction(arg interface{}) error { func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
defer xerr.Context(&err, "invalidateTransaction")
t, ok := z.srv.asTuple(arg) t, ok := z.srv.asTuple(arg)
if !ok || len(t) != 2 { if !ok || len(t) != 2 {
return XXX("got %#v; expect 2-tuple", arg) return fmt.Errorf("got %#v; expect 2-tuple", arg)
} }
// (tid, oidv) // (tid, oidv)
tid, ok1 := z.srv.tidUnpack(t[0]) tid, ok1 := z.srv.tidUnpack(t[0])
xoidt, ok2 := z.srv.asTuple(t[1]) xoidt, ok2 := z.srv.asTuple(t[1])
if !(ok1 && ok2) { if !(ok1 && ok2) {
return XXX("got (%T, %T); expect (tid, []oid)") return fmt.Errorf("got (%T, %T); expect (tid, []oid)", t...)
} }
oidv := []zodb.Oid{} oidv := []zodb.Oid{}
for _, xoid := range xoidt { for _, xoid := range xoidt {
oid, ok := z.srv.oidUnpack(xoid) oid, ok := z.srv.oidUnpack(xoid)
if !ok { if !ok {
return XXX("non-oid %#v in oidv", xoid) return fmt.Errorf("non-oid %#v in oidv", xoid)
} }
oidv = append(oidv, oid)
} }
if tid <= z.head { if tid <= z.head {
return XXX("bad invalidation from server: tid not ↑: %s -> %s", z.head, tid) return fmt.Errorf("bad invalidation from server: tid not ↑: %s -> %s", z.head, tid)
} }
z.head = tid z.head = tid
...@@ -142,6 +144,7 @@ func (z *zeo) invalidateTransaction(arg interface{}) error { ...@@ -142,6 +144,7 @@ func (z *zeo) invalidateTransaction(arg interface{}) error {
return nil return nil
} }
// invalidation event received and we have to send it to .watchq
event := &zodb.EventCommit{Tid: tid, Changev: oidv} event := &zodb.EventCommit{Tid: tid, Changev: oidv}
z.at0Mu.Lock() z.at0Mu.Lock()
...@@ -379,13 +382,6 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -379,13 +382,6 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented") return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented")
} }
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
if opt.Watchq != nil {
log.Print("zeo: FIXME: watchq support not implemented - there " +
"won't be notifications about database changes")
}
zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ... zl, err := dialZLink(ctx, net, addr) // XXX + methodTable {invalidateTransaction tid, oidv} -> ...
if err != nil { if err != nil {
return nil, zodb.InvalidTid, err return nil, zodb.InvalidTid, err
...@@ -416,7 +412,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -416,7 +412,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
} }
lastTid, ok := zl.tidUnpack(xlastTid) // XXX -> xlastTid -> scan lastTid, ok := zl.tidUnpack(xlastTid)
if !ok { if !ok {
return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid) return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment