Commit 664b34bb authored by Kirill Smelkov's avatar Kirill Smelkov

Sync with NEO/py v1.10

parents 27df6fe8 e915d7a5
...@@ -16,6 +16,19 @@ This happens in the following conditions: ...@@ -16,6 +16,19 @@ This happens in the following conditions:
4. the cell is checked completely before it could replicate up to the max tid 4. the cell is checked completely before it could replicate up to the max tid
to check to check
Sometimes, it causes the master to crash::
File "neo/lib/handler.py", line 72, in dispatch
method(conn, *args, **kw)
File "neo/master/handlers/storage.py", line 93, in notifyReplicationDone
cell_list = app.backup_app.notifyReplicationDone(node, offset, tid)
File "neo/master/backup_app.py", line 337, in notifyReplicationDone
assert cell.isReadable()
AssertionError
Workaround: make sure all cells are up-to-date before checking replicas. Workaround: make sure all cells are up-to-date before checking replicas.
Found by running testBackupNodeLost many times. Found by running testBackupNodeLost many times:
- either a failureException: 12 != 11
- or the above assert failure, in which case the unit test freezes
Change History Change History
============== ==============
1.10 (2018-07-16)
-----------------
A important performance improvement is that the replication now remembers where
it was interrupted: a storage node that gets disconnected for a short time now
gets fully operational quite instantaneously because it only has to replicate
the new data. Before, the time to recover depended on the size of the DB, just
to verify that most of the data are already transferred.
As a small optimization, an empty transaction extension is now serialized with
an empty string.
The above 2 changes required a bump of the protocol version, as well as an
upgrade of the storage format. Once upgraded (this is done automatically as
usual), databases can't be opened anymore by older versions of NEO.
Other general changes:
- Add support for custom compression levels.
- Maximize resiliency by taking into account the topology of storage nodes.
- Fix a few issues with ZODB5. Note however that merging several DB with the
Importer backend only works if they were only used with ZODB < 5.
Master:
- Automatically discard feeding cells that get out-of-date.
Client:
- Fix partial import from a source storage.
- Store uncompressed if compressed size is equal.
Storage:
- Fixed v1.9 code that sped up the replication by sending bigger network
packets.
- Fix replication of creation undone.
- Stop logging 'Abort TXN' for txn that have been locked.
- Clarify log about data deletion of discarded cells.
MySQL backend:
- Fix replication of big OIDs (> 16M).
- Do not full-scan for duplicates of big OIDs if deduplication is disabled.
- Fix remaining places where a server disconnection was not catched.
SQlite backend:
- Fix indexes of upgraded databases.
Importer backend:
- Fetch and process the data to import in a separate process. It is even
usually free to use the best compression level.
- New option to write back new transactions to the source database.
See 'importer.conf' for more information.
- Give a title to the 'import' and 'writeback' subprocesses,
if the 'setproctitle' egg is installed.
- Log when the transaction index for FileStorage DB is built.
- Open imported database in read-only whenever possible.
- Do not trigger speedupFileStorageTxnLookup uselessly.
- Do not checksum data twice.
- Fix NameError when recovering during tpc_finish.
1.9 (2018-03-13) 1.9 (2018-03-13)
---------------- ----------------
......
graft tools graft tools
include neo.conf CHANGELOG.rst TODO TESTS.txt ZODB3.patch include neo.conf CHANGELOG.rst TODO ZODB3.patch
...@@ -537,8 +537,8 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -537,8 +537,8 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
// on the wire it comes as "before", not "at" // on the wire it comes as "before", not "at"
req := proto.GetObject{ req := proto.GetObject{
Oid: xid.Oid, Oid: xid.Oid,
Tid: at2Before(xid.At), Before: at2Before(xid.At),
Serial: proto.INVALID_TID, At: proto.INVALID_TID,
} }
resp := proto.AnswerObject{} resp := proto.AnswerObject{}
......
// Copyright (C) 2016-2018 Nexedi SA and Contributors. // Copyright (C) 2016-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -113,6 +113,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri ...@@ -113,6 +113,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType proto.NodeType, addr stri
UUID: app.MyInfo.UUID, UUID: app.MyInfo.UUID,
Address: app.MyInfo.Addr, Address: app.MyInfo.Addr,
ClusterName: app.ClusterName, ClusterName: app.ClusterName,
DevPath: nil, // XXX stub
IdTime: app.MyInfo.IdTime, // XXX ok? IdTime: app.MyInfo.IdTime, // XXX ok?
} }
accept := &proto.AcceptIdentification{} accept := &proto.AcceptIdentification{}
......
...@@ -107,6 +107,7 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -107,6 +107,7 @@ func TestMasterStorage(t0 *testing.T) {
UUID: 0, UUID: 0,
Address: xnaddr("s:1"), Address: xnaddr("s:1"),
ClusterName: "abc1", ClusterName: "abc1",
DevPath: nil,
IdTime: proto.IdTimeNone, IdTime: proto.IdTimeNone,
})) }))
...@@ -208,6 +209,7 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -208,6 +209,7 @@ func TestMasterStorage(t0 *testing.T) {
UUID: 0, UUID: 0,
Address: xnaddr(""), Address: xnaddr(""),
ClusterName: "abc1", ClusterName: "abc1",
DevPath: nil,
IdTime: proto.IdTimeNone, IdTime: proto.IdTimeNone,
})) }))
...@@ -293,6 +295,7 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -293,6 +295,7 @@ func TestMasterStorage(t0 *testing.T) {
UUID: proto.UUID(proto.CLIENT, 1), UUID: proto.UUID(proto.CLIENT, 1),
Address: xnaddr(""), Address: xnaddr(""),
ClusterName: "abc1", ClusterName: "abc1",
DevPath: nil,
IdTime: 0.02, IdTime: 0.02,
})) }))
...@@ -307,8 +310,8 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -307,8 +310,8 @@ func TestMasterStorage(t0 *testing.T) {
// ... -> GetObject(xid1) // ... -> GetObject(xid1)
tCS.Expect(conntx("c:2", "s:3", 3, &proto.GetObject{ tCS.Expect(conntx("c:2", "s:3", 3, &proto.GetObject{
Oid: xid1.Oid, Oid: xid1.Oid,
Tid: at2Before(xid1.At), Before: at2Before(xid1.At),
Serial: proto.INVALID_TID, At: proto.INVALID_TID,
})) }))
tCS.Expect(conntx("s:3", "c:2", 3, &proto.AnswerObject{ tCS.Expect(conntx("s:3", "c:2", 3, &proto.AnswerObject{
Oid: xid1.Oid, Oid: xid1.Oid,
...@@ -342,8 +345,8 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -342,8 +345,8 @@ func TestMasterStorage(t0 *testing.T) {
// ... -> GetObject(xid1prev) // ... -> GetObject(xid1prev)
tCS.Expect(conntx("c:2", "s:3", 5, &proto.GetObject{ tCS.Expect(conntx("c:2", "s:3", 5, &proto.GetObject{
Oid: xid1prev.Oid, Oid: xid1prev.Oid,
Tid: serial1, Before: serial1,
Serial: proto.INVALID_TID, At: proto.INVALID_TID,
})) }))
tCS.Expect(conntx("s:3", "c:2", 5, &proto.AnswerObject{ tCS.Expect(conntx("s:3", "c:2", 5, &proto.AnswerObject{
Oid: xid1prev.Oid, Oid: xid1prev.Oid,
......
// Copyright (C) 2016-2018 Nexedi SA and Contributors. // Copyright (C) 2016-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -1015,8 +1015,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) { ...@@ -1015,8 +1015,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
case *proto.GetObject: case *proto.GetObject:
err = req.Reply(&proto.AnswerObject{ err = req.Reply(&proto.AnswerObject{
Oid: msg.Oid, Oid: msg.Oid,
Serial: msg.Serial, Serial: msg.At,
DataSerial: msg.Tid, DataSerial: msg.Before,
}) })
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
...@@ -1033,15 +1033,15 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) { ...@@ -1033,15 +1033,15 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
obj := &proto.AnswerObject{} obj := &proto.AnswerObject{}
get.Oid = zodb.Oid(i) get.Oid = zodb.Oid(i)
get.Serial = zodb.Tid(i+1) get.At = zodb.Tid(i+1)
get.Tid = zodb.Tid(i+2) get.Before = zodb.Tid(i+2)
err := l1.Ask1(get, obj) err := l1.Ask1(get, obj)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
if !(obj.Oid == get.Oid && obj.Serial == get.Serial && obj.DataSerial == get.Tid) { if !(obj.Oid == get.Oid && obj.Serial == get.At && obj.DataSerial == get.Before) {
b.Fatalf("read back: %v ; requested %v", obj, get) b.Fatalf("read back: %v ; requested %v", obj, get)
} }
......
This diff is collapsed.
// Copyright (C) 2016-2017 Nexedi SA and Contributors. // Copyright (C) 2016-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -42,6 +42,11 @@ func hex(s string) string { ...@@ -42,6 +42,11 @@ func hex(s string) string {
return string(b) return string(b)
} }
// uint8 -> string as encoded on the wire
func u8(v uint8) string {
return string(v)
}
// uint16 -> string as encoded on the wire // uint16 -> string as encoded on the wire
func u16(v uint16) string { func u16(v uint16) string {
var b [2]byte var b [2]byte
...@@ -163,8 +168,8 @@ func TestMsgMarshal(t *testing.T) { ...@@ -163,8 +168,8 @@ func TestMsgMarshal(t *testing.T) {
// empty // empty
{&Ping{}, ""}, {&Ping{}, ""},
// uint32, string // uint8, string
{&Error{Code: 0x01020304, Message: "hello"}, "\x01\x02\x03\x04\x00\x00\x00\x05hello"}, {&Error{Code: 0x04, Message: "hello"}, "\x04\x00\x00\x00\x05hello"},
// Oid, Tid, bool, Checksum, []byte // Oid, Tid, bool, Checksum, []byte
{&StoreObject{ {&StoreObject{
...@@ -194,9 +199,9 @@ func TestMsgMarshal(t *testing.T) { ...@@ -194,9 +199,9 @@ func TestMsgMarshal(t *testing.T) {
hex("0102030405060708") + hex("0102030405060708") +
hex("00000003") + hex("00000003") +
hex("00000001000000020000000b000000000000001100000001") + hex("00000001000000020000000b010000001100") +
hex("00000002000000010000000b00000002") + hex("00000002000000010000000b02") +
hex("00000007000000030000000b000000040000000f000000030000001700000000"), hex("00000007000000030000000b030000000f040000001701"),
}, },
// map[Oid]struct {Tid,Tid,bool} // map[Oid]struct {Tid,Tid,bool}
...@@ -245,11 +250,12 @@ func TestMsgMarshal(t *testing.T) { ...@@ -245,11 +250,12 @@ func TestMsgMarshal(t *testing.T) {
}, },
// uint32, Address, string, IdTime // uint32, Address, string, IdTime
{&RequestIdentification{CLIENT, 17, Address{"localhost", 7777}, "myname", 0.12345678}, {&RequestIdentification{CLIENT, 17, Address{"localhost", 7777}, "myname", []string{"room1", "rack234"}, 0.12345678},
u32(2) + u32(17) + u32(9) + u8(2) + u32(17) + u32(9) +
"localhost" + u16(7777) + "localhost" + u16(7777) +
u32(6) + "myname" + u32(6) + "myname" +
u32(2) + u32(5)+"room1" + u32(7)+"rack234" +
hex("3fbf9add1091c895"), hex("3fbf9add1091c895"),
}, },
...@@ -258,7 +264,7 @@ func TestMsgMarshal(t *testing.T) { ...@@ -258,7 +264,7 @@ func TestMsgMarshal(t *testing.T) {
{CLIENT, Address{}, UUID(CLIENT, 1), RUNNING, 1504466245.925599}}}, {CLIENT, Address{}, UUID(CLIENT, 1), RUNNING, 1504466245.925599}}},
hex("41d66b15517b469d") + u32(1) + hex("41d66b15517b469d") + u32(1) +
u32(2) + u32(0) /* <- ø Address */ + hex("e0000001") + u32(2) + u8(2) + u32(0) /* <- ø Address */ + hex("e0000001") + u8(2) +
hex("41d66b15517b3d04"), hex("41d66b15517b3d04"),
}, },
......
...@@ -341,14 +341,12 @@ import ( ...@@ -341,14 +341,12 @@ import (
// generate code for this type to implement neo.Msg // generate code for this type to implement neo.Msg
var msgCode MsgCode var msgCode MsgCode
msgCode.answer = specAnnotation.answer || strings.HasPrefix(typename, "Answer") msgCode.answer = specAnnotation.answer || strings.HasPrefix(typename, "Answer")
switch { // increment msgSerial only by +1 when going from
case !msgCode.answer || typename == "Error": // request1->request2 in `Request1 Answer1 Request2`.
msgCode.msgSerial = msgSerial if msgCode.answer && typename != "Error" {
msgSerial--
// answer to something
default:
msgCode.msgSerial = msgSerial - 1
} }
msgCode.msgSerial = msgSerial
fmt.Fprintf(&buf, "// %s. %s\n\n", msgCode, typename) fmt.Fprintf(&buf, "// %s. %s\n\n", msgCode, typename)
......
This diff is collapsed.
...@@ -27,7 +27,7 @@ const _ErrorCode_name = "ACKNOT_READYOID_NOT_FOUNDTID_NOT_FOUNDOID_DOES_NOT_EXIS ...@@ -27,7 +27,7 @@ const _ErrorCode_name = "ACKNOT_READYOID_NOT_FOUNDTID_NOT_FOUNDOID_DOES_NOT_EXIS
var _ErrorCode_index = [...]uint8{0, 3, 12, 25, 38, 56, 70, 87, 101, 124, 141, 157, 179} var _ErrorCode_index = [...]uint8{0, 3, 12, 25, 38, 56, 70, 87, 101, 124, 141, 157, 179}
func (i ErrorCode) String() string { func (i ErrorCode) String() string {
if i >= ErrorCode(len(_ErrorCode_index)-1) { if i < 0 || i >= ErrorCode(len(_ErrorCode_index)-1) {
return "ErrorCode(" + strconv.FormatInt(int64(i), 10) + ")" return "ErrorCode(" + strconv.FormatInt(int64(i), 10) + ")"
} }
return _ErrorCode_name[_ErrorCode_index[i]:_ErrorCode_index[i+1]] return _ErrorCode_name[_ErrorCode_index[i]:_ErrorCode_index[i+1]]
...@@ -99,16 +99,16 @@ func _() { ...@@ -99,16 +99,16 @@ func _() {
// An "invalid array index" compiler error signifies that the constant values have changed. // An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again. // Re-run the stringer command to generate them again.
var x [1]struct{} var x [1]struct{}
_ = x[UP_TO_DATE-0] _ = x[OUT_OF_DATE-0]
_ = x[OUT_OF_DATE-1] _ = x[UP_TO_DATE-1]
_ = x[FEEDING-2] _ = x[FEEDING-2]
_ = x[DISCARDED-3] _ = x[CORRUPTED-3]
_ = x[CORRUPTED-4] _ = x[DISCARDED-4]
} }
const _CellState_name = "UP_TO_DATEOUT_OF_DATEFEEDINGDISCARDEDCORRUPTED" const _CellState_name = "OUT_OF_DATEUP_TO_DATEFEEDINGCORRUPTEDDISCARDED"
var _CellState_index = [...]uint8{0, 10, 21, 28, 37, 46} var _CellState_index = [...]uint8{0, 11, 21, 28, 37, 46}
func (i CellState) String() string { func (i CellState) String() string {
if i < 0 || i >= CellState(len(_CellState_index)-1) { if i < 0 || i >= CellState(len(_CellState_index)-1) {
......
...@@ -3,102 +3,102 @@ package proto ...@@ -3,102 +3,102 @@ package proto
var pyMsgRegistry = map[uint16]string{ var pyMsgRegistry = map[uint16]string{
1: "RequestIdentification", 1: "RequestIdentification",
3: "Ping", 2: "Ping",
5: "CloseClient", 3: "CloseClient",
6: "PrimaryMaster", 4: "PrimaryMaster",
8: "NotPrimaryMaster", 5: "NotPrimaryMaster",
9: "NotifyNodeInformation", 6: "NotifyNodeInformation",
10: "Recovery", 7: "Recovery",
12: "LastIDs", 8: "LastIDs",
14: "AskPartitionTable", 9: "AskPartitionTable",
16: "SendPartitionTable", 10: "SendPartitionTable",
17: "NotifyPartitionChanges", 11: "NotifyPartitionChanges",
18: "StartOperation", 12: "StartOperation",
19: "StopOperation", 13: "StopOperation",
20: "UnfinishedTransactions", 14: "UnfinishedTransactions",
22: "LockedTransactions", 15: "LockedTransactions",
24: "FinalTID", 16: "FinalTID",
26: "ValidateTransaction", 17: "ValidateTransaction",
27: "BeginTransaction", 18: "BeginTransaction",
29: "FailedVote", 19: "FailedVote",
30: "FinishTransaction", 20: "FinishTransaction",
32: "LockInformation", 21: "LockInformation",
34: "InvalidateObjects", 22: "InvalidateObjects",
35: "NotifyUnlockInformation", 23: "NotifyUnlockInformation",
36: "AskNewOIDs", 24: "AskNewOIDs",
38: "NotifyDeadlock", 25: "NotifyDeadlock",
39: "RebaseTransaction", 26: "RebaseTransaction",
41: "RebaseObject", 27: "RebaseObject",
43: "StoreObject", 28: "StoreObject",
45: "AbortTransaction", 29: "AbortTransaction",
46: "StoreTransaction", 30: "StoreTransaction",
48: "VoteTransaction", 31: "VoteTransaction",
50: "GetObject", 32: "GetObject",
52: "AskTIDs", 33: "AskTIDs",
54: "TransactionInformation", 34: "TransactionInformation",
56: "ObjectHistory", 35: "ObjectHistory",
58: "PartitionList", 36: "PartitionList",
60: "NodeList", 37: "NodeList",
62: "SetNodeState", 38: "SetNodeState",
63: "AddPendingNodes", 39: "AddPendingNodes",
64: "TweakPartitionTable", 40: "TweakPartitionTable",
65: "SetClusterState", 41: "SetClusterState",
66: "Repair", 42: "Repair",
67: "RepairOne", 43: "RepairOne",
68: "NotifyClusterState", 44: "NotifyClusterState",
69: "AskClusterState", 45: "AskClusterState",
71: "ObjectUndoSerial", 46: "ObjectUndoSerial",
73: "AskTIDsFrom", 47: "AskTIDsFrom",
75: "Pack", 48: "Pack",
77: "CheckReplicas", 49: "CheckReplicas",
78: "CheckPartition", 50: "CheckPartition",
79: "CheckTIDRange", 51: "CheckTIDRange",
81: "CheckSerialRange", 52: "CheckSerialRange",
83: "PartitionCorrupted", 53: "PartitionCorrupted",
84: "NotifyReady", 54: "NotifyReady",
85: "LastTransaction", 55: "LastTransaction",
87: "CheckCurrentSerial", 56: "CheckCurrentSerial",
89: "NotifyTransactionFinished", 57: "NotifyTransactionFinished",
90: "Replicate", 58: "Replicate",
91: "ReplicationDone", 59: "ReplicationDone",
92: "FetchTransactions", 60: "FetchTransactions",
94: "FetchObjects", 61: "FetchObjects",
96: "AddTransaction", 62: "AddTransaction",
97: "AddObject", 63: "AddObject",
98: "Truncate", 64: "Truncate",
32768: "Error", 32768: "Error",
32769: "AcceptIdentification", 32769: "AcceptIdentification",
32771: "Pong", 32770: "Pong",
32774: "AnswerPrimary", 32772: "AnswerPrimary",
32778: "AnswerRecovery", 32775: "AnswerRecovery",
32780: "AnswerLastIDs", 32776: "AnswerLastIDs",
32782: "AnswerPartitionTable", 32777: "AnswerPartitionTable",
32788: "AnswerUnfinishedTransactions", 32782: "AnswerUnfinishedTransactions",
32790: "AnswerLockedTransactions", 32783: "AnswerLockedTransactions",
32792: "AnswerFinalTID", 32784: "AnswerFinalTID",
32795: "AnswerBeginTransaction", 32786: "AnswerBeginTransaction",
32798: "AnswerTransactionFinished", 32788: "AnswerTransactionFinished",
32800: "AnswerInformationLocked", 32789: "AnswerInformationLocked",
32804: "AnswerNewOIDs", 32792: "AnswerNewOIDs",
32807: "AnswerRebaseTransaction", 32794: "AnswerRebaseTransaction",
32809: "AnswerRebaseObject", 32795: "AnswerRebaseObject",
32811: "AnswerStoreObject", 32796: "AnswerStoreObject",
32814: "AnswerStoreTransaction", 32798: "AnswerStoreTransaction",
32816: "AnswerVoteTransaction", 32799: "AnswerVoteTransaction",
32818: "AnswerObject", 32800: "AnswerObject",
32820: "AnswerTIDs", 32801: "AnswerTIDs",
32822: "AnswerTransactionInformation", 32802: "AnswerTransactionInformation",
32824: "AnswerObjectHistory", 32803: "AnswerObjectHistory",
32826: "AnswerPartitionList", 32804: "AnswerPartitionList",
32828: "AnswerNodeList", 32805: "AnswerNodeList",
32837: "AnswerClusterState", 32813: "AnswerClusterState",
32839: "AnswerObjectUndoSerial", 32814: "AnswerObjectUndoSerial",
32841: "AnswerTIDsFrom", 32815: "AnswerTIDsFrom",
32843: "AnswerPack", 32816: "AnswerPack",
32847: "AnswerCheckTIDRange", 32819: "AnswerCheckTIDRange",
32849: "AnswerCheckSerialRange", 32820: "AnswerCheckSerialRange",
32853: "AnswerLastTransaction", 32823: "AnswerLastTransaction",
32855: "AnswerCheckCurrentSerial", 32824: "AnswerCheckCurrentSerial",
32860: "AnswerFetchTransactions", 32828: "AnswerFetchTransactions",
32862: "AnswerFetchObjects", 32829: "AnswerFetchObjects",
} }
// Copyright (C) 2016-2018 Nexedi SA and Contributors. // Copyright (C) 2016-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -533,10 +533,10 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot ...@@ -533,10 +533,10 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
switch req := req.(type) { switch req := req.(type) {
case *proto.GetObject: case *proto.GetObject:
xid := zodb.Xid{Oid: req.Oid} xid := zodb.Xid{Oid: req.Oid}
if req.Serial != proto.INVALID_TID { if req.At != proto.INVALID_TID {
xid.At = req.Serial xid.At = req.At
} else { } else {
xid.At = before2At(req.Tid) xid.At = before2At(req.Before)
} }
obj, err := stor.back.Load(ctx, xid) obj, err := stor.back.Load(ctx, xid)
...@@ -547,11 +547,11 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot ...@@ -547,11 +547,11 @@ func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp prot
// compatibility with py side: // compatibility with py side:
// for loadSerial - check we have exact hit - else "nodata" // for loadSerial - check we have exact hit - else "nodata"
if req.Serial != proto.INVALID_TID { if req.At != proto.INVALID_TID {
if obj.Serial != req.Serial { if obj.Serial != req.At {
return &proto.Error{ return &proto.Error{
Code: proto.OID_NOT_FOUND, Code: proto.OID_NOT_FOUND,
Message: fmt.Sprintf("%s: no data with serial %s", xid.Oid, req.Serial), Message: fmt.Sprintf("%s: no data with serial %s", xid.Oid, req.At),
} }
} }
} }
......
// Copyright (C) 2018 Nexedi SA and Contributors. // Copyright (C) 2018-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// schema & queries are based on neo/storage/database/sqlite.py // schema & queries are based on neo/storage/database/sqlite.py
// //
...@@ -67,7 +67,7 @@ import ( ...@@ -67,7 +67,7 @@ import (
// ---- schema ---- // ---- schema ----
const schemaVersion = 2 const schemaVersion = 3
// table "config" stores configuration parameters which affect the persistent data. // table "config" stores configuration parameters which affect the persistent data.
// //
...@@ -80,11 +80,11 @@ const config = ` ...@@ -80,11 +80,11 @@ const config = `
// table "pt" stores a partition table. // table "pt" stores a partition table.
const pt = ` const pt = `
rid INTEGER NOT NULL, -- row id partition INTEGER NOT NULL, -- row id
nid INTEGER NOT NULL, -- node id nid INTEGER NOT NULL, -- node id
state INTEGER NOT NULL, -- cell state tid INTEGER NOT NULL,
PRIMARY KEY (rid, nid) PRIMARY KEY (partition, nid)
` `
// table "trans" stores information on committed transactions. // table "trans" stores information on committed transactions.
...@@ -260,12 +260,10 @@ func (b *Backend) query1(query string, argv ...interface{}) *row1 { ...@@ -260,12 +260,10 @@ func (b *Backend) query1(query string, argv ...interface{}) *row1 {
func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) { func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
var lastTid zodb.Tid var lastTid zodb.Tid
// FIXME nodeID <- my node UUID err := b.query1("SELECT MAX(tid) FROM trans",
myID := proto.UUID(proto.STORAGE, 1) // FIXME + " WHERE partition=?" and caller looping over partitions from readable set
// XXX AND tid<=? (max_tid)
err := b.query1("SELECT MAX(tid) FROM pt, trans" + ).Scan(&lastTid)
" WHERE nid=? AND rid=partition" /* XXX AND tid<=? (max_tid) */,
myID).Scan(&lastTid)
if err != nil { if err != nil {
// no transaction have been committed // no transaction have been committed
...@@ -282,11 +280,9 @@ func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) { ...@@ -282,11 +280,9 @@ func (b *Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) { func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
var lastOid zodb.Oid var lastOid zodb.Oid
// FIXME nodeID <- my node UUID err := b.query1("SELECT MAX(oid) FROM obj",
myID := proto.UUID(proto.STORAGE, 1) // FIXME + " WHERE `partition=?`" and caller looping over partitions from readable set
).Scan(&lastOid)
err := b.query1("SELECT MAX(oid) FROM pt, obj WHERE nid=? AND rid=partition",
myID).Scan(&lastOid)
if err != nil { if err != nil {
// no objects // no objects
...@@ -300,16 +296,13 @@ func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) { ...@@ -300,16 +296,13 @@ func (b *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
return lastOid, nil return lastOid, nil
} }
func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (*proto.AnswerObject, error) { func (b *Backend) Load(ctx context.Context, xid zodb.Xid) (_ *proto.AnswerObject, err error) {
// XXX instead of defer defer func() {
obj, err := b.load(xid)
if err != nil { if err != nil {
err = &zodb.OpError{URL: b.url, Op: "load", Err: err} err = &zodb.OpError{URL: b.url, Op: "load", Err: err}
} }
return obj, err }()
}
func (b *Backend) load(xid zodb.Xid) (*proto.AnswerObject, error) {
obj := &proto.AnswerObject{Oid: xid.Oid, DataSerial: 0} obj := &proto.AnswerObject{Oid: xid.Oid, DataSerial: 0}
// TODO reenable, but XXX we have to use Query, not QueryRow for RawBytes support // TODO reenable, but XXX we have to use Query, not QueryRow for RawBytes support
//var data sql.RawBytes //var data sql.RawBytes
...@@ -332,7 +325,7 @@ func (b *Backend) load(xid zodb.Xid) (*proto.AnswerObject, error) { ...@@ -332,7 +325,7 @@ func (b *Backend) load(xid zodb.Xid) (*proto.AnswerObject, error) {
// XXX use conn for several query1 (see below) without intermediate returns to pool? // XXX use conn for several query1 (see below) without intermediate returns to pool?
err := b.query1( err = b.query1(
"SELECT tid, compression, data.hash, value, value_tid" + "SELECT tid, compression, data.hash, value, value_tid" +
" FROM obj LEFT JOIN data ON obj.data_id = data.id" + " FROM obj LEFT JOIN data ON obj.data_id = data.id" +
" WHERE partition=? AND oid=? AND tid<=?" + " WHERE partition=? AND oid=? AND tid<=?" +
......
...@@ -45,6 +45,12 @@ ...@@ -45,6 +45,12 @@
# (instead of adapter=Importer & database=/path_to_this_file). # (instead of adapter=Importer & database=/path_to_this_file).
adapter=MySQL adapter=MySQL
database=neo database=neo
# Keep writing back new transactions to the source database, provided it is
# not splitted. In case of any issue, the import can be aborted without losing
# data. Note however that it is asynchronous so don't stop the storage node
# too quickly after the last committed transaction (e.g. check with tools like
# fstail).
writeback=true
# The other sections are for source databases. # The other sections are for source databases.
[root] [root]
...@@ -52,7 +58,8 @@ database=neo ...@@ -52,7 +58,8 @@ database=neo
# ZEO is possible but less efficient: ZEO servers must be stopped # ZEO is possible but less efficient: ZEO servers must be stopped
# if NEO opens FileStorage DBs directly. # if NEO opens FileStorage DBs directly.
# Note that NEO uses 'new_oid' method to get the last OID, that's why the # Note that NEO uses 'new_oid' method to get the last OID, that's why the
# source DB can't be open read-only. NEO never modifies a FileStorage DB. # source DB can't be open read-only. Unless 'writeback' is enabled, NEO never
# modifies a FileStorage DB.
storage= storage=
<filestorage> <filestorage>
path /path/to/root.fs path /path/to/root.fs
......
...@@ -160,11 +160,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -160,11 +160,7 @@ class Storage(BaseStorage.BaseStorage,
def copyTransactionsFrom(self, source, verbose=False): def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """ """ Zope compliant API """
return self.importFrom(source) return self.app.importFrom(self, source)
def importFrom(self, source, start=None, stop=None, preindex=None):
""" Allow import only a part of the source storage """
return self.app.importFrom(self, source, start, stop, preindex)
def pack(self, t, referencesf, gc=False): def pack(self, t, referencesf, gc=False):
if gc: if gc:
......
...@@ -14,11 +14,14 @@ ...@@ -14,11 +14,14 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from cPickle import dumps, loads
from zlib import compress, decompress
import heapq import heapq
import time import time
try:
from ZODB._compat import dumps, loads, _protocol
except ImportError:
from cPickle import dumps, loads
_protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from . import OLD_ZODB from . import OLD_ZODB
if OLD_ZODB: if OLD_ZODB:
...@@ -26,6 +29,7 @@ if OLD_ZODB: ...@@ -26,6 +29,7 @@ if OLD_ZODB:
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
from neo.lib import logging from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress
from neo.lib.protocol import NodeTypes, Packets, \ from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.util import makeChecksum, dump from neo.lib.util import makeChecksum, dump
...@@ -50,7 +54,6 @@ if SignalHandler: ...@@ -50,7 +54,6 @@ if SignalHandler:
import signal import signal
SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen) SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)
class Application(ThreadedApplication): class Application(ThreadedApplication):
"""The client node application.""" """The client node application."""
...@@ -99,7 +102,7 @@ class Application(ThreadedApplication): ...@@ -99,7 +102,7 @@ class Application(ThreadedApplication):
# _connecting_to_master_node is used to prevent simultaneous master # _connecting_to_master_node is used to prevent simultaneous master
# node connection attempts # node connection attempts
self._connecting_to_master_node = Lock() self._connecting_to_master_node = Lock()
self.compress = compress self.compress = getCompress(compress)
def __getattr__(self, attr): def __getattr__(self, attr):
if attr in ('last_tid', 'pt'): if attr in ('last_tid', 'pt'):
...@@ -215,7 +218,7 @@ class Application(ThreadedApplication): ...@@ -215,7 +218,7 @@ class Application(ThreadedApplication):
node=node, node=node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification( p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, None) NodeTypes.CLIENT, self.uuid, None, self.name, (), None)
try: try:
ask(conn, p, handler=handler) ask(conn, p, handler=handler)
except ConnectionClosed: except ConnectionClosed:
...@@ -388,7 +391,7 @@ class Application(ThreadedApplication): ...@@ -388,7 +391,7 @@ class Application(ThreadedApplication):
logging.error('wrong checksum from %s for oid %s', logging.error('wrong checksum from %s for oid %s',
conn, dump(oid)) conn, dump(oid))
raise NEOStorageReadRetry(False) raise NEOStorageReadRetry(False)
return (decompress(data) if compression else data, return (decompress_list[compression](data),
tid, next_tid, data_tid) tid, next_tid, data_tid)
raise NEOStorageCreationUndoneError(dump(oid)) raise NEOStorageCreationUndoneError(dump(oid))
return self._askStorageForRead(oid, return self._askStorageForRead(oid,
...@@ -435,17 +438,7 @@ class Application(ThreadedApplication): ...@@ -435,17 +438,7 @@ class Application(ThreadedApplication):
checksum = ZERO_HASH checksum = ZERO_HASH
else: else:
assert data_serial is None assert data_serial is None
size = len(data) size, compression, compressed_data = self.compress(data)
if self.compress:
compressed_data = compress(data)
if size < len(compressed_data):
compressed_data = data
compression = 0
else:
compression = 1
else:
compression = 0
compressed_data = data
checksum = makeChecksum(compressed_data) checksum = makeChecksum(compressed_data)
txn_context.data_size += size txn_context.data_size += size
# Store object in tmp cache # Store object in tmp cache
...@@ -554,9 +547,12 @@ class Application(ThreadedApplication): ...@@ -554,9 +547,12 @@ class Application(ThreadedApplication):
txn_context = self._txn_container.get(transaction) txn_context = self._txn_container.get(transaction)
self.waitStoreResponses(txn_context) self.waitStoreResponses(txn_context)
ttid = txn_context.ttid ttid = txn_context.ttid
ext = transaction._extension
ext = dumps(ext, _protocol) if ext else ''
# user and description are cast to str in case they're unicode.
# BBB: This is not required anymore with recent ZODB.
packet = Packets.AskStoreTransaction(ttid, str(transaction.user), packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension), str(transaction.description), ext, txn_context.cache_dict)
txn_context.cache_dict)
queue = txn_context.queue queue = txn_context.queue
involved_nodes = txn_context.involved_nodes involved_nodes = txn_context.involved_nodes
# Ask in parallel all involved storage nodes to commit object metadata. # Ask in parallel all involved storage nodes to commit object metadata.
...@@ -786,10 +782,6 @@ class Application(ThreadedApplication): ...@@ -786,10 +782,6 @@ class Application(ThreadedApplication):
self.waitStoreResponses(txn_context) self.waitStoreResponses(txn_context)
return None, txn_oid_list return None, txn_oid_list
def _insertMetadata(self, txn_info, extension):
for k, v in loads(extension).items():
txn_info[k] = v
def _getTransactionInformation(self, tid): def _getTransactionInformation(self, tid):
return self._askStorageForRead(tid, return self._askStorageForRead(tid,
Packets.AskTransactionInformation(tid)) Packets.AskTransactionInformation(tid))
...@@ -829,7 +821,8 @@ class Application(ThreadedApplication): ...@@ -829,7 +821,8 @@ class Application(ThreadedApplication):
if filter is None or filter(txn_info): if filter is None or filter(txn_info):
txn_info.pop('packed') txn_info.pop('packed')
txn_info.pop("oids") txn_info.pop("oids")
self._insertMetadata(txn_info, txn_ext) if txn_ext:
txn_info.update(loads(txn_ext))
append(txn_info) append(txn_info)
if len(undo_info) >= last - first: if len(undo_info) >= last - first:
break break
...@@ -857,7 +850,7 @@ class Application(ThreadedApplication): ...@@ -857,7 +850,7 @@ class Application(ThreadedApplication):
tid = None tid = None
for tid in tid_list: for tid in tid_list:
(txn_info, txn_ext) = self._getTransactionInformation(tid) (txn_info, txn_ext) = self._getTransactionInformation(tid)
txn_info['ext'] = loads(txn_ext) txn_info['ext'] = loads(txn_ext) if txn_ext else {}
append(txn_info) append(txn_info)
return (tid, txn_list) return (tid, txn_list)
...@@ -876,23 +869,29 @@ class Application(ThreadedApplication): ...@@ -876,23 +869,29 @@ class Application(ThreadedApplication):
txn_info['size'] = size txn_info['size'] = size
if filter is None or filter(txn_info): if filter is None or filter(txn_info):
result.append(txn_info) result.append(txn_info)
self._insertMetadata(txn_info, txn_ext) if txn_ext:
txn_info.update(loads(txn_ext))
return result return result
def importFrom(self, storage, source, start, stop, preindex=None): def importFrom(self, storage, source):
# TODO: The main difference with BaseStorage implementation is that # TODO: The main difference with BaseStorage implementation is that
# preindex can't be filled with the result 'store' (tid only # preindex can't be filled with the result 'store' (tid only
# known after 'tpc_finish'. This method could be dropped if we # known after 'tpc_finish'. This method could be dropped if we
# implemented IStorageRestoreable (a wrapper around source would # implemented IStorageRestoreable (a wrapper around source would
# still be required for partial import). # still be required for partial import).
if preindex is None:
preindex = {} preindex = {}
for transaction in source.iterator(start, stop): for transaction in source.iterator():
tid = transaction.tid tid = transaction.tid
self.tpc_begin(storage, transaction, tid, transaction.status) self.tpc_begin(storage, transaction, tid, transaction.status)
for r in transaction: for r in transaction:
oid = r.oid oid = r.oid
pre = preindex.get(oid) try:
pre = preindex[oid]
except KeyError:
try:
pre = self.load(oid)[1]
except NEOStorageNotFoundError:
pre = ZERO_TID
self.store(oid, pre, r.data, r.version, transaction) self.store(oid, pre, r.data, r.version, transaction)
preindex[oid] = tid preindex[oid] = tid
conflicted = self.tpc_vote(transaction) conflicted = self.tpc_vote(transaction)
......
...@@ -14,10 +14,14 @@ ...@@ -14,10 +14,14 @@
Give the name of the cluster Give the name of the cluster
</description> </description>
</key> </key>
<key name="compress" datatype="boolean"> <key name="compress" datatype=".compress">
<description> <description>
If true, data is automatically compressed (unless compressed size is The value is either of 'boolean' type or an explicit algorithm that
not smaller). This is the default behaviour. matches the regex 'zlib(=\d+)?', where the optional number is
the compression level.
Any record that is not smaller once compressed is stored uncompressed.
True is the default and its meaning may change over time:
currently, it is the same as 'zlib'.
</description> </description>
</key> </key>
<key name="read-only" datatype="boolean"> <key name="read-only" datatype="boolean">
......
...@@ -23,3 +23,11 @@ class NeoStorage(BaseConfig): ...@@ -23,3 +23,11 @@ class NeoStorage(BaseConfig):
config = self.config config = self.config
return Storage(**{k: getattr(config, k) return Storage(**{k: getattr(config, k)
for k in config.getSectionAttributes()}) for k in config.getSectionAttributes()})
def compress(value):
from ZConfig.datatypes import asBoolean
try:
return asBoolean(value)
except ValueError:
from neo.lib.compress import parseOption
return parseOption(value)
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from zlib import decompress
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from neo.lib import logging from neo.lib import logging
from neo.lib.compress import decompress_list
from neo.lib.protocol import Packets, uuid_str from neo.lib.protocol import Packets, uuid_str
from neo.lib.util import dump, makeChecksum from neo.lib.util import dump, makeChecksum
from neo.lib.exception import NodeNotReady from neo.lib.exception import NodeNotReady
...@@ -129,8 +129,7 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -129,8 +129,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
'wrong checksum while getting back data for' 'wrong checksum while getting back data for'
' object %s during rebase of transaction %s' ' object %s during rebase of transaction %s'
% (dump(oid), dump(txn_context.ttid))) % (dump(oid), dump(txn_context.ttid)))
if compression: data = decompress_list[compression](data)
data = decompress(data)
size = len(data) size = len(data)
txn_context.data_size += size txn_context.data_size += size
if cached: if cached:
......
...@@ -47,7 +47,7 @@ class ConnectionPool(object): ...@@ -47,7 +47,7 @@ class ConnectionPool(object):
conn = MTClientConnection(app, app.storage_event_handler, node, conn = MTClientConnection(app, app.storage_event_handler, node,
dispatcher=app.dispatcher) dispatcher=app.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name, app.id_timestamp) app.uuid, None, app.name, (), app.id_timestamp)
try: try:
app._ask(conn, p, handler=app.storage_bootstrap_handler) app._ask(conn, p, handler=app.storage_bootstrap_handler)
except ConnectionClosed: except ConnectionClosed:
......
...@@ -164,3 +164,17 @@ elif IF == 'frames': ...@@ -164,3 +164,17 @@ elif IF == 'frames':
write("Thread %s:\n" % thread_id) write("Thread %s:\n" % thread_id)
traceback.print_stack(frame) traceback.print_stack(frame)
write("End of dump\n") write("End of dump\n")
elif IF == 'profile':
DURATION = 60
def stop(prof, path):
prof.disable()
prof.dump_stats(path)
@defer
def profile(app):
import cProfile, threading, time
from .lib.protocol import uuid_str
path = 'neo-%s-%s.prof' % (uuid_str(app.uuid), time.time())
prof = cProfile.Profile()
threading.Timer(DURATION, stop, (prof, path)).start()
prof.enable()
...@@ -26,13 +26,14 @@ class BootstrapManager(EventHandler): ...@@ -26,13 +26,14 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it Manage the bootstrap stage, lookup for the primary master then connect to it
""" """
def __init__(self, app, node_type, server=None): def __init__(self, app, node_type, server=None, devpath=()):
""" """
Manage the bootstrap stage of a non-master node, it lookup for the Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node primary master node, connect to it then returns when the master node
is ready. is ready.
""" """
self.server = server self.server = server
self.devpath = devpath
self.node_type = node_type self.node_type = node_type
self.num_replicas = None self.num_replicas = None
self.num_partitions = None self.num_partitions = None
...@@ -43,7 +44,7 @@ class BootstrapManager(EventHandler): ...@@ -43,7 +44,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid, conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, None)) self.server, self.app.name, self.devpath, None))
def connectionFailed(self, conn): def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn) EventHandler.connectionFailed(self, conn)
......
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import zlib
decompress_list = (
lambda data: data,
zlib.decompress,
)
def parseOption(value):
x = value.split('=', 1)
try:
alg = ('zlib',).index(x[0])
if len(x) == 1:
return alg, None
level = int(x[1])
except Exception:
raise ValueError("not a valid 'compress' option: %r" % value)
if 0 < level <= zlib.Z_BEST_COMPRESSION:
return alg, level
raise ValueError("invalid compression level: %r" % level)
def getCompress(value):
if value:
alg, level = (0, None) if value is True else value
_compress = zlib.compress
if level:
zlib_compress = _compress
_compress = lambda data: zlib_compress(data, level)
alg += 1
assert 0 < alg < len(decompress_list), 'invalid compression algorithm'
def compress(data):
size = len(data)
compressed = _compress(data)
if len(compressed) < size:
return size, alg, compressed
return size, 0, data
compress._compress = _compress # for testBasicStore
return compress
return lambda data: (len(data), 0, data)
...@@ -34,6 +34,7 @@ class SocketConnector(object): ...@@ -34,6 +34,7 @@ class SocketConnector(object):
is_closed = is_server = None is_closed = is_server = None
connect_limit = {} connect_limit = {}
CONNECT_LIMIT = 1 # XXX actually this is (RE-)CONNECT_THROTTLE CONNECT_LIMIT = 1 # XXX actually this is (RE-)CONNECT_THROTTLE
SOMAXCONN = 5 # for threaded tests
def __new__(cls, addr, s=None): def __new__(cls, addr, s=None):
if s is None: if s is None:
...@@ -78,6 +79,7 @@ class SocketConnector(object): ...@@ -78,6 +79,7 @@ class SocketConnector(object):
def queue(self, data): def queue(self, data):
was_empty = not self.queued was_empty = not self.queued
self.queued += data self.queued += data
for data in data:
self.queue_size += len(data) self.queue_size += len(data)
return was_empty return was_empty
...@@ -123,7 +125,7 @@ class SocketConnector(object): ...@@ -123,7 +125,7 @@ class SocketConnector(object):
try: try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._bind(self.addr) self._bind(self.addr)
self.socket.listen(5) self.socket.listen(self.SOMAXCONN)
except socket.error, e: except socket.error, e:
self.socket.close() self.socket.close()
self._error('listen', e) self._error('listen', e)
......
...@@ -26,9 +26,6 @@ class PrimaryFailure(NeoException): ...@@ -26,9 +26,6 @@ class PrimaryFailure(NeoException):
class StoppedOperation(NeoException): class StoppedOperation(NeoException):
pass pass
class DatabaseFailure(NeoException):
pass
class NodeNotReady(NeoException): class NodeNotReady(NeoException):
pass pass
...@@ -22,14 +22,13 @@ def check_signature(reference, function): ...@@ -22,14 +22,13 @@ def check_signature(reference, function):
a, b, c, d = inspect.getargspec(function) a, b, c, d = inspect.getargspec(function)
x = len(A) - len(a) x = len(A) - len(a)
if x < 0: # ignore extra default parameters if x < 0: # ignore extra default parameters
if x + len(d) < 0: if B or x + len(d) < 0:
return False return False
del a[x:] del a[x:]
d = d[:x] or None d = d[:x] or None
elif x: # different signature elif x: # different signature
# We have no need yet to support methods with default parameters. return a == A[:-x] and (b or a and c) and (d or ()) == (D or ())[:-x]
return a == A[:-x] and (b or a and c) and not (d or D) return a == A and (b or not B) and (c or not C) and d == D
return a == A and b == B and c == C and d == D
def implements(obj, ignore=()): def implements(obj, ignore=()):
ignore = set(ignore) ignore = set(ignore)
...@@ -55,7 +54,7 @@ def implements(obj, ignore=()): ...@@ -55,7 +54,7 @@ def implements(obj, ignore=()):
while 1: while 1:
name, func = base.pop() name, func = base.pop()
x = getattr(obj, name) x = getattr(obj, name)
if x.im_class is tobj: if type(getattr(x, '__self__', None)) is tobj:
x = x.__func__ x = x.__func__
if x is func: if x is func:
try: try:
......
...@@ -290,3 +290,16 @@ class NEOLogger(Logger): ...@@ -290,3 +290,16 @@ class NEOLogger(Logger):
logging = NEOLogger() logging = NEOLogger()
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush()) signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
signal.signal(signal.SIGRTMIN+1, lambda signum, frame: logging.reopen()) signal.signal(signal.SIGRTMIN+1, lambda signum, frame: logging.reopen())
def patch():
def fork():
with logging:
pid = os_fork()
if not pid:
logging._setup()
return pid
os_fork = os.fork
os.fork = fork
patch()
del patch
...@@ -28,6 +28,7 @@ class Node(object): ...@@ -28,6 +28,7 @@ class Node(object):
_connection = None _connection = None
_identified = False _identified = False
devpath = ()
id_timestamp = None id_timestamp = None
def __init__(self, manager, address=None, uuid=None, state=NodeStates.DOWN): def __init__(self, manager, address=None, uuid=None, state=NodeStates.DOWN):
......
...@@ -25,6 +25,7 @@ def speedupFileStorageTxnLookup(): ...@@ -25,6 +25,7 @@ def speedupFileStorageTxnLookup():
from array import array from array import array
from bisect import bisect from bisect import bisect
from collections import defaultdict from collections import defaultdict
from neo.lib import logging
from ZODB.FileStorage.FileStorage import FileStorage, FileIterator from ZODB.FileStorage.FileStorage import FileStorage, FileIterator
typecode = 'L' if array('I').itemsize < 4 else 'I' typecode = 'L' if array('I').itemsize < 4 else 'I'
...@@ -44,6 +45,8 @@ def speedupFileStorageTxnLookup(): ...@@ -44,6 +45,8 @@ def speedupFileStorageTxnLookup():
try: try:
index = self._tidindex index = self._tidindex
except AttributeError: except AttributeError:
logging.info("Building index for faster lookup of"
" transactions in the FileStorage DB.")
# Cache a sorted list of all the file pos from oid index. # Cache a sorted list of all the file pos from oid index.
# To reduce memory usage, the list is splitted in arrays of # To reduce memory usage, the list is splitted in arrays of
# low order 32-bit words. # low order 32-bit words.
...@@ -52,10 +55,10 @@ def speedupFileStorageTxnLookup(): ...@@ -52,10 +55,10 @@ def speedupFileStorageTxnLookup():
tindex[x >> 32].append(x & 0xffffffff) tindex[x >> 32].append(x & 0xffffffff)
index = self._tidindex = [] index = self._tidindex = []
for h, l in sorted(tindex.iteritems()): for h, l in sorted(tindex.iteritems()):
x = array('I') l = array(typecode, sorted(l))
x.fromlist(sorted(l)) x = self._read_data_header(h << 32 | l[0])
l = self._read_data_header(h << 32 | x[0]) index.append((x.tid, h, l))
index.append((l.tid, h, x)) logging.info("... index built")
x = bisect(index, (start,)) - 1 x = bisect(index, (start,)) - 1
if x >= 0: if x >= 0:
x, h, index = index[x] x, h, index = index[x]
......
This diff is collapsed.
...@@ -15,12 +15,12 @@ ...@@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket import os, socket
from binascii import a2b_hex, b2a_hex from binascii import a2b_hex, b2a_hex
from datetime import timedelta, datetime from datetime import timedelta, datetime
from hashlib import sha1 from hashlib import sha1
from Queue import deque from Queue import deque
from struct import pack, unpack from struct import pack, unpack, Struct
from time import gmtime from time import gmtime
TID_LOW_OVERFLOW = 2**32 TID_LOW_OVERFLOW = 2**32
...@@ -102,11 +102,10 @@ def addTID(ptid, offset): ...@@ -102,11 +102,10 @@ def addTID(ptid, offset):
higher = (d.year, d.month, d.day, d.hour, d.minute) higher = (d.year, d.month, d.day, d.hour, d.minute)
return packTID(higher, lower) return packTID(higher, lower)
def u64(s): p64, u64 = (lambda unpack: (
return unpack('!Q', s)[0] unpack.__self__.pack,
lambda s: unpack(s)[0]
def p64(n): ))(Struct('!Q').unpack)
return pack('!Q', n)
def add64(packed, offset): def add64(packed, offset):
"""Add a python number to a 64-bits packed value""" """Add a python number to a 64-bits packed value"""
...@@ -226,3 +225,25 @@ class cached_property(object): ...@@ -226,3 +225,25 @@ class cached_property(object):
if obj is None: return self if obj is None: return self
value = obj.__dict__[self.func.__name__] = self.func(obj) value = obj.__dict__[self.func.__name__] = self.func(obj)
return value return value
# This module is always imported before multiprocessing is used, and the
# main process does not want to change name when task are run in threads.
spt_pid = os.getpid()
def setproctitle(title):
global spt_pid
pid = os.getpid()
if spt_pid == pid:
return
spt_pid = pid
# Try using https://pypi.org/project/setproctitle/
try:
# On Linux, this is done by clobbering argv, and the main process
# usually has a longer command line than the title of subprocesses.
os.environ['SPT_NOENV'] = '1'
from setproctitle import setproctitle
except ImportError:
return
finally:
del os.environ['SPT_NOENV']
setproctitle(title)
...@@ -24,7 +24,7 @@ from ..app import monotonic_time ...@@ -24,7 +24,7 @@ from ..app import monotonic_time
class IdentificationHandler(EventHandler): class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid, def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp): address, name, devpath, id_timestamp):
app = self.app app = self.app
self.checkClusterName(name) self.checkClusterName(name)
if address == app.server: if address == app.server:
...@@ -101,6 +101,8 @@ class IdentificationHandler(EventHandler): ...@@ -101,6 +101,8 @@ class IdentificationHandler(EventHandler):
uuid=uuid, address=address) uuid=uuid, address=address)
else: else:
node.setUUID(uuid) node.setUUID(uuid)
if devpath:
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time() node.id_timestamp = monotonic_time()
node.setState(state) node.setState(state)
conn.setHandler(handler) conn.setHandler(handler)
...@@ -120,7 +122,7 @@ class IdentificationHandler(EventHandler): ...@@ -120,7 +122,7 @@ class IdentificationHandler(EventHandler):
class SecondaryIdentificationHandler(EventHandler): class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid, def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp): address, name, devpath, id_timestamp):
app = self.app app = self.app
self.checkClusterName(name) self.checkClusterName(name)
if address == app.server: if address == app.server:
......
...@@ -38,7 +38,7 @@ class ElectionHandler(MasterHandler): ...@@ -38,7 +38,7 @@ class ElectionHandler(MasterHandler):
super(ElectionHandler, self).connectionCompleted(conn) super(ElectionHandler, self).connectionCompleted(conn)
app = self.app app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER, conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election)) app.uuid, app.server, app.name, (), app.election))
def connectionFailed(self, conn): def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn) super(ElectionHandler, self).connectionFailed(conn)
......
...@@ -178,7 +178,7 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -178,7 +178,7 @@ class PartitionTable(neo.lib.pt.PartitionTable):
def tweak(self, drop_list=()): def tweak(self, drop_list=()):
"""Optimize partition table """Optimize partition table
This reassigns cells in 3 ways: This reassigns cells in 4 ways:
- Discard cells of nodes listed in 'drop_list'. For partitions with too - Discard cells of nodes listed in 'drop_list'. For partitions with too
few readable cells, some cells are instead marked as FEEDING. This is few readable cells, some cells are instead marked as FEEDING. This is
a preliminary step to drop these nodes, otherwise the partition table a preliminary step to drop these nodes, otherwise the partition table
...@@ -187,6 +187,8 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -187,6 +187,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
- When a transaction creates new objects (oids are roughly allocated - When a transaction creates new objects (oids are roughly allocated
sequentially), we expect better performance by maximizing the number sequentially), we expect better performance by maximizing the number
of involved nodes (i.e. parallelizing writes). of involved nodes (i.e. parallelizing writes).
- For maximum resiliency, cells of each partition are assigned as far
as possible from each other, by checking the topology path of nodes.
Examples of optimal partition tables with np=10, nr=1 and 5 nodes: Examples of optimal partition tables with np=10, nr=1 and 5 nodes:
...@@ -215,6 +217,17 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -215,6 +217,17 @@ class PartitionTable(neo.lib.pt.PartitionTable):
U. .U U. U. .U U.
.U U. U. .U U. U.
U. U. .U U. U. .U
For the topology, let's consider an example with paths of the form
(room, machine, disk):
- if there are more rooms than the number of replicas, 2 cells of the
same partition must not be assigned in the same room;
- otherwise, topology paths are checked at a deeper depth,
e.g. not on the same machine and distributed evenly
(off by 1) among rooms.
But the topology is expected to be optimal, otherwise it is ignored.
In some cases, we could fall back to a non-optimal topology but
that would cause extra replication if the user wants to fix it.
""" """
# Collect some data in a usable form for the rest of the method. # Collect some data in a usable form for the rest of the method.
node_list = {node: {} for node in self.count_dict node_list = {node: {} for node in self.count_dict
...@@ -242,6 +255,67 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -242,6 +255,67 @@ class PartitionTable(neo.lib.pt.PartitionTable):
i += 1 i += 1
option_dict = Counter(map(tuple, x)) option_dict = Counter(map(tuple, x))
# Initialize variables/functions to optimize the topology.
devpath_max = []
devpaths = [()] * node_count
if repeats > 1:
_devpaths = [x[0].devpath for x in node_list]
max_depth = min(map(len, _devpaths))
depth = 0
while 1:
if depth < max_depth:
depth += 1
x = Counter(x[:depth] for x in _devpaths)
n = len(x)
x = set(x.itervalues())
# TODO: Prove it works. If the code turns out to be:
# - too pessimistic, the topology is ignored when
# resiliency could be maximized;
# - or worse too optimistic, in which case this
# method raises, possibly after a very long time.
if len(x) == 1 or max(x) * repeats <= node_count:
i, x = divmod(repeats, n)
devpath_max.append((i + 1, x) if x else (i, n))
if n < repeats:
continue
devpaths = [x[:depth] for x in _devpaths]
break
logging.warning("Can't maximize resiliency: fix the topology"
" of your storage nodes and make sure they're all running."
" %s storage device failure(s) may be enough to lose all"
" the database." % (repeats - 1))
break
topology = [{} for _ in xrange(self.np)]
def update_topology():
for offset in option:
n = topology[offset]
for i, (j, k) in zip(devpath, devpath_max):
try:
i, x = n[i]
except KeyError:
n[i] = i, x = [0, {}]
if i == j or i + 1 == j and k == sum(
1 for i in n.itervalues() if i[0] == j):
# Too many cells would be assigned at this topology
# node.
return False
n = x
# The topology may be optimal with this option. Apply it.
for offset in option:
n = topology[offset]
for i in devpath:
n = n[i]
n[0] += 1
n = n[1]
return True
def revert_topology():
for offset in option:
n = topology[offset]
for i in devpath:
n = n[i]
n[0] -= 1
n = n[1]
# Strategies to find the "best" permutation of nodes. # Strategies to find the "best" permutation of nodes.
def node_options(): def node_options():
# The second part of the key goes with the above cosmetic sort. # The second part of the key goes with the above cosmetic sort.
...@@ -291,24 +365,27 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -291,24 +365,27 @@ class PartitionTable(neo.lib.pt.PartitionTable):
new = [] # the solution new = [] # the solution
stack = [] # data recursion stack = [] # data recursion
def options(): def options():
return iter(node_options[len(new)][-1]) x = node_options[len(new)]
return devpaths[x[-2]], iter(x[-1])
for node_options in node_options(): # for each strategy for node_options in node_options(): # for each strategy
iter_option = options() devpath, iter_option = options()
while 1: while 1:
try: try:
option = next(iter_option) option = next(iter_option)
except StopIteration: # 1st strategy only except StopIteration:
if new: if new:
iter_option = stack.pop() devpath, iter_option = stack.pop()
option_dict[new.pop()] += 1 option = new.pop()
revert_topology()
option_dict[option] += 1
continue continue
break break
if option_dict[option]: if option_dict[option] and update_topology():
new.append(option) new.append(option)
if len(new) == len(node_list): if len(new) == node_count:
break break
stack.append(iter_option) stack.append((devpath, iter_option))
iter_option = options() devpath, iter_option = options()
option_dict[option] -= 1 option_dict[option] -= 1
if new: if new:
break break
...@@ -384,13 +461,18 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -384,13 +461,18 @@ class PartitionTable(neo.lib.pt.PartitionTable):
if cell.isReadable(): if cell.isReadable():
if cell.getNode().isRunning(): if cell.getNode().isRunning():
lost = None lost = None
else : else:
cell_list.append(cell) cell_list.append(cell)
for cell in cell_list: for cell in cell_list:
if cell.getNode() is not lost: node = cell.getNode()
cell.setState(CellStates.OUT_OF_DATE) if node is not lost:
change_list.append((offset, cell.getUUID(), if cell.isFeeding():
CellStates.OUT_OF_DATE)) self.removeCell(offset, node)
state = CellStates.DISCARDED
else:
state = CellStates.OUT_OF_DATE
cell.setState(state)
change_list.append((offset, node.getUUID(), state))
if fully_readable and change_list: if fully_readable and change_list:
logging.warning(self._first_outdated_message) logging.warning(self._first_outdated_message)
return change_list return change_list
......
...@@ -65,6 +65,7 @@ UNIT_TEST_MODULES = [ ...@@ -65,6 +65,7 @@ UNIT_TEST_MODULES = [
'neo.tests.client.testZODBURI', 'neo.tests.client.testZODBURI',
# light functional tests # light functional tests
'neo.tests.threaded.test', 'neo.tests.threaded.test',
'neo.tests.threaded.testConfig',
'neo.tests.threaded.testImporter', 'neo.tests.threaded.testImporter',
'neo.tests.threaded.testReplication', 'neo.tests.threaded.testReplication',
'neo.tests.threaded.testSSL', 'neo.tests.threaded.testSSL',
......
...@@ -71,6 +71,7 @@ class Application(BaseApplication): ...@@ -71,6 +71,7 @@ class Application(BaseApplication):
self.dm.setup(reset=config.getReset(), dedup=config.getDedup()) self.dm.setup(reset=config.getReset(), dedup=config.getDedup())
self.loadConfiguration() self.loadConfiguration()
self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only # force node uuid from command line argument, for testing purpose only
if config.getUUID() is not None: if config.getUUID() is not None:
...@@ -203,7 +204,8 @@ class Application(BaseApplication): ...@@ -203,7 +204,8 @@ class Application(BaseApplication):
pt = self.pt pt = self.pt
# search, find, connect and identify to the primary master # search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server) bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
self.devpath)
self.master_node, self.master_conn, num_partitions, num_replicas = \ self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection() bootstrap.getPrimaryConnection()
uuid = self.uuid uuid = self.uuid
......
...@@ -51,7 +51,7 @@ class Checker(object): ...@@ -51,7 +51,7 @@ class Checker(object):
else: else:
conn = ClientConnection(app, StorageOperationHandler(app), node) conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE, conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, app.id_timestamp)) uuid, app.server, name, (), app.id_timestamp))
self.conn_dict[conn] = node.isIdentified() self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict) conn_set = set(self.conn_dict)
conn_set.discard(None) conn_set.discard(None)
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
LOG_QUERIES = False LOG_QUERIES = False
from neo.lib.exception import DatabaseFailure
DATABASE_MANAGER_DICT = { DATABASE_MANAGER_DICT = {
'Importer': 'importer.ImporterDatabaseManager', 'Importer': 'importer.ImporterDatabaseManager',
'MySQL': 'mysqldb.MySQLDatabaseManager', 'MySQL': 'mysqldb.MySQLDatabaseManager',
...@@ -33,3 +31,6 @@ def getAdapterKlass(name): ...@@ -33,3 +31,6 @@ def getAdapterKlass(name):
def buildDatabaseManager(name, args=(), kw={}): def buildDatabaseManager(name, args=(), kw={}):
return getAdapterKlass(name)(*args, **kw) return getAdapterKlass(name)(*args, **kw)
class DatabaseFailure(Exception):
pass
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -42,11 +42,11 @@ class ClientOperationHandler(BaseHandler): ...@@ -42,11 +42,11 @@ class ClientOperationHandler(BaseHandler):
# for read rpc # for read rpc
return self.app.tm.read_queue return self.app.tm.read_queue
def askObject(self, conn, oid, serial, tid): def askObject(self, conn, oid, at, before):
app = self.app app = self.app
if app.tm.loadLocked(oid): if app.tm.loadLocked(oid):
raise DelayEvent raise DelayEvent
o = app.dm.getObject(oid, serial, tid) o = app.dm.getObject(oid, at, before)
try: try:
serial, next_serial, compression, checksum, data, data_serial = o serial, next_serial, compression, checksum, data, data_serial = o
except TypeError: except TypeError:
......
...@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler): ...@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name, def requestIdentification(self, conn, node_type, uuid, address, name,
id_timestamp): devpath, id_timestamp):
self.checkClusterName(name) self.checkClusterName(name)
app = self.app app = self.app
# reject any incoming connections if not ready # reject any incoming connections if not ready
......
This diff is collapsed.
...@@ -26,10 +26,7 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -26,10 +26,7 @@ class MasterOperationHandler(BaseMasterHandler):
def startOperation(self, conn, backup): def startOperation(self, conn, backup):
# XXX: see comment in protocol # XXX: see comment in protocol
assert self.app.operational and backup assert self.app.operational and backup
dm = self.app.dm self.app.replicator.startOperation(backup)
if not dm.getBackupTID():
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit()
def askLockInformation(self, conn, ttid, tid): def askLockInformation(self, conn, ttid, tid):
self.app.tm.lock(ttid, tid) self.app.tm.lock(ttid, tid)
......
...@@ -75,9 +75,6 @@ class StorageOperationHandler(EventHandler): ...@@ -75,9 +75,6 @@ class StorageOperationHandler(EventHandler):
deleteTransaction(tid) deleteTransaction(tid)
assert not pack_tid, "TODO" assert not pack_tid, "TODO"
if next_tid: if next_tid:
# More than one chunk ? This could be a full replication so avoid
# restarting from the beginning by committing now.
self.app.dm.commit()
self.app.replicator.fetchTransactions(next_tid) self.app.replicator.fetchTransactions(next_tid)
else: else:
self.app.replicator.fetchObjects() self.app.replicator.fetchObjects()
...@@ -97,15 +94,12 @@ class StorageOperationHandler(EventHandler): ...@@ -97,15 +94,12 @@ class StorageOperationHandler(EventHandler):
for serial, oid_list in object_dict.iteritems(): for serial, oid_list in object_dict.iteritems():
for oid in oid_list: for oid in oid_list:
deleteObject(oid, serial) deleteObject(oid, serial)
# XXX: It should be possible not to commit here if it was the last
# chunk, because we'll either commit again when updating
# 'backup_tid' or the partition table.
self.app.dm.commit()
assert not pack_tid, "TODO" assert not pack_tid, "TODO"
if next_tid: if next_tid:
# TODO also provide feedback to master about current replication state (tid) # TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid) self.app.replicator.fetchObjects(next_tid, next_oid)
else: else:
# This will also commit.
self.app.replicator.finish() self.app.replicator.finish()
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
...@@ -267,6 +261,8 @@ class StorageOperationHandler(EventHandler): ...@@ -267,6 +261,8 @@ class StorageOperationHandler(EventHandler):
"partition %u dropped or truncated" "partition %u dropped or truncated"
% partition), msg_id) % partition), msg_id)
return return
if not object[2]: # creation undone
object = object[0], 0, ZERO_HASH, '', object[4]
# Same as in askFetchTransactions. # Same as in askFetchTransactions.
conn.send(Packets.AddObject(oid, *object), msg_id) conn.send(Packets.AddObject(oid, *object), msg_id)
yield conn.buffering yield conn.buffering
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -202,9 +202,9 @@ class ClientTests(NEOFunctionalTest): ...@@ -202,9 +202,9 @@ class ClientTests(NEOFunctionalTest):
self.neo.stop() self.neo.stop()
self.neo = NEOCluster(db_list=['test_neo1'], partitions=3, self.neo = NEOCluster(db_list=['test_neo1'], partitions=3,
importer=[("root", { importer={"zodb": [("root", {
"storage": "<filestorage>\npath %s\n</filestorage>" "storage": "<filestorage>\npath %s\n</filestorage>"
% dfs_storage.getName()})], % dfs_storage.getName()})]},
temp_dir=self.getTempDirectory()) temp_dir=self.getTempDirectory())
self.neo.start() self.neo.start()
neo_db, neo_conn = self.neo.getZODBConnection() neo_db, neo_conn = self.neo.getZODBConnection()
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment