Commit 10a451a4 authored by Romain Courteaud's avatar Romain Courteaud

ReplicateStorage: stop relying on closure

Simplify the code, to allow more changes later.
parent 51d97ae6
...@@ -202,719 +202,753 @@ ...@@ -202,719 +202,753 @@
arguments); arguments);
}; };
ReplicateStorage.prototype.repair = function () { function dispatchQueue(context, function_used, argument_list,
var context = this, number_queue) {
argument_list = arguments, var result_promise_list = [],
skip_document_dict = {}; i;
// Do not sync the signature document
skip_document_dict[context._signature_hash] = null;
function dispatchQueue(function_used, argument_list, number_queue) {
var result_promise_list = [],
i;
function pushAndExecute(queue) {
queue
.push(function () {
if (argument_list.length > 0) {
var argument_array = argument_list.shift(),
sub_queue = new RSVP.Queue();
argument_array[0] = sub_queue;
function_used.apply(context, argument_array);
pushAndExecute(queue);
return sub_queue;
}
});
}
for (i = 0; i < number_queue; i += 1) {
result_promise_list.push(new RSVP.Queue());
pushAndExecute(result_promise_list[i]);
}
if (number_queue > 1) {
return RSVP.all(result_promise_list);
}
return result_promise_list[0];
}
function propagateAttachmentDeletion(skip_attachment_dict, function pushAndExecute(queue) {
destination, queue
id, name) {
return destination.removeAttachment(id, name)
.push(function () {
return context._signature_sub_storage.removeAttachment(id, name);
})
.push(function () { .push(function () {
skip_attachment_dict[name] = null; if (argument_list.length > 0) {
var argument_array = argument_list.shift(),
sub_queue = new RSVP.Queue();
argument_array[0] = sub_queue;
function_used.apply(context, argument_array);
pushAndExecute(queue);
return sub_queue;
}
}); });
} }
for (i = 0; i < number_queue; i += 1) {
function propagateAttachmentModification(skip_attachment_dict, result_promise_list.push(new RSVP.Queue());
destination, pushAndExecute(result_promise_list[i]);
blob, hash, id, name) { }
return destination.putAttachment(id, name, blob) if (number_queue > 1) {
.push(function () { return RSVP.all(result_promise_list);
return context._signature_sub_storage.putAttachment(id, name,
JSON.stringify({
hash: hash
}));
})
.push(function () {
skip_attachment_dict[name] = null;
});
} }
return result_promise_list[0];
}
function checkAndPropagateAttachment(skip_attachment_dict, function propagateAttachmentDeletion(context, skip_attachment_dict,
status_hash, local_hash, blob, destination,
source, destination, id, name, id, name) {
conflict_force, conflict_revert, return destination.removeAttachment(id, name)
conflict_ignore) { .push(function () {
var remote_blob; return context._signature_sub_storage.removeAttachment(id, name);
return destination.getAttachment(id, name) })
.push(function (result) { .push(function () {
remote_blob = result; skip_attachment_dict[name] = null;
return jIO.util.readBlobAsArrayBuffer(remote_blob); });
}) }
.push(function (evt) {
return generateHashFromArrayBuffer(
evt.target.result
);
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
remote_blob = null;
return null;
}
throw error;
})
.push(function (remote_hash) {
if (local_hash === remote_hash) {
// Same modifications on both side
if (local_hash === null) {
// Deleted on both side, drop signature
return context._signature_sub_storage.removeAttachment(id, name)
.push(function () {
skip_attachment_dict[id] = null;
});
}
return context._signature_sub_storage.putAttachment(id, name, function propagateAttachmentModification(context, skip_attachment_dict,
JSON.stringify({ destination,
hash: local_hash blob, hash, id, name) {
})) return destination.putAttachment(id, name, blob)
.push(function () {
return context._signature_sub_storage.putAttachment(id, name,
JSON.stringify({
hash: hash
}));
})
.push(function () {
skip_attachment_dict[name] = null;
});
}
function checkAndPropagateAttachment(context, skip_document_dict,
skip_attachment_dict,
status_hash, local_hash, blob,
source, destination, id, name,
conflict_force, conflict_revert,
conflict_ignore) {
var remote_blob;
return destination.getAttachment(id, name)
.push(function (result) {
remote_blob = result;
return jIO.util.readBlobAsArrayBuffer(remote_blob);
})
.push(function (evt) {
return generateHashFromArrayBuffer(
evt.target.result
);
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
remote_blob = null;
return null;
}
throw error;
})
.push(function (remote_hash) {
if (local_hash === remote_hash) {
// Same modifications on both side
if (local_hash === null) {
// Deleted on both side, drop signature
return context._signature_sub_storage.removeAttachment(id, name)
.push(function () { .push(function () {
skip_document_dict[id] = null; skip_attachment_dict[id] = null;
}); });
} }
if ((remote_hash === status_hash) || (conflict_force === true)) { return context._signature_sub_storage.putAttachment(id, name,
// Modified only locally. No conflict or force JSON.stringify({
if (local_hash === null) { hash: local_hash
// Deleted locally }))
return propagateAttachmentDeletion(skip_attachment_dict, .push(function () {
destination, skip_document_dict[id] = null;
id, name); });
} }
return propagateAttachmentModification(skip_attachment_dict,
destination, blob,
local_hash, id, name);
}
// Conflict cases if ((remote_hash === status_hash) || (conflict_force === true)) {
if (conflict_ignore === true) { // Modified only locally. No conflict or force
return; if (local_hash === null) {
// Deleted locally
return propagateAttachmentDeletion(context, skip_attachment_dict,
destination,
id, name);
} }
return propagateAttachmentModification(context,
skip_attachment_dict,
destination, blob,
local_hash, id, name);
}
if ((conflict_revert === true) || (local_hash === null)) { // Conflict cases
// Automatically resolve conflict or force revert if (conflict_ignore === true) {
if (remote_hash === null) { return;
// Deleted remotely }
return propagateAttachmentDeletion(skip_attachment_dict,
source, id, name);
}
return propagateAttachmentModification(
skip_attachment_dict,
source,
remote_blob,
remote_hash,
id,
name
);
}
// Minimize conflict if it can be resolved if ((conflict_revert === true) || (local_hash === null)) {
// Automatically resolve conflict or force revert
if (remote_hash === null) { if (remote_hash === null) {
// Copy remote modification remotely // Deleted remotely
return propagateAttachmentModification(skip_attachment_dict, return propagateAttachmentDeletion(context, skip_attachment_dict,
destination, blob, source, id, name);
local_hash, id, name);
} }
throw new jIO.util.jIOError("Conflict on '" + id + return propagateAttachmentModification(
"' with attachment '" + context,
name + "'", skip_attachment_dict,
409); source,
}); remote_blob,
} remote_hash,
id,
name
);
}
function checkAttachmentSignatureDifference(queue, skip_attachment_dict, // Minimize conflict if it can be resolved
source, if (remote_hash === null) {
destination, id, name, // Copy remote modification remotely
conflict_force, return propagateAttachmentModification(context,
conflict_revert, skip_attachment_dict,
conflict_ignore, destination, blob,
is_creation, is_modification) { local_hash, id, name);
var blob, }
status_hash; throw new jIO.util.jIOError("Conflict on '" + id +
queue "' with attachment '" +
.push(function () { name + "'",
// Optimisation to save a get call to signature storage 409);
if (is_creation === true) { });
return RSVP.all([ }
source.getAttachment(id, name),
{hash: null}
]);
}
if (is_modification === true) {
return RSVP.all([
source.getAttachment(id, name),
context._signature_sub_storage.getAttachment(
id,
name,
{format: 'json'}
)
]);
}
throw new jIO.util.jIOError("Unexpected call of"
+ " checkAttachmentSignatureDifference",
409);
})
.push(function (result_list) {
blob = result_list[0];
status_hash = result_list[1].hash;
return jIO.util.readBlobAsArrayBuffer(blob);
})
.push(function (evt) {
var array_buffer = evt.target.result,
local_hash = generateHashFromArrayBuffer(array_buffer);
if (local_hash !== status_hash) { function checkAttachmentSignatureDifference(queue, context,
return checkAndPropagateAttachment(skip_attachment_dict, skip_document_dict,
status_hash, local_hash, blob, skip_attachment_dict,
source, destination, id, name, source,
conflict_force, conflict_revert, destination, id, name,
conflict_ignore); conflict_force,
} conflict_revert,
}); conflict_ignore,
} is_creation, is_modification) {
var blob,
status_hash;
queue
.push(function () {
// Optimisation to save a get call to signature storage
if (is_creation === true) {
return RSVP.all([
source.getAttachment(id, name),
{hash: null}
]);
}
if (is_modification === true) {
return RSVP.all([
source.getAttachment(id, name),
context._signature_sub_storage.getAttachment(
id,
name,
{format: 'json'}
)
]);
}
throw new jIO.util.jIOError("Unexpected call of"
+ " checkAttachmentSignatureDifference",
409);
})
.push(function (result_list) {
blob = result_list[0];
status_hash = result_list[1].hash;
return jIO.util.readBlobAsArrayBuffer(blob);
})
.push(function (evt) {
var array_buffer = evt.target.result,
local_hash = generateHashFromArrayBuffer(array_buffer);
function checkAttachmentLocalDeletion(queue, skip_attachment_dict, if (local_hash !== status_hash) {
destination, id, name, source, return checkAndPropagateAttachment(context, skip_document_dict,
conflict_force, conflict_revert, skip_attachment_dict,
conflict_ignore) { status_hash, local_hash, blob,
var status_hash; source, destination, id, name,
queue conflict_force, conflict_revert,
.push(function () { conflict_ignore);
return context._signature_sub_storage.getAttachment(id, name, }
{format: 'json'}); });
}) }
.push(function (result) {
status_hash = result.hash;
return checkAndPropagateAttachment(skip_attachment_dict,
status_hash, null, null,
source, destination, id, name,
conflict_force, conflict_revert,
conflict_ignore);
});
}
function pushDocumentAttachment(skip_attachment_dict, id, source, function checkAttachmentLocalDeletion(queue, context, skip_document_dict,
destination, options) { skip_attachment_dict,
var queue = new RSVP.Queue(), destination, id, name, source,
local_dict = {}, conflict_force, conflict_revert,
signature_dict = {}; conflict_ignore) {
var status_hash;
queue
.push(function () {
return context._signature_sub_storage.getAttachment(id, name,
{format: 'json'});
})
.push(function (result) {
status_hash = result.hash;
return checkAndPropagateAttachment(context, skip_document_dict,
skip_attachment_dict,
status_hash, null, null,
source, destination, id, name,
conflict_force, conflict_revert,
conflict_ignore);
});
}
return queue function pushDocumentAttachment(context, skip_document_dict,
.push(function () { skip_attachment_dict, id, source,
return RSVP.all([ destination, options) {
source.allAttachments(id) var queue = new RSVP.Queue(),
.push(undefined, function (error) { local_dict = {},
if ((error instanceof jIO.util.jIOError) && signature_dict = {};
(error.status_code === 404)) { return queue
return {}; .push(function () {
} return RSVP.all([
throw error; source.allAttachments(id)
}), .push(undefined, function (error) {
context._signature_sub_storage.allAttachments(id) if ((error instanceof jIO.util.jIOError) &&
.push(undefined, function (error) { (error.status_code === 404)) {
if ((error instanceof jIO.util.jIOError) && return {};
(error.status_code === 404)) {
return {};
}
throw error;
})
]);
})
.push(function (result_list) {
var is_modification,
is_creation,
key,
argument_list = [];
for (key in result_list[0]) {
if (result_list[0].hasOwnProperty(key)) {
if (!skip_attachment_dict.hasOwnProperty(key)) {
local_dict[key] = null;
} }
throw error;
}),
context._signature_sub_storage.allAttachments(id)
.push(undefined, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return {};
}
throw error;
})
]);
})
.push(function (result_list) {
var is_modification,
is_creation,
key,
argument_list = [];
for (key in result_list[0]) {
if (result_list[0].hasOwnProperty(key)) {
if (!skip_attachment_dict.hasOwnProperty(key)) {
local_dict[key] = null;
} }
} }
for (key in result_list[1]) { }
if (result_list[1].hasOwnProperty(key)) { for (key in result_list[1]) {
if (!skip_attachment_dict.hasOwnProperty(key)) { if (result_list[1].hasOwnProperty(key)) {
signature_dict[key] = null; if (!skip_attachment_dict.hasOwnProperty(key)) {
} signature_dict[key] = null;
} }
} }
}
for (key in local_dict) { for (key in local_dict) {
if (local_dict.hasOwnProperty(key)) { if (local_dict.hasOwnProperty(key)) {
is_modification = signature_dict.hasOwnProperty(key) is_modification = signature_dict.hasOwnProperty(key)
&& options.check_modification; && options.check_modification;
is_creation = !signature_dict.hasOwnProperty(key) is_creation = !signature_dict.hasOwnProperty(key)
&& options.check_creation; && options.check_creation;
if (is_modification === true || is_creation === true) { if (is_modification === true || is_creation === true) {
argument_list.push([undefined,
context,
skip_document_dict,
skip_attachment_dict,
source,
destination, id, key,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
is_creation,
is_modification]);
}
}
}
return dispatchQueue(
context,
checkAttachmentSignatureDifference,
argument_list,
context._parallel_operation_attachment_amount
);
})
.push(function () {
var key, argument_list = [];
if (options.check_deletion === true) {
for (key in signature_dict) {
if (signature_dict.hasOwnProperty(key)) {
if (!local_dict.hasOwnProperty(key)) {
argument_list.push([undefined, argument_list.push([undefined,
skip_attachment_dict, context,
source, skip_document_dict,
destination, id, key, skip_attachment_dict,
options.conflict_force, destination, id, key,
options.conflict_revert, source,
options.conflict_ignore, options.conflict_force,
is_creation, options.conflict_revert,
is_modification]); options.conflict_ignore]);
} }
} }
} }
return dispatchQueue( return dispatchQueue(
checkAttachmentSignatureDifference, context,
checkAttachmentLocalDeletion,
argument_list, argument_list,
context._parallel_operation_attachment_amount context._parallel_operation_attachment_amount
); );
}) }
.push(function () { });
var key, argument_list = []; }
if (options.check_deletion === true) {
for (key in signature_dict) {
if (signature_dict.hasOwnProperty(key)) {
if (!local_dict.hasOwnProperty(key)) {
argument_list.push([undefined,
skip_attachment_dict,
destination, id, key,
source,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore]);
}
}
}
return dispatchQueue(
checkAttachmentLocalDeletion,
argument_list,
context._parallel_operation_attachment_amount
);
}
});
}
function repairDocumentAttachment(id) {
var skip_attachment_dict = {};
return new RSVP.Queue()
.push(function () {
if (context._check_local_attachment_modification ||
context._check_local_attachment_creation ||
context._check_local_attachment_deletion) {
return pushDocumentAttachment(
skip_attachment_dict,
id,
context._local_sub_storage,
context._remote_sub_storage,
{
conflict_force: (context._conflict_handling ===
CONFLICT_KEEP_LOCAL),
conflict_revert: (context._conflict_handling ===
CONFLICT_KEEP_REMOTE),
conflict_ignore: (context._conflict_handling ===
CONFLICT_CONTINUE),
check_modification:
context._check_local_attachment_modification,
check_creation: context._check_local_attachment_creation,
check_deletion: context._check_local_attachment_deletion
}
);
}
})
.push(function () {
if (context._check_remote_attachment_modification ||
context._check_remote_attachment_creation ||
context._check_remote_attachment_deletion) {
return pushDocumentAttachment(
skip_attachment_dict,
id,
context._remote_sub_storage,
context._local_sub_storage,
{
use_revert_post: context._use_remote_post,
conflict_force: (context._conflict_handling ===
CONFLICT_KEEP_REMOTE),
conflict_revert: (context._conflict_handling ===
CONFLICT_KEEP_LOCAL),
conflict_ignore: (context._conflict_handling ===
CONFLICT_CONTINUE),
check_modification:
context._check_remote_attachment_modification,
check_creation: context._check_remote_attachment_creation,
check_deletion: context._check_remote_attachment_deletion
}
);
}
});
}
function propagateModification(source, destination, doc, hash, id,
options) {
var result,
post_id,
to_skip = true;
if (options === undefined) {
options = {};
}
if (options.use_post) {
result = destination.post(doc)
.push(function (new_id) {
to_skip = false;
post_id = new_id;
return source.put(post_id, doc);
})
.push(function () {
// Copy all attachments
// This is not related to attachment replication
// It's just about not losing user data
return source.allAttachments(id);
})
.push(function (attachment_dict) {
var key,
copy_queue = new RSVP.Queue();
function copyAttachment(name) { function repairDocumentAttachment(context, id, skip_document_dict) {
copy_queue var skip_attachment_dict = {};
.push(function () { return new RSVP.Queue()
return source.getAttachment(id, name); .push(function () {
}) if (context._check_local_attachment_modification ||
.push(function (blob) { context._check_local_attachment_creation ||
return source.putAttachment(post_id, name, blob); context._check_local_attachment_deletion) {
}); return pushDocumentAttachment(
context,
skip_document_dict,
skip_attachment_dict,
id,
context._local_sub_storage,
context._remote_sub_storage,
{
conflict_force: (context._conflict_handling ===
CONFLICT_KEEP_LOCAL),
conflict_revert: (context._conflict_handling ===
CONFLICT_KEEP_REMOTE),
conflict_ignore: (context._conflict_handling ===
CONFLICT_CONTINUE),
check_modification:
context._check_local_attachment_modification,
check_creation: context._check_local_attachment_creation,
check_deletion: context._check_local_attachment_deletion
} }
);
for (key in attachment_dict) { }
if (attachment_dict.hasOwnProperty(key)) { })
copyAttachment(key); .push(function () {
} if (context._check_remote_attachment_modification ||
context._check_remote_attachment_creation ||
context._check_remote_attachment_deletion) {
return pushDocumentAttachment(
context,
skip_document_dict,
skip_attachment_dict,
id,
context._remote_sub_storage,
context._local_sub_storage,
{
use_revert_post: context._use_remote_post,
conflict_force: (context._conflict_handling ===
CONFLICT_KEEP_REMOTE),
conflict_revert: (context._conflict_handling ===
CONFLICT_KEEP_LOCAL),
conflict_ignore: (context._conflict_handling ===
CONFLICT_CONTINUE),
check_modification:
context._check_remote_attachment_modification,
check_creation: context._check_remote_attachment_creation,
check_deletion: context._check_remote_attachment_deletion
} }
return copy_queue; );
}) }
.push(function () { });
return source.remove(id); }
})
.push(function () {
return context._signature_sub_storage.remove(id);
})
.push(function () {
to_skip = true;
return context._signature_sub_storage.put(post_id, {
"hash": hash
});
})
.push(function () {
skip_document_dict[post_id] = null;
});
} else {
result = destination.put(id, doc)
.push(function () {
return context._signature_sub_storage.put(id, {
"hash": hash
});
});
}
return result
.push(function () {
if (to_skip) {
skip_document_dict[id] = null;
}
});
}
function propagateDeletion(destination, id) { function propagateModification(context, source, destination, doc, hash, id,
// Do not delete a document if it has an attachment skip_document_dict,
// ie, replication should prevent losing user data options) {
// Synchronize attachments before, to ensure var result,
// all of them will be deleted too post_id,
return repairDocumentAttachment(id) to_skip = true;
if (options === undefined) {
options = {};
}
if (options.use_post) {
result = destination.post(doc)
.push(function (new_id) {
to_skip = false;
post_id = new_id;
return source.put(post_id, doc);
})
.push(function () { .push(function () {
return destination.allAttachments(id); // Copy all attachments
// This is not related to attachment replication
// It's just about not losing user data
return source.allAttachments(id);
}) })
.push(function (attachment_dict) { .push(function (attachment_dict) {
if (JSON.stringify(attachment_dict) === "{}") { var key,
return destination.remove(id) copy_queue = new RSVP.Queue();
function copyAttachment(name) {
copy_queue
.push(function () { .push(function () {
return context._signature_sub_storage.remove(id); return source.getAttachment(id, name);
})
.push(function (blob) {
return source.putAttachment(post_id, name, blob);
}); });
} }
}, function (error) {
if ((error instanceof jIO.util.jIOError) && for (key in attachment_dict) {
(error.status_code === 404)) { if (attachment_dict.hasOwnProperty(key)) {
return; copyAttachment(key);
}
} }
throw error; return copy_queue;
}) })
.push(function () { .push(function () {
skip_document_dict[id] = null; return source.remove(id);
})
.push(function () {
return context._signature_sub_storage.remove(id);
})
.push(function () {
to_skip = true;
return context._signature_sub_storage.put(post_id, {
"hash": hash
});
})
.push(function () {
skip_document_dict[post_id] = null;
});
} else {
result = destination.put(id, doc)
.push(function () {
return context._signature_sub_storage.put(id, {
"hash": hash
});
}); });
} }
return result
.push(function () {
if (to_skip) {
skip_document_dict[id] = null;
}
});
}
function checkAndPropagate(status_hash, local_hash, doc, function propagateDeletion(context, destination, id, skip_document_dict) {
source, destination, id, // Do not delete a document if it has an attachment
conflict_force, conflict_revert, // ie, replication should prevent losing user data
conflict_ignore, // Synchronize attachments before, to ensure
options) { // all of them will be deleted too
return destination.get(id) return repairDocumentAttachment(context, id, skip_document_dict)
.push(function (remote_doc) { .push(function () {
return [remote_doc, generateHash(stringify(remote_doc))]; return destination.allAttachments(id);
}, function (error) { })
if ((error instanceof jIO.util.jIOError) && .push(function (attachment_dict) {
(error.status_code === 404)) { if (JSON.stringify(attachment_dict) === "{}") {
return [null, null]; return destination.remove(id)
} .push(function () {
throw error; return context._signature_sub_storage.remove(id);
}) });
.push(function (remote_list) { }
var remote_doc = remote_list[0], }, function (error) {
remote_hash = remote_list[1]; if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return;
}
throw error;
})
.push(function () {
skip_document_dict[id] = null;
});
}
if (local_hash === remote_hash) { function checkAndPropagate(context, skip_document_dict,
// Same modifications on both side status_hash, local_hash, doc,
if (local_hash === null) { source, destination, id,
// Deleted on both side, drop signature conflict_force, conflict_revert,
return context._signature_sub_storage.remove(id) conflict_ignore,
.push(function () { options) {
skip_document_dict[id] = null; return destination.get(id)
}); .push(function (remote_doc) {
} return [remote_doc, generateHash(stringify(remote_doc))];
}, function (error) {
if ((error instanceof jIO.util.jIOError) &&
(error.status_code === 404)) {
return [null, null];
}
throw error;
})
.push(function (remote_list) {
var remote_doc = remote_list[0],
remote_hash = remote_list[1];
return context._signature_sub_storage.put(id, { if (local_hash === remote_hash) {
"hash": local_hash // Same modifications on both side
}) if (local_hash === null) {
// Deleted on both side, drop signature
return context._signature_sub_storage.remove(id)
.push(function () { .push(function () {
skip_document_dict[id] = null; skip_document_dict[id] = null;
}); });
} }
if ((remote_hash === status_hash) || (conflict_force === true)) { return context._signature_sub_storage.put(id, {
// Modified only locally. No conflict or force "hash": local_hash
if (local_hash === null) { })
// Deleted locally .push(function () {
return propagateDeletion(destination, id); skip_document_dict[id] = null;
} });
return propagateModification(source, destination, doc, }
local_hash, id,
{use_post: ((options.use_post) &&
(remote_hash === null))});
}
// Conflict cases if ((remote_hash === status_hash) || (conflict_force === true)) {
if (conflict_ignore === true) { // Modified only locally. No conflict or force
return; if (local_hash === null) {
// Deleted locally
return propagateDeletion(context, destination, id,
skip_document_dict);
} }
return propagateModification(context, source, destination, doc,
local_hash, id, skip_document_dict,
{use_post: ((options.use_post) &&
(remote_hash === null))});
}
if ((conflict_revert === true) || (local_hash === null)) { // Conflict cases
// Automatically resolve conflict or force revert if (conflict_ignore === true) {
if (remote_hash === null) { return;
// Deleted remotely }
return propagateDeletion(source, id);
}
return propagateModification(
destination,
source,
remote_doc,
remote_hash,
id,
{use_post: ((options.use_revert_post) &&
(local_hash === null))}
);
}
// Minimize conflict if it can be resolved if ((conflict_revert === true) || (local_hash === null)) {
// Automatically resolve conflict or force revert
if (remote_hash === null) { if (remote_hash === null) {
// Copy remote modification remotely // Deleted remotely
return propagateModification(source, destination, doc, return propagateDeletion(context, source, id, skip_document_dict);
local_hash, id,
{use_post: options.use_post});
} }
throw new jIO.util.jIOError("Conflict on '" + id + "': " + return propagateModification(
stringify(doc || '') + " !== " + context,
stringify(remote_doc || ''), destination,
409); source,
}); remote_doc,
} remote_hash,
id,
skip_document_dict,
{use_post: ((options.use_revert_post) &&
(local_hash === null))}
);
}
function checkLocalDeletion(queue, destination, id, source, // Minimize conflict if it can be resolved
conflict_force, conflict_revert, if (remote_hash === null) {
conflict_ignore, options) { // Copy remote modification remotely
var status_hash; return propagateModification(context, source, destination, doc,
queue local_hash, id, skip_document_dict,
.push(function () { {use_post: options.use_post});
return context._signature_sub_storage.get(id); }
}) throw new jIO.util.jIOError("Conflict on '" + id + "': " +
.push(function (result) { stringify(doc || '') + " !== " +
status_hash = result.hash; stringify(remote_doc || ''),
return checkAndPropagate(status_hash, null, null, 409);
});
}
function checkLocalDeletion(queue, context, skip_document_dict,
destination, id, source,
conflict_force, conflict_revert,
conflict_ignore, options) {
var status_hash;
queue
.push(function () {
return context._signature_sub_storage.get(id);
})
.push(function (result) {
status_hash = result.hash;
return checkAndPropagate(context, skip_document_dict,
status_hash, null, null,
source, destination, id,
conflict_force, conflict_revert,
conflict_ignore,
options);
});
}
function checkSignatureDifference(queue, context, skip_document_dict,
source, destination, id,
conflict_force, conflict_revert,
conflict_ignore,
is_creation, is_modification,
getMethod, options) {
queue
.push(function () {
// Optimisation to save a get call to signature storage
if (is_creation === true) {
return RSVP.all([
getMethod(id),
{hash: null}
]);
}
if (is_modification === true) {
return RSVP.all([
getMethod(id),
context._signature_sub_storage.get(id)
]);
}
throw new jIO.util.jIOError("Unexpected call of"
+ " checkSignatureDifference",
409);
})
.push(function (result_list) {
var doc = result_list[0],
local_hash = generateHash(stringify(doc)),
status_hash = result_list[1].hash;
if (local_hash !== status_hash) {
return checkAndPropagate(context, skip_document_dict,
status_hash, local_hash, doc,
source, destination, id, source, destination, id,
conflict_force, conflict_revert, conflict_force, conflict_revert,
conflict_ignore, conflict_ignore,
options); options);
}); }
} });
}
function checkSignatureDifference(queue, source, destination, id,
conflict_force, conflict_revert,
conflict_ignore,
is_creation, is_modification,
getMethod, options) {
queue
.push(function () {
// Optimisation to save a get call to signature storage
if (is_creation === true) {
return RSVP.all([
getMethod(id),
{hash: null}
]);
}
if (is_modification === true) {
return RSVP.all([
getMethod(id),
context._signature_sub_storage.get(id)
]);
}
throw new jIO.util.jIOError("Unexpected call of"
+ " checkSignatureDifference",
409);
})
.push(function (result_list) {
var doc = result_list[0],
local_hash = generateHash(stringify(doc)),
status_hash = result_list[1].hash;
if (local_hash !== status_hash) { function pushStorage(context, skip_document_dict,
return checkAndPropagate(status_hash, local_hash, doc, source, destination, signature_allDocs, options) {
source, destination, id, var argument_list = [],
conflict_force, conflict_revert, argument_list_deletion = [];
conflict_ignore, if (!options.hasOwnProperty("use_post")) {
options); options.use_post = false;
}
});
} }
if (!options.hasOwnProperty("use_revert_post")) {
function pushStorage(source, destination, signature_allDocs, options) { options.use_revert_post = false;
var argument_list = [], }
argument_list_deletion = []; return source.allDocs(context._query_options)
if (!options.hasOwnProperty("use_post")) { .push(function (source_allDocs) {
options.use_post = false; var i,
} local_dict = {},
if (!options.hasOwnProperty("use_revert_post")) { signature_dict = {},
options.use_revert_post = false; is_modification,
} is_creation,
return source.allDocs(context._query_options) key,
.push(function (source_allDocs) { queue = new RSVP.Queue();
var i, for (i = 0; i < source_allDocs.data.total_rows; i += 1) {
local_dict = {}, if (!skip_document_dict.hasOwnProperty(
signature_dict = {}, source_allDocs.data.rows[i].id
is_modification, )) {
is_creation, local_dict[source_allDocs.data.rows[i].id] = i;
key,
queue = new RSVP.Queue();
for (i = 0; i < source_allDocs.data.total_rows; i += 1) {
if (!skip_document_dict.hasOwnProperty(
source_allDocs.data.rows[i].id
)) {
local_dict[source_allDocs.data.rows[i].id] = i;
}
} }
for (i = 0; i < signature_allDocs.data.total_rows; i += 1) { }
if (!skip_document_dict.hasOwnProperty( for (i = 0; i < signature_allDocs.data.total_rows; i += 1) {
signature_allDocs.data.rows[i].id if (!skip_document_dict.hasOwnProperty(
)) { signature_allDocs.data.rows[i].id
signature_dict[signature_allDocs.data.rows[i].id] = i; )) {
signature_dict[signature_allDocs.data.rows[i].id] = i;
}
}
i = 0;
for (key in local_dict) {
if (local_dict.hasOwnProperty(key)) {
is_modification = signature_dict.hasOwnProperty(key)
&& options.check_modification;
is_creation = !signature_dict.hasOwnProperty(key)
&& options.check_creation;
if (is_modification === true || is_creation === true) {
argument_list[i] = [undefined, context, skip_document_dict,
source, destination,
key,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
is_creation, is_modification,
source.get.bind(source),
options];
i += 1;
} }
} }
}
queue
.push(function () {
return dispatchQueue(
context,
checkSignatureDifference,
argument_list,
options.operation_amount
);
});
if (options.check_deletion === true) {
i = 0; i = 0;
for (key in local_dict) { for (key in signature_dict) {
if (local_dict.hasOwnProperty(key)) { if (signature_dict.hasOwnProperty(key)) {
is_modification = signature_dict.hasOwnProperty(key) if (!local_dict.hasOwnProperty(key)) {
&& options.check_modification; argument_list_deletion[i] = [undefined,
is_creation = !signature_dict.hasOwnProperty(key) context,
&& options.check_creation; skip_document_dict,
if (is_modification === true || is_creation === true) { destination, key,
argument_list[i] = [undefined, source, destination, source,
key, options.conflict_force,
options.conflict_force, options.conflict_revert,
options.conflict_revert, options.conflict_ignore,
options.conflict_ignore, options];
is_creation, is_modification,
source.get.bind(source),
options];
i += 1; i += 1;
} }
} }
} }
queue queue.push(function () {
.push(function () { return dispatchQueue(
return dispatchQueue( context,
checkSignatureDifference, checkLocalDeletion,
argument_list, argument_list_deletion,
options.operation_amount options.operation_amount
); );
}); });
if (options.check_deletion === true) { }
i = 0; return queue;
for (key in signature_dict) {
if (signature_dict.hasOwnProperty(key)) {
if (!local_dict.hasOwnProperty(key)) {
argument_list_deletion[i] = [undefined,
destination, key,
source,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
options];
i += 1;
}
}
}
queue.push(function () {
return dispatchQueue(
checkLocalDeletion,
argument_list_deletion,
options.operation_amount
);
});
}
return queue;
});
}
function repairDocument(queue, id) {
queue.push(function () {
return repairDocumentAttachment(id);
}); });
} }
function repairDocument(queue, context, id, skip_document_dict) {
queue.push(function () {
return repairDocumentAttachment(context, id, skip_document_dict);
});
}
ReplicateStorage.prototype.repair = function () {
var context = this,
argument_list = arguments,
skip_document_dict = {};
// Do not sync the signature document
skip_document_dict[context._signature_hash] = null;
return new RSVP.Queue() return new RSVP.Queue()
.push(function () { .push(function () {
...@@ -967,7 +1001,8 @@ ...@@ -967,7 +1001,8 @@
if (context._check_local_modification || if (context._check_local_modification ||
context._check_local_creation || context._check_local_creation ||
context._check_local_deletion) { context._check_local_deletion) {
return pushStorage(context._local_sub_storage, return pushStorage(context, skip_document_dict,
context._local_sub_storage,
context._remote_sub_storage, context._remote_sub_storage,
signature_allDocs, signature_allDocs,
{ {
...@@ -993,7 +1028,8 @@ ...@@ -993,7 +1028,8 @@
if (context._check_remote_modification || if (context._check_remote_modification ||
context._check_remote_creation || context._check_remote_creation ||
context._check_remote_deletion) { context._check_remote_deletion) {
return pushStorage(context._remote_sub_storage, return pushStorage(context, skip_document_dict,
context._remote_sub_storage,
context._local_sub_storage, context._local_sub_storage,
signature_allDocs, { signature_allDocs, {
use_revert_post: context._use_remote_post, use_revert_post: context._use_remote_post,
...@@ -1022,17 +1058,19 @@ ...@@ -1022,17 +1058,19 @@
return context._signature_sub_storage.allDocs() return context._signature_sub_storage.allDocs()
.push(function (result) { .push(function (result) {
var i, var i,
argument_list = [], local_argument_list = [],
len = result.data.total_rows; len = result.data.total_rows;
for (i = 0; i < len; i += 1) { for (i = 0; i < len; i += 1) {
argument_list.push( local_argument_list.push(
[undefined, result.data.rows[i].id] [undefined, context, result.data.rows[i].id,
skip_document_dict]
); );
} }
return dispatchQueue( return dispatchQueue(
context,
repairDocument, repairDocument,
argument_list, local_argument_list,
context._parallel_operation_amount context._parallel_operation_amount
); );
}); });
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment