Commit 58e0142c authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zeo: Fix it to provide "Close vs watchq" guaranty

Provide guaranty that Close forces the driver to stop sending to watchq
and to close it. See a5dbb92b ("go/zodb: Require drivers to close watchq
on Close") for details.

Without the fix TestWatch fails with test timeout:

    panic: test timed out after 30s

    # Close waits for serve to stop
    goroutine 93 [semacquire]:
    sync.runtime_Semacquire(0xc000152170)
            /home/kirr/src/tools/go/go/src/runtime/sema.go:56 +0x45
    sync.(*WaitGroup).Wait(0xc000152168)
            /home/kirr/src/tools/go/go/src/sync/waitgroup.go:130 +0x65
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.(*zLink).Close(0xc0001520f0, 0x1313, 0x1)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zrpc.go:159 +0x47
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.(*zeo).Close(0xc000313680, 0xc000107c78, 0x1)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo.go:526 +0x2e
    lab.nexedi.com/kirr/neo/go/internal/xtesting.DrvTestWatch(0xc000082c00, 0xc0000aa2a0, 0x24, 0x6a4a38)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/internal/xtesting/xtesting.go:442 +0xdb5
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.TestWatch.func1(0xc000082c00, 0x6e3498, 0xc00009a380)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:270 +0x99
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.withZEOSrv.func2.1(0xc0000a4168, 0x16)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:207 +0xfb
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.withZEOSrv.func1(0xc000082c00, 0xc00009c180)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:186 +0x129
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.withZEOSrv.func2(0xc000082c00)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo_test.go:199 +0x10e
    testing.tRunner(0xc000082c00, 0xc00009c160)
            /home/kirr/src/tools/go/go/src/testing/testing.go:1194 +0xef
    created by testing.(*T).Run
            /home/kirr/src/tools/go/go/src/testing/testing.go:1239 +0x2b3

    # serve is stuck in invalidateTransaction doing watchq<-
    goroutine 26 [chan send]:
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.(*zeo).invalidateTransaction(0xc000313680, 0x6417e0, 0xc000323b60, 0x0, 0x0)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zeo.go:176 +0x373
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.(*zLink).serveRecv1(0xc0001520f0, 0xc000393890, 0x0, 0x0)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zrpc.go:225 +0x4b4
    lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.(*zLink).serveRecv(0xc0001520f0)
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zrpc.go:176 +0x8d
    created by lab.nexedi.com/kirr/neo/go/zodb/storage/zeo.(*zLink).start
            /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/storage/zeo/zrpc.go:99 +0xc8
parent 88848c31
// Copyright (C) 2018-2020 Nexedi SA and Contributors. // Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -51,6 +51,9 @@ type zeo struct { ...@@ -51,6 +51,9 @@ type zeo struct {
// becomes ready when serve loop finishes // becomes ready when serve loop finishes
serveWG sync.WaitGroup serveWG sync.WaitGroup
closeOnce sync.Once
closed chan struct{} // ready when Closed
url string // we were opened via this url string // we were opened via this
} }
...@@ -173,7 +176,13 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) { ...@@ -173,7 +176,13 @@ func (z *zeo) invalidateTransaction(arg interface{}) (err error) {
// at0 is initialized - ok to send current event if it goes > at0 // at0 is initialized - ok to send current event if it goes > at0
if tid > z.at0 { if tid > z.at0 {
z.watchq <- event select {
case <-z.closed:
// closed - client does not read watchq anymore
case z.watchq <- event:
// ok
}
} }
return nil return nil
} }
...@@ -187,7 +196,13 @@ func (z *zeo) flushEventq0() { ...@@ -187,7 +196,13 @@ func (z *zeo) flushEventq0() {
if z.watchq != nil { if z.watchq != nil {
for _, e := range z.eventq0 { for _, e := range z.eventq0 {
if e.Tid > z.at0 { if e.Tid > z.at0 {
z.watchq <- e select {
case <-z.closed:
// closed - client does not read watchq anymore
case z.watchq <- e:
// ok
}
} }
} }
} }
...@@ -437,7 +452,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -437,7 +452,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}() }()
z := &zeo{link: zlink, watchq: opt.Watchq, url: url} z := &zeo{link: zlink, watchq: opt.Watchq, closed: make(chan struct{}), url: url}
// start serve loop on the link // start serve loop on the link
z.serveWG.Add(1) z.serveWG.Add(1)
...@@ -455,14 +470,18 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -455,14 +470,18 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// close .watchq after serve is over // close .watchq after serve is over
z.at0Mu.Lock() z.at0Mu.Lock()
defer z.at0Mu.Unlock() defer z.at0Mu.Unlock()
if z.at0Initialized {
z.flushEventq0()
}
if z.watchq != nil { if z.watchq != nil {
if err != nil { if err != nil && /* already flushed .eventq0 */z.at0Initialized {
z.watchq <- &zodb.EventError{Err: err} select {
case <-z.closed:
// closed - client does not read watchq anymore
case z.watchq <- &zodb.EventError{Err: err}:
// ok
}
} }
close(z.watchq) close(z.watchq)
z.watchq = nil // prevent flushEventq0 to send to closed chan
} }
}() }()
...@@ -495,13 +514,16 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -495,13 +514,16 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
// "invalidateTransaction" server notification. // "invalidateTransaction" server notification.
// //
// filter-out first < at0 messages for this reason. // filter-out first < at0 messages for this reason.
//
// do this in separate task not to deadlock in watchq<- : we did not
// yet returned z to caller and so noone might be yet reading from watchq.
go func() {
z.at0Mu.Lock() z.at0Mu.Lock()
z.at0 = lastTid z.at0 = lastTid
z.at0Initialized = true z.at0Initialized = true
z.flushEventq0() z.flushEventq0()
z.at0Mu.Unlock() z.at0Mu.Unlock()
}()
//call('get_info') -> {}str->str, ex // XXX can be omitted //call('get_info') -> {}str->str, ex // XXX can be omitted
/* /*
...@@ -519,11 +541,15 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -519,11 +541,15 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
'supports_record_iternext': True}) 'supports_record_iternext': True})
*/ */
return z, z.at0, nil return z, lastTid, nil
} }
func (z *zeo) Close() error { func (z *zeo) Close() error {
err := z.link.Close() var err error
z.closeOnce.Do(func() {
close(z.closed)
err = z.link.Close()
})
z.serveWG.Wait() z.serveWG.Wait()
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment