Commit ca75709f authored by Julien Muchembled's avatar Julien Muchembled

storage: disable data deduplication by default

parent 03b5b47e
...@@ -147,3 +147,6 @@ class ConfigurationManager(object): ...@@ -147,3 +147,6 @@ class ConfigurationManager(object):
n = self.__get('autostart', True) n = self.__get('autostart', True)
if n: if n:
return int(n) return int(n)
def getDedup(self):
return self.__get('dedup', True)
...@@ -30,6 +30,9 @@ parser.add_option('-d', '--database', help = 'database connections string') ...@@ -30,6 +30,9 @@ parser.add_option('-d', '--database', help = 'database connections string')
parser.add_option('-e', '--engine', help = 'database engine') parser.add_option('-e', '--engine', help = 'database engine')
parser.add_option('-w', '--wait', help='seconds to wait for backend to be ' parser.add_option('-w', '--wait', help='seconds to wait for backend to be '
'available, before erroring-out (-1 = infinite)', type='float', default=0) 'available, before erroring-out (-1 = infinite)', type='float', default=0)
parser.add_option('--dedup', action='store_true',
help = 'enable deduplication of data'
' when setting up a new storage node')
parser.add_option('--disable-drop-partitions', action='store_true', parser.add_option('--disable-drop-partitions', action='store_true',
help = 'do not delete data of discarded cells, which is' help = 'do not delete data of discarded cells, which is'
' useful for big databases because the current' ' useful for big databases because the current'
......
...@@ -69,7 +69,7 @@ class Application(BaseApplication): ...@@ -69,7 +69,7 @@ class Application(BaseApplication):
# operation related data # operation related data
self.operational = False self.operational = False
self.dm.setup(reset=config.getReset()) self.dm.setup(reset=config.getReset(), dedup=config.getDedup())
self.loadConfiguration() self.loadConfiguration()
# force node uuid from command line argument, for testing purpose only # force node uuid from command line argument, for testing purpose only
......
...@@ -322,8 +322,8 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -322,8 +322,8 @@ class ImporterDatabaseManager(DatabaseManager):
for zodb in self.zodb: for zodb in self.zodb:
zodb.close() zodb.close()
def setup(self, reset=0): def setup(self, reset=False, dedup=False):
self.db.setup(reset) self.db.setup(reset, dedup)
zodb_state = self.getConfiguration("zodb") zodb_state = self.getConfiguration("zodb")
if zodb_state: if zodb_state:
logging.warning("Ignoring configuration file for oid mapping." logging.warning("Ignoring configuration file for oid mapping."
......
...@@ -131,7 +131,7 @@ class DatabaseManager(object): ...@@ -131,7 +131,7 @@ class DatabaseManager(object):
def erase(self): def erase(self):
"""""" """"""
def _setup(self): def _setup(self, dedup=False):
"""To be overridden by the backend to set up a database """To be overridden by the backend to set up a database
It must recover self._uncommitted_data from temporary object table. It must recover self._uncommitted_data from temporary object table.
...@@ -142,14 +142,14 @@ class DatabaseManager(object): ...@@ -142,14 +142,14 @@ class DatabaseManager(object):
""" """
@requires(_setup) @requires(_setup)
def setup(self, reset=0): def setup(self, reset=False, dedup=False):
"""Set up a database, discarding existing data first if reset is True """Set up a database, discarding existing data first if reset is True
""" """
if reset: if reset:
self.erase() self.erase()
self._readable_set = set() self._readable_set = set()
self._uncommitted_data = defaultdict(int) self._uncommitted_data = defaultdict(int)
self._setup() self._setup(dedup)
@abstract @abstract
def nonempty(self, table): def nonempty(self, table):
......
...@@ -172,7 +172,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -172,7 +172,7 @@ class MySQLDatabaseManager(DatabaseManager):
if e.args[0] != NO_SUCH_TABLE: if e.args[0] != NO_SUCH_TABLE:
raise raise
def _setup(self): def _setup(self, dedup=False):
self._config.clear() self._config.clear()
q = self.query q = self.query
p = engine = self._engine p = engine = self._engine
...@@ -240,9 +240,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -240,9 +240,9 @@ class MySQLDatabaseManager(DatabaseManager):
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
hash BINARY(20) NOT NULL, hash BINARY(20) NOT NULL,
compression TINYINT UNSIGNED NULL, compression TINYINT UNSIGNED NULL,
value MEDIUMBLOB NOT NULL, value MEDIUMBLOB NOT NULL%s
UNIQUE (hash, compression) ) ENGINE=%s""" % (""",
) ENGINE=""" + engine) UNIQUE (hash, compression)""" if dedup else "", engine))
q("""CREATE TABLE IF NOT EXISTS bigdata ( q("""CREATE TABLE IF NOT EXISTS bigdata (
id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
......
...@@ -112,7 +112,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -112,7 +112,7 @@ class SQLiteDatabaseManager(DatabaseManager):
if not e.args[0].startswith("no such table:"): if not e.args[0].startswith("no such table:"):
raise raise
def _setup(self): def _setup(self, dedup=False):
# SQLite does support transactional Data Definition Language statements # SQLite does support transactional Data Definition Language statements
# but unfortunately, the built-in Python binding automatically commits # but unfortunately, the built-in Python binding automatically commits
# between such statements. This anti-feature causes this method to be # between such statements. This anti-feature causes this method to be
...@@ -179,9 +179,10 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -179,9 +179,10 @@ class SQLiteDatabaseManager(DatabaseManager):
compression INTEGER NOT NULL, compression INTEGER NOT NULL,
value BLOB NOT NULL) value BLOB NOT NULL)
""") """)
q("""CREATE UNIQUE INDEX IF NOT EXISTS _data_i1 ON if dedup:
data(hash, compression) q("""CREATE UNIQUE INDEX IF NOT EXISTS _data_i1 ON
""") data(hash, compression)
""")
# The table "ttrans" stores information on uncommitted transactions. # The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans ( q("""CREATE TABLE IF NOT EXISTS ttrans (
......
...@@ -39,7 +39,7 @@ class StorageMySQLdbTests(StorageDBTests): ...@@ -39,7 +39,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.assertEqual(db.db, DB_PREFIX + '0') self.assertEqual(db.db, DB_PREFIX + '0')
self.assertEqual(db.user, DB_USER) self.assertEqual(db.user, DB_USER)
try: try:
db.setup(reset) db.setup(reset, True)
except NotSupportedError as m: except NotSupportedError as m:
code, m = m.args code, m = m.args
if code != UNKNOWN_STORAGE_ENGINE: if code != UNKNOWN_STORAGE_ENGINE:
......
...@@ -27,7 +27,7 @@ class StorageSQLiteTests(StorageDBTests): ...@@ -27,7 +27,7 @@ class StorageSQLiteTests(StorageDBTests):
def getDB(self, reset=0): def getDB(self, reset=0):
db = SQLiteDatabaseManager(':memory:') db = SQLiteDatabaseManager(':memory:')
db.setup(reset) db.setup(reset, True)
return db return db
def test_lockDatabase(self): def test_lockDatabase(self):
......
...@@ -652,7 +652,7 @@ class NEOCluster(object): ...@@ -652,7 +652,7 @@ class NEOCluster(object):
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'), adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
storage_count=None, db_list=None, clear_databases=True, storage_count=None, db_list=None, clear_databases=True,
db_user=DB_USER, db_password='', compress=True, db_user=DB_USER, db_password='', compress=True,
importer=None, autostart=None): importer=None, autostart=None, dedup=False):
self.name = 'neo_%s' % self._allocate('name', self.name = 'neo_%s' % self._allocate('name',
lambda: random.randint(0, 100)) lambda: random.randint(0, 100))
self.compress = compress self.compress = compress
...@@ -661,7 +661,7 @@ class NEOCluster(object): ...@@ -661,7 +661,7 @@ class NEOCluster(object):
for _ in xrange(master_count)] for _ in xrange(master_count)]
self.master_nodes = ' '.join('%s:%s' % x for x in master_list) self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
kw = Node.convertInitArgs(replicas=replicas, adapter=adapter, kw = Node.convertInitArgs(replicas=replicas, adapter=adapter,
partitions=partitions, reset=clear_databases) partitions=partitions, reset=clear_databases, dedup=dedup)
kw['cluster'] = weak_self = weakref.proxy(self) kw['cluster'] = weak_self = weakref.proxy(self)
kw['getSSL'] = self.SSL kw['getSSL'] = self.SSL
if upstream is not None: if upstream is not None:
......
...@@ -37,8 +37,8 @@ from neo.lib import logging ...@@ -37,8 +37,8 @@ from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes, from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID) Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID)
from .. import expectedFailure, unpickle_state, Patch, TransactionalResource from .. import expectedFailure, unpickle_state, Patch, TransactionalResource
from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \ from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \
RandomConflictDict, ThreadId, with_cluster NEOThreadedTest, RandomConflictDict, ThreadId, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.transactions import Transaction from neo.client.transactions import Transaction
...@@ -198,9 +198,9 @@ class Test(NEOThreadedTest): ...@@ -198,9 +198,9 @@ class Test(NEOThreadedTest):
def testUndoConflictDuringStore(self, cluster): def testUndoConflictDuringStore(self, cluster):
self._testUndoConflict(cluster, 1) self._testUndoConflict(cluster, 1)
@with_cluster() def testStorageDataLock(self, dedup=False):
def testStorageDataLock(self, cluster): with NEOCluster(dedup=dedup) as cluster:
if 1: cluster.start()
storage = cluster.getZODBStorage() storage = cluster.getZODBStorage()
data_info = {} data_info = {}
...@@ -212,8 +212,6 @@ class Test(NEOThreadedTest): ...@@ -212,8 +212,6 @@ class Test(NEOThreadedTest):
r1 = storage.store(oid, None, data, '', txn) r1 = storage.store(oid, None, data, '', txn)
r2 = storage.tpc_vote(txn) r2 = storage.tpc_vote(txn)
tid = storage.tpc_finish(txn) tid = storage.tpc_finish(txn)
data_info[key] = 0
storage.sync()
txn = [transaction.Transaction() for x in xrange(4)] txn = [transaction.Transaction() for x in xrange(4)]
for t in txn: for t in txn:
...@@ -221,20 +219,20 @@ class Test(NEOThreadedTest): ...@@ -221,20 +219,20 @@ class Test(NEOThreadedTest):
storage.store(oid if tid else storage.new_oid(), storage.store(oid if tid else storage.new_oid(),
tid, data, '', t) tid, data, '', t)
tid = None tid = None
data_info[key] = 4 data_info[key] = 4 if dedup else 1
storage.sync() self.tic()
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
storage.tpc_abort(txn.pop()) storage.tpc_abort(txn.pop())
for t in txn: for t in txn:
storage.tpc_vote(t) storage.tpc_vote(t)
storage.sync() self.tic()
data_info[key] -= 1 data_info[key] -= dedup
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
storage.tpc_abort(txn[1]) storage.tpc_abort(txn[1])
storage.sync() self.tic()
data_info[key] -= 1 data_info[key] -= dedup
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
tid1 = storage.tpc_finish(txn[2]) tid1 = storage.tpc_finish(txn[2])
...@@ -243,10 +241,13 @@ class Test(NEOThreadedTest): ...@@ -243,10 +241,13 @@ class Test(NEOThreadedTest):
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
storage.tpc_abort(txn[0]) storage.tpc_abort(txn[0])
storage.sync() self.tic()
data_info[key] -= 1 data_info[key] -= dedup
self.assertEqual(data_info, cluster.storage.getDataLockInfo()) self.assertEqual(data_info, cluster.storage.getDataLockInfo())
def testStorageDataLockWithDeduplication(self, dedup=False):
self.testStorageDataLock(True)
@with_cluster() @with_cluster()
def testStorageDataLock2(self, cluster): def testStorageDataLock2(self, cluster):
storage = cluster.getZODBStorage() storage = cluster.getZODBStorage()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment