Commit 28e097c8 authored by Julien Muchembled

qa: when comparing replicas, checksum metadata & data rather than only keys

parent 60bcbc5c
......@@ -858,8 +858,9 @@ class MySQLDatabaseManager(DatabaseManager):
r = self.query('SELECT tid, oid FROM obj FORCE INDEX(tid)'
' WHERE `partition` = %d AND tid <= %d'
' AND (tid = %d AND %d <= oid OR %d < tid)'
' ORDER BY tid ASC, oid ASC LIMIT %d' % (
partition, u64(max_tid), min_tid, u64(min_oid), min_tid, length))
' ORDER BY tid ASC, oid ASC%s' % (
partition, u64(max_tid), min_tid, u64(min_oid), min_tid,
'' if length is None else ' LIMIT %s' % length))
return [(p64(serial), p64(oid)) for serial, oid in r]
def _getTIDList(self, offset, length, partition_list):
......@@ -871,17 +872,11 @@ class MySQLDatabaseManager(DatabaseManager):
def getReplicationTIDList(self, min_tid, max_tid, length, partition):
    """Return up to `length` TIDs of `partition` in [min_tid, max_tid].

    :param min_tid: packed (8-byte) lower bound, inclusive
    :param max_tid: packed (8-byte) upper bound, inclusive
    :param length: maximum number of rows, or None for no limit
    :param partition: partition number to scan
    :return: list of packed TIDs in ascending order

    Fix: the flattened diff left the superseded implementation (which
    converted min_tid/max_tid with u64 a first time and issued a dead
    query with a mandatory LIMIT) interleaved with its replacement;
    only the new form — where LIMIT is omitted when length is None —
    is kept.
    """
    u64 = util.u64
    p64 = util.p64
    r = self.query("SELECT tid FROM trans"
        " WHERE `partition` = %s AND tid >= %s AND tid <= %s"
        " ORDER BY tid ASC%s" % (
        partition, u64(min_tid), u64(max_tid),
        # LIMIT is only emitted when a bound was requested.
        '' if length is None else ' LIMIT %s' % length))
    return [p64(t[0]) for t in r]
def _updatePackFuture(self, oid, orig_serial, max_serial):
......
......@@ -593,6 +593,8 @@ class SQLiteDatabaseManager(DatabaseManager):
u64 = util.u64
p64 = util.p64
min_tid = u64(min_tid)
if length is None:
length = -1
return [(p64(serial), p64(oid)) for serial, oid in self.query("""\
SELECT tid, oid FROM obj
WHERE partition=? AND tid<=?
......@@ -611,6 +613,8 @@ class SQLiteDatabaseManager(DatabaseManager):
p64 = util.p64
min_tid = u64(min_tid)
max_tid = u64(max_tid)
if length is None:
length = -1
return [p64(t[0]) for t in self.query("""\
SELECT tid FROM trans
WHERE partition=? AND ?<=tid AND tid<=?
......
......@@ -16,10 +16,11 @@
# XXX: Consider using ClusterStates.STOPPING to stop clusters
import os, random, select, socket, sys, tempfile
import hashlib, os, random, select, socket, sys, tempfile
import thread, threading, time, traceback, weakref
from collections import deque
from contextlib import contextmanager
from cPickle import dumps
from email import message_from_string
from itertools import count
from functools import partial, wraps
......@@ -35,7 +36,7 @@ from neo.lib.connection import BaseConnection, \
from neo.lib.connector import SocketConnector, ConnectorException
from neo.lib.handler import EventHandler
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import uuid_str, \
from neo.lib.protocol import ZERO_OID, ZERO_TID, MAX_TID, uuid_str, \
ClusterStates, Enum, NodeStates, NodeTypes, Packets
from neo.lib.util import cached_property, parseMasterList, p64
from neo.master.recovery import RecoveryManager
......@@ -482,6 +483,17 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
(r,), = self.dm.query("SELECT COUNT(*) FROM " + table)
return r
def checksumPartition(self, partition, max_tid=MAX_TID):
    """Return a pair of MD5 digests (transactions, objects) for a partition.

    Full records (pickled metadata & data, via getTransaction/fetchObject)
    are folded into the digests rather than only their keys, so two
    replicas compare equal only if their contents match up to max_tid.
    """
    db = self.dm
    common = ZERO_TID, max_tid, None, partition
    tid_digest = hashlib.md5()
    oid_digest = hashlib.md5()
    for tid in db.getReplicationTIDList(*common):
        tid_digest.update(dumps(db.getTransaction(tid)))
    for tid, oid in db.getReplicationObjectList(min_oid=ZERO_OID, *common):
        oid_digest.update(dumps(db.fetchObject(oid, tid)))
    return tid_digest.hexdigest(), oid_digest.hexdigest()
class ClientApplication(Node, neo.client.app.Application):
max_reconnection_to_master = 10
......@@ -1186,6 +1198,21 @@ class NEOThreadedTest(NeoTestBase):
assertStartsWith(msg.pop(0), ' %s' % x)
self.assertFalse(expected)
def checkPartitionReplicated(self, source, destination, partition, **kw):
    """Assert that two storages hold identical contents for `partition`."""
    expected = source.checksumPartition(partition, **kw)
    actual = destination.checksumPartition(partition, **kw)
    self.assertEqual(expected, actual)
def checkReplicas(self, cluster):
    """Verify that all replicas of every partition have equal checksums."""
    table = cluster.primary_master.pt
    by_uuid = {node.uuid: node for node in cluster.storage_list}
    for offset in xrange(table.getPartitions()):
        digests = [by_uuid[cell.getUUID()].checksumPartition(offset)
                   for cell in table.getCellList(offset)]
        # A single replica would make the comparison vacuous.
        self.assertLess(1, len(digests))
        # Every replica of this partition must yield the same checksum.
        self.assertEqual(1, len(set(digests)), (offset, digests))
class ThreadId(list):
......
......@@ -60,16 +60,6 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
class ReplicationTests(NEOThreadedTest):
def checksumPartition(self, storage, partition, max_tid=MAX_TID):
    """Return (TID-range checksum, serial-range checksum) of a partition,
    as computed by the storage's own checkTIDRange/checkSerialRange."""
    db = storage.dm
    common = partition, None, ZERO_TID, max_tid
    tid_check = db.checkTIDRange(*common)
    serial_check = db.checkSerialRange(min_oid=ZERO_OID, *common)
    return tid_check, serial_check
def checkPartitionReplicated(self, source, destination, partition, **kw):
    """Assert that `source` and `destination` agree on a partition's
    checksums."""
    expected = self.checksumPartition(source, partition, **kw)
    actual = self.checksumPartition(destination, partition, **kw)
    self.assertEqual(expected, actual)
def checkBackup(self, cluster, **kw):
upstream_pt = cluster.upstream.primary_master.pt
pt = cluster.primary_master.pt
......@@ -89,17 +79,6 @@ class ReplicationTests(NEOThreadedTest):
checked += 1
return checked
def checkReplicas(self, cluster):
    """Verify that all replicas of every partition have equal checksums."""
    table = cluster.primary_master.pt
    by_uuid = {node.uuid: node for node in cluster.storage_list}
    for offset in xrange(table.getPartitions()):
        digests = [self.checksumPartition(by_uuid[cell.getUUID()], offset)
                   for cell in table.getCellList(offset)]
        # A single replica would make the comparison vacuous.
        self.assertLess(1, len(digests))
        # Every replica of this partition must yield the same checksum.
        self.assertEqual(1, len(set(digests)), (offset, digests))
def testBackupNormalCase(self):
np = 7
nr = 2
......
......@@ -21,7 +21,11 @@ functional = int(os.getenv('NEO_TEST_ZODB_FUNCTIONAL', 0))
# Select the TestCase base depending on the NEO_TEST_ZODB_FUNCTIONAL mode.
# Fix: the flattened diff left the removed import
# (`... NEOThreadedTest as TestCase`) next to its replacement, so TestCase
# was bound twice in the else branch; only the new form is kept.
if functional:
    from ..functional import NEOCluster, NEOFunctionalTest as TestCase
else:
    from ..threaded import NEOCluster, NEOThreadedTest
    # Shadow all inherited 'check*' attributes with None in a dynamic
    # subclass. NOTE(review): presumably this prevents the ZODB test
    # machinery (which collects 'check'-prefixed methods) from running
    # these helpers as tests — confirm intent.
    x = dict.fromkeys(x for x in dir(NEOThreadedTest) if x.startswith('check'))
    assert x
    TestCase = type('', (NEOThreadedTest,), x)
    del x
class ZODBTestCase(TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment