Commit b50822bf authored by Romain Courteaud's avatar Romain Courteaud

WIP: ReplicateStorage: use a property as hash

Speed up replication by using a document property to check if there was a change.
As allDocs can return this property directly, it reduces the required calculation.
parent 027845e3
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
*/ */
/*jslint nomen: true*/ /*jslint nomen: true*/
/*global jIO, RSVP, Rusha*/ /*global jIO, RSVP, Rusha, console*/
(function (jIO, RSVP, Rusha, stringify) { (function (jIO, RSVP, Rusha, stringify) {
"use strict"; "use strict";
...@@ -46,6 +46,10 @@ ...@@ -46,6 +46,10 @@
function ReplicateStorage(spec) { function ReplicateStorage(spec) {
this._query_options = spec.query || {}; this._query_options = spec.query || {};
if (spec.signature_select_metadata !== undefined) {
this._query_options.select_list = [spec.signature_select_metadata];
}
this._signature_select_metadata = spec.signature_select_metadata;
this._local_sub_storage = jIO.createJIO(spec.local_sub_storage); this._local_sub_storage = jIO.createJIO(spec.local_sub_storage);
this._remote_sub_storage = jIO.createJIO(spec.remote_sub_storage); this._remote_sub_storage = jIO.createJIO(spec.remote_sub_storage);
...@@ -241,6 +245,26 @@ ...@@ -241,6 +245,26 @@
return result_promise_list[0]; return result_promise_list[0];
} }
function callAllDocsOnStorage(context, storage, cache, cache_key) {
return new RSVP.Queue()
.push(function () {
if (!cache.hasOwnProperty(cache_key)) {
return storage.allDocs(context._query_options)
.push(function (result) {
var i,
cache_entry = {};
for (i = 0; i < result.data.total_rows; i += 1) {
cache_entry[result.data.rows[i].id] = result.data.rows[i].value;
}
cache[cache_key] = cache_entry;
});
}
})
.push(function () {
return cache[cache_key];
});
}
function propagateAttachmentDeletion(context, skip_attachment_dict, function propagateAttachmentDeletion(context, skip_attachment_dict,
destination, destination,
id, name) { id, name) {
...@@ -604,14 +628,31 @@ ...@@ -604,14 +628,31 @@
function propagateModification(context, source, destination, doc, hash, id, function propagateModification(context, source, destination, doc, hash, id,
skip_document_dict, skip_document_dict,
options) { options) {
var result, var result = new RSVP.Queue(),
post_id, post_id,
to_skip = true; to_skip = true;
if (options === undefined) { if (options === undefined) {
options = {}; options = {};
} }
// if (options.hasOwnProperty('signature_select_metadata')) {
if (doc === null) {
result
.push(function () {
return source.get(id);
})
.push(function (source_doc) {
doc = source_doc;
}, function (error) {
console.warn(error);
// XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
throw error;
});
}
if (options.use_post) { if (options.use_post) {
result = destination.post(doc) result
.push(function () {
return destination.post(doc);
})
.push(function (new_id) { .push(function (new_id) {
to_skip = false; to_skip = false;
post_id = new_id; post_id = new_id;
...@@ -660,7 +701,10 @@ ...@@ -660,7 +701,10 @@
skip_document_dict[post_id] = null; skip_document_dict[post_id] = null;
}); });
} else { } else {
result = destination.put(id, doc) result
.push(function () {
return destination.put(id, doc);
})
.push(function () { .push(function () {
return context._signature_sub_storage.put(id, { return context._signature_sub_storage.put(id, {
"hash": hash "hash": hash
...@@ -704,25 +748,39 @@ ...@@ -704,25 +748,39 @@
} }
function checkAndPropagate(context, skip_document_dict, function checkAndPropagate(context, skip_document_dict,
cache, destination_key,
status_hash, local_hash, doc, status_hash, local_hash, doc,
source, destination, id, source, destination, id,
conflict_force, conflict_revert, conflict_force, conflict_revert,
conflict_ignore, conflict_ignore,
options) { options) {
return destination.get(id) return new RSVP.Queue()
.push(function (remote_doc) { .push(function () {
return [remote_doc, generateHash(stringify(remote_doc))]; if (options.signature_select_metadata !== undefined) {
}, function (error) { return callAllDocsOnStorage(context, destination,
if ((error instanceof jIO.util.jIOError) && cache, destination_key)
(error.status_code === 404)) { .push(function (result) {
return [null, null]; if (result.hasOwnProperty(id)) {
return [null, result[id][options.signature_select_metadata]];
}
return [null, null];
});
} }
throw error; return destination.get(id)
.push(function (remote_doc) {
return [remote_doc, generateHash(stringify(remote_doc))];
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return [null, null];
}
throw error;
});
}) })
.push(function (remote_list) { .push(function (remote_list) {
var remote_doc = remote_list[0], var remote_doc = remote_list[0],
remote_hash = remote_list[1]; remote_hash = remote_list[1];
if (local_hash === remote_hash) { if (local_hash === remote_hash) {
// Same modifications on both side // Same modifications on both side
if (local_hash === null) { if (local_hash === null) {
...@@ -785,14 +843,17 @@ ...@@ -785,14 +843,17 @@
local_hash, id, skip_document_dict, local_hash, id, skip_document_dict,
{use_post: options.use_post}); {use_post: options.use_post});
} }
doc = doc || local_hash;
remote_doc = remote_doc || remote_hash;
throw new jIO.util.jIOError("Conflict on '" + id + "': " + throw new jIO.util.jIOError("Conflict on '" + id + "': " +
stringify(doc || '') + " !== " + stringify(doc) + " !== " +
stringify(remote_doc || ''), stringify(remote_doc),
409); 409);
}); });
} }
function checkLocalDeletion(queue, context, skip_document_dict, function checkLocalDeletion(queue, context, skip_document_dict,
cache, destination_key,
destination, id, source, destination, id, source,
conflict_force, conflict_revert, conflict_force, conflict_revert,
conflict_ignore, options) { conflict_ignore, options) {
...@@ -804,6 +865,7 @@ ...@@ -804,6 +865,7 @@
.push(function (result) { .push(function (result) {
status_hash = result.hash; status_hash = result.hash;
return checkAndPropagate(context, skip_document_dict, return checkAndPropagate(context, skip_document_dict,
cache, destination_key,
status_hash, null, null, status_hash, null, null,
source, destination, id, source, destination, id,
conflict_force, conflict_revert, conflict_force, conflict_revert,
...@@ -813,20 +875,29 @@ ...@@ -813,20 +875,29 @@
} }
function checkSignatureDifference(queue, context, skip_document_dict, function checkSignatureDifference(queue, context, skip_document_dict,
cache, destination_key,
source, destination, id, source, destination, id,
conflict_force, conflict_revert, conflict_force, conflict_revert,
conflict_ignore, conflict_ignore,
status_hash, local_hash, status_hash,
options) { options) {
queue queue
.push(function () { .push(function () {
return source.get(id); if (local_hash === null) {
// Hash was not provided by the allDocs query
return source.get(id);
}
return null;
}) })
.push(function (doc) { .push(function (doc) {
var local_hash = generateHash(stringify(doc)); if (local_hash === null) {
// Hash was not provided by the allDocs query
local_hash = generateHash(stringify(doc));
}
if (local_hash !== status_hash) { if (local_hash !== status_hash) {
return checkAndPropagate(context, skip_document_dict, return checkAndPropagate(context, skip_document_dict,
cache, destination_key,
status_hash, local_hash, doc, status_hash, local_hash, doc,
source, destination, id, source, destination, id,
conflict_force, conflict_revert, conflict_force, conflict_revert,
...@@ -837,6 +908,7 @@ ...@@ -837,6 +908,7 @@
} }
function pushStorage(context, skip_document_dict, function pushStorage(context, skip_document_dict,
cache, source_key, destination_key,
source, destination, signature_allDocs, options) { source, destination, signature_allDocs, options) {
var argument_list = [], var argument_list = [],
argument_list_deletion = []; argument_list_deletion = [];
...@@ -846,7 +918,7 @@ ...@@ -846,7 +918,7 @@
if (!options.hasOwnProperty("use_revert_post")) { if (!options.hasOwnProperty("use_revert_post")) {
options.use_revert_post = false; options.use_revert_post = false;
} }
return source.allDocs(context._query_options) return callAllDocsOnStorage(context, source, cache, source_key)
.push(function (source_allDocs) { .push(function (source_allDocs) {
var i, var i,
local_dict = {}, local_dict = {},
...@@ -854,15 +926,26 @@ ...@@ -854,15 +926,26 @@
is_modification, is_modification,
is_creation, is_creation,
status_hash, status_hash,
local_hash,
key, key,
queue = new RSVP.Queue(); queue = new RSVP.Queue();
for (key in source_allDocs) {
if (source_allDocs.hasOwnProperty(key)) {
if (!skip_document_dict.hasOwnProperty(key)) {
local_dict[key] = source_allDocs[key];
}
}
}
/*
for (i = 0; i < source_allDocs.data.total_rows; i += 1) { for (i = 0; i < source_allDocs.data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty( if (!skip_document_dict.hasOwnProperty(
source_allDocs.data.rows[i].id source_allDocs.data.rows[i].id
)) { )) {
local_dict[source_allDocs.data.rows[i].id] = i; local_dict[source_allDocs.data.rows[i].id] =
source_allDocs.data.rows[i].value;
} }
} }
*/
for (i = 0; i < signature_allDocs.data.total_rows; i += 1) { for (i = 0; i < signature_allDocs.data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty( if (!skip_document_dict.hasOwnProperty(
signature_allDocs.data.rows[i].id signature_allDocs.data.rows[i].id
...@@ -884,14 +967,28 @@ ...@@ -884,14 +967,28 @@
status_hash = signature_dict[key]; status_hash = signature_dict[key];
} }
local_hash = null;
if (options.signature_select_metadata !== undefined) {
local_hash = local_dict[key][options.signature_select_metadata];
if (is_modification === true) {
// Bypass fetching all documents and calculating the sha
// Compare the select list values returned by allDocs calls
is_modification = false;
if (local_hash !== status_hash) {
is_modification = true;
}
}
}
if (is_modification === true || is_creation === true) { if (is_modification === true || is_creation === true) {
argument_list.push([undefined, context, skip_document_dict, argument_list.push([undefined, context, skip_document_dict,
cache, destination_key,
source, destination, source, destination,
key, key,
options.conflict_force, options.conflict_force,
options.conflict_revert, options.conflict_revert,
options.conflict_ignore, options.conflict_ignore,
status_hash, local_hash, status_hash,
options]); options]);
} }
} }
...@@ -912,6 +1009,7 @@ ...@@ -912,6 +1009,7 @@
argument_list_deletion.push([undefined, argument_list_deletion.push([undefined,
context, context,
skip_document_dict, skip_document_dict,
cache, destination_key,
destination, key, destination, key,
source, source,
options.conflict_force, options.conflict_force,
...@@ -943,7 +1041,8 @@ ...@@ -943,7 +1041,8 @@
ReplicateStorage.prototype.repair = function () { ReplicateStorage.prototype.repair = function () {
var context = this, var context = this,
argument_list = arguments, argument_list = arguments,
skip_document_dict = {}; skip_document_dict = {},
cache = {};
// Do not sync the signature document // Do not sync the signature document
skip_document_dict[context._signature_hash] = null; skip_document_dict[context._signature_hash] = null;
...@@ -1006,6 +1105,7 @@ ...@@ -1006,6 +1105,7 @@
context._check_local_creation || context._check_local_creation ||
context._check_local_deletion) { context._check_local_deletion) {
return pushStorage(context, skip_document_dict, return pushStorage(context, skip_document_dict,
cache, 'local', 'remote',
context._local_sub_storage, context._local_sub_storage,
context._remote_sub_storage, context._remote_sub_storage,
signature_allDocs, signature_allDocs,
...@@ -1020,7 +1120,8 @@ ...@@ -1020,7 +1120,8 @@
check_modification: context._check_local_modification, check_modification: context._check_local_modification,
check_creation: context._check_local_creation, check_creation: context._check_local_creation,
check_deletion: context._check_local_deletion, check_deletion: context._check_local_deletion,
operation_amount: context._parallel_operation_amount operation_amount: context._parallel_operation_amount,
signature_select_metadata: context._signature_select_metadata
}) })
.push(function () { .push(function () {
return signature_allDocs; return signature_allDocs;
...@@ -1033,6 +1134,7 @@ ...@@ -1033,6 +1134,7 @@
context._check_remote_creation || context._check_remote_creation ||
context._check_remote_deletion) { context._check_remote_deletion) {
return pushStorage(context, skip_document_dict, return pushStorage(context, skip_document_dict,
cache, 'remote', 'local',
context._remote_sub_storage, context._remote_sub_storage,
context._local_sub_storage, context._local_sub_storage,
signature_allDocs, { signature_allDocs, {
...@@ -1046,7 +1148,8 @@ ...@@ -1046,7 +1148,8 @@
check_modification: context._check_remote_modification, check_modification: context._check_remote_modification,
check_creation: context._check_remote_creation, check_creation: context._check_remote_creation,
check_deletion: context._check_remote_deletion, check_deletion: context._check_remote_deletion,
operation_amount: context._parallel_operation_amount operation_amount: context._parallel_operation_amount,
signature_select_metadata: context._signature_select_metadata
}); });
} }
}) })
......
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment