Commit 425f7194 authored by Romain Courteaud's avatar Romain Courteaud

ReplicateStorage: use a property as hash

Speed up document replication by using a document property to check if there was a change.
As allDocs can return this property directly, it reduces the required calculation.
parent 40ef31b0
......@@ -28,6 +28,15 @@
CONFLICT_KEEP_REMOTE = 2,
CONFLICT_CONTINUE = 3;
function SkipError(message) {
if ((message !== undefined) && (typeof message !== "string")) {
throw new TypeError('You must pass a string.');
}
this.message = message || "Skip some asynchronous code";
}
SkipError.prototype = new Error();
SkipError.prototype.constructor = SkipError;
/****************************************************
Use a local jIO to read/write/search documents
Synchronize in background those document with a remote jIO.
......@@ -46,6 +55,10 @@
function ReplicateStorage(spec) {
this._query_options = spec.query || {};
if (spec.signature_select_metadata !== undefined) {
this._query_options.select_list = [spec.signature_select_metadata];
}
this._signature_select_metadata = spec.signature_select_metadata;
this._local_sub_storage = jIO.createJIO(spec.local_sub_storage);
this._remote_sub_storage = jIO.createJIO(spec.remote_sub_storage);
......@@ -239,6 +252,26 @@
return result_promise_list[0];
}
function callAllDocsOnStorage(context, storage, cache, cache_key) {
return new RSVP.Queue()
.push(function () {
if (!cache.hasOwnProperty(cache_key)) {
return storage.allDocs(context._query_options)
.push(function (result) {
var i,
cache_entry = {};
for (i = 0; i < result.data.total_rows; i += 1) {
cache_entry[result.data.rows[i].id] = result.data.rows[i].value;
}
cache[cache_key] = cache_entry;
});
}
})
.push(function () {
return cache[cache_key];
});
}
function propagateAttachmentDeletion(context, skip_attachment_dict,
destination,
id, name) {
......@@ -602,14 +635,32 @@
function propagateModification(context, source, destination, doc, hash, id,
skip_document_dict,
options) {
var result,
var result = new RSVP.Queue(),
post_id,
to_skip = true;
if (options === undefined) {
options = {};
}
if (doc === null) {
result
.push(function () {
return source.get(id);
})
.push(function (source_doc) {
doc = source_doc;
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
throw new SkipError(id);
}
throw error;
});
}
if (options.use_post) {
result = destination.post(doc)
result
.push(function () {
return destination.post(doc);
})
.push(function (new_id) {
to_skip = false;
post_id = new_id;
......@@ -658,7 +709,10 @@
skip_document_dict[post_id] = null;
});
} else {
result = destination.put(id, doc)
result
.push(function () {
return destination.put(id, doc);
})
.push(function () {
return context._signature_sub_storage.put(id, {
"hash": hash
......@@ -670,6 +724,12 @@
if (to_skip) {
skip_document_dict[id] = null;
}
})
.push(undefined, function (error) {
if (error instanceof SkipError) {
return;
}
throw error;
});
}
......@@ -702,25 +762,39 @@
}
function checkAndPropagate(context, skip_document_dict,
cache, destination_key,
status_hash, local_hash, doc,
source, destination, id,
conflict_force, conflict_revert,
conflict_ignore,
options) {
return destination.get(id)
.push(function (remote_doc) {
return [remote_doc, generateHash(stringify(remote_doc))];
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return [null, null];
return new RSVP.Queue()
.push(function () {
if (options.signature_select_metadata !== undefined) {
return callAllDocsOnStorage(context, destination,
cache, destination_key)
.push(function (result) {
if (result.hasOwnProperty(id)) {
return [null, result[id][options.signature_select_metadata]];
}
return [null, null];
});
}
throw error;
return destination.get(id)
.push(function (remote_doc) {
return [remote_doc, generateHash(stringify(remote_doc))];
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return [null, null];
}
throw error;
});
})
.push(function (remote_list) {
var remote_doc = remote_list[0],
remote_hash = remote_list[1];
if (local_hash === remote_hash) {
// Same modifications on both side
if (local_hash === null) {
......@@ -783,14 +857,17 @@
local_hash, id, skip_document_dict,
{use_post: options.use_post});
}
doc = doc || local_hash;
remote_doc = remote_doc || remote_hash;
throw new jIO.util.jIOError("Conflict on '" + id + "': " +
stringify(doc || '') + " !== " +
stringify(remote_doc || ''),
stringify(doc) + " !== " +
stringify(remote_doc),
409);
});
}
function checkLocalDeletion(queue, context, skip_document_dict,
cache, destination_key,
destination, id, source,
conflict_force, conflict_revert,
conflict_ignore, options) {
......@@ -802,6 +879,7 @@
.push(function (result) {
status_hash = result.hash;
return checkAndPropagate(context, skip_document_dict,
cache, destination_key,
status_hash, null, null,
source, destination, id,
conflict_force, conflict_revert,
......@@ -811,20 +889,29 @@
}
function checkSignatureDifference(queue, context, skip_document_dict,
cache, destination_key,
source, destination, id,
conflict_force, conflict_revert,
conflict_ignore,
status_hash,
local_hash, status_hash,
options) {
queue
.push(function () {
return source.get(id);
if (local_hash === null) {
// Hash was not provided by the allDocs query
return source.get(id);
}
return null;
})
.push(function (doc) {
var local_hash = generateHash(stringify(doc));
if (local_hash === null) {
// Hash was not provided by the allDocs query
local_hash = generateHash(stringify(doc));
}
if (local_hash !== status_hash) {
return checkAndPropagate(context, skip_document_dict,
cache, destination_key,
status_hash, local_hash, doc,
source, destination, id,
conflict_force, conflict_revert,
......@@ -835,6 +922,7 @@
}
function pushStorage(context, skip_document_dict,
cache, source_key, destination_key,
source, destination, signature_allDocs, options) {
var argument_list = [],
argument_list_deletion = [];
......@@ -844,7 +932,7 @@
if (!options.hasOwnProperty("use_revert_post")) {
options.use_revert_post = false;
}
return source.allDocs(context._query_options)
return callAllDocsOnStorage(context, source, cache, source_key)
.push(function (source_allDocs) {
var i,
local_dict = {},
......@@ -852,15 +940,26 @@
is_modification,
is_creation,
status_hash,
local_hash,
key,
queue = new RSVP.Queue();
for (key in source_allDocs) {
if (source_allDocs.hasOwnProperty(key)) {
if (!skip_document_dict.hasOwnProperty(key)) {
local_dict[key] = source_allDocs[key];
}
}
}
/*
for (i = 0; i < source_allDocs.data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty(
source_allDocs.data.rows[i].id
)) {
local_dict[source_allDocs.data.rows[i].id] = i;
local_dict[source_allDocs.data.rows[i].id] =
source_allDocs.data.rows[i].value;
}
}
*/
for (i = 0; i < signature_allDocs.data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty(
signature_allDocs.data.rows[i].id
......@@ -882,14 +981,28 @@
status_hash = signature_dict[key];
}
local_hash = null;
if (options.signature_select_metadata !== undefined) {
local_hash = local_dict[key][options.signature_select_metadata];
if (is_modification === true) {
// Bypass fetching all documents and calculating the sha
// Compare the select list values returned by allDocs calls
is_modification = false;
if (local_hash !== status_hash) {
is_modification = true;
}
}
}
if (is_modification === true || is_creation === true) {
argument_list.push([undefined, context, skip_document_dict,
cache, destination_key,
source, destination,
key,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
status_hash,
local_hash, status_hash,
options]);
}
}
......@@ -910,6 +1023,7 @@
argument_list_deletion.push([undefined,
context,
skip_document_dict,
cache, destination_key,
destination, key,
source,
options.conflict_force,
......@@ -941,7 +1055,8 @@
ReplicateStorage.prototype.repair = function () {
var context = this,
argument_list = arguments,
skip_document_dict = {};
skip_document_dict = {},
cache = {};
return new RSVP.Queue()
.push(function () {
......@@ -1004,6 +1119,7 @@
context._check_local_creation ||
context._check_local_deletion) {
return pushStorage(context, skip_document_dict,
cache, 'local', 'remote',
context._local_sub_storage,
context._remote_sub_storage,
signature_allDocs,
......@@ -1018,7 +1134,8 @@
check_modification: context._check_local_modification,
check_creation: context._check_local_creation,
check_deletion: context._check_local_deletion,
operation_amount: context._parallel_operation_amount
operation_amount: context._parallel_operation_amount,
signature_select_metadata: context._signature_select_metadata
})
.push(function () {
return signature_allDocs;
......@@ -1031,6 +1148,7 @@
context._check_remote_creation ||
context._check_remote_deletion) {
return pushStorage(context, skip_document_dict,
cache, 'remote', 'local',
context._remote_sub_storage,
context._local_sub_storage,
signature_allDocs, {
......@@ -1044,7 +1162,8 @@
check_modification: context._check_remote_modification,
check_creation: context._check_remote_creation,
check_deletion: context._check_remote_deletion,
operation_amount: context._parallel_operation_amount
operation_amount: context._parallel_operation_amount,
signature_select_metadata: context._signature_select_metadata
});
}
})
......
......@@ -73,6 +73,7 @@
equal(jio.__storage._check_remote_attachment_creation, false);
equal(jio.__storage._check_remote_attachment_deletion, false);
equal(jio.__storage._check_remote_attachment_modification, false);
equal(jio.__storage._signature_select_metadata, undefined);
equal(jio.__storage._custom_signature_sub_storage, false);
equal(jio.__storage._signature_hash,
......@@ -127,12 +128,14 @@
check_local_attachment_modification: true,
check_remote_attachment_creation: true,
check_remote_attachment_deletion: true,
check_remote_attachment_modification: true
check_remote_attachment_modification: true,
signature_select_metadata: 'bar'
});
deepEqual(
jio.__storage._query_options,
{query: 'portal_type: "Foo"', limit: [0, 1234567890]}
{query: 'portal_type: "Foo"', limit: [0, 1234567890],
select_list: ['bar']}
);
equal(jio.__storage._use_remote_post, true);
equal(jio.__storage._conflict_handling, 3);
......@@ -150,6 +153,7 @@
equal(jio.__storage._check_remote_attachment_creation, true);
equal(jio.__storage._check_remote_attachment_deletion, true);
equal(jio.__storage._check_remote_attachment_modification, true);
equal(jio.__storage._signature_select_metadata, 'bar');
equal(jio.__storage._custom_signature_sub_storage, false);
ok(jio.__storage._signature_sub_storage instanceof jio.constructor);
......@@ -175,7 +179,7 @@
"replicatestorage200");
equal(jio.__storage._signature_hash,
"_replicate_11881e431308c0ec8c0e6430be98db380e1b92f8");
"_replicate_291eaf37f6fa1ba6b6b115ab92b44cc88be0bb06");
});
test("reject unknow conflict resolution", function () {
......@@ -730,4 +734,4 @@
});
});
}(jIO, QUnit, Blob));
}(jIO, QUnit, Blob));
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -42,6 +42,7 @@
<script src="jio.storage/replicatestorage.tests.js"></script>
<script src="jio.storage/replicatestorage_repair.tests.js"></script>
<script src="jio.storage/replicatestorage_repairattachment.tests.js"></script>
<script src="jio.storage/replicatestorage_fastrepair.tests.js"></script>
<script src="jio.storage/shastorage.tests.js"></script>
<!--script src="jio.storage/qiniustorage.tests.js"></script-->
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment