Commit c0a7c667 authored by Vincent Bechu's avatar Vincent Bechu Committed by Vincent Bechu

Replicatestorage parallel operation

/reviewed-on nexedi/jio!48
parent fa945210
......@@ -62,6 +62,12 @@
});
this._use_remote_post = spec.use_remote_post || false;
// Number of request we allow browser execution for attachments
this._parallel_operation_attachment_amount =
spec.parallel_operation_attachment_amount || 1;
// Number of request we allow browser execution for documents
this._parallel_operation_amount =
spec.parallel_operation_amount || 1;
this._conflict_handling = spec.conflict_handling || 0;
// 0: no resolution (ie, throw an Error)
......@@ -204,6 +210,30 @@
// Do not sync the signature document
skip_document_dict[context._signature_hash] = null;
function dispatchQueue(function_used, argument_list, number_queue) {
var result_promise_list = [],
i;
function pushAndExecute(queue) {
queue
.push(function () {
if (argument_list.length > 0) {
var argument_array = argument_list.shift();
argument_array[0] = queue;
function_used.apply(context, argument_array);
pushAndExecute(queue);
}
});
}
for (i = 0; i < number_queue; i += 1) {
result_promise_list.push(new RSVP.Queue());
pushAndExecute(result_promise_list[i]);
}
if (number_queue > 1) {
return RSVP.all(result_promise_list);
}
return result_promise_list[0];
}
function propagateAttachmentDeletion(skip_attachment_dict,
destination,
......@@ -784,7 +814,7 @@
})
.push(function (result_list) {
var i,
sub_queue = new RSVP.Queue();
argument_list = [];
function getResult(j) {
return function (id) {
......@@ -796,20 +826,26 @@
}
for (i = 0; i < result_list.length; i += 1) {
checkSignatureDifference(sub_queue, source, destination,
argument_list[i] = [undefined, source, destination,
id_list[i].parameter_list[0],
conflict_force, conflict_revert,
conflict_ignore,
document_status_list[i].is_creation,
document_status_list[i].is_modification,
getResult(i), options);
getResult(i), options];
}
return sub_queue;
return dispatchQueue(
checkSignatureDifference,
argument_list,
options.operation_amount
);
});
}
function pushStorage(source, destination, options) {
var queue = new RSVP.Queue();
var queue = new RSVP.Queue(),
argument_list = [],
argument_list_deletion = [];
if (!options.hasOwnProperty("use_post")) {
options.use_post = false;
}
......@@ -846,6 +882,7 @@
signature_dict[result_list[1].data.rows[i].id] = i;
}
}
i = 0;
for (key in local_dict) {
if (local_dict.hasOwnProperty(key)) {
is_modification = signature_dict.hasOwnProperty(key)
......@@ -863,29 +900,50 @@
is_modification: is_modification
});
} else {
checkSignatureDifference(queue, source, destination, key,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
is_creation, is_modification,
source.get.bind(source),
options);
argument_list[i] = [undefined, source, destination,
key,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
is_creation, is_modification,
source.get.bind(source),
options];
i += 1;
}
}
}
}
queue
.push(function () {
return dispatchQueue(
checkSignatureDifference,
argument_list,
options.operation_amount
);
});
if (options.check_deletion === true) {
i = 0;
for (key in signature_dict) {
if (signature_dict.hasOwnProperty(key)) {
if (!local_dict.hasOwnProperty(key)) {
checkLocalDeletion(queue, destination, key, source,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
options);
argument_list_deletion[i] = [undefined,
destination, key,
source,
options.conflict_force,
options.conflict_revert,
options.conflict_ignore,
options];
i += 1;
}
}
}
queue.push(function () {
return dispatchQueue(
checkLocalDeletion,
argument_list_deletion,
options.operation_amount
);
});
}
if ((options.use_bulk_get === true) && (document_list.length !== 0)) {
checkBulkSignatureDifference(queue, source, destination,
......@@ -898,6 +956,12 @@
});
}
function repairDocument(queue, id) {
queue.push(function () {
return repairDocumentAttachment(id);
});
}
return new RSVP.Queue()
.push(function () {
// Ensure that the document storage is usable
......@@ -950,7 +1014,8 @@
CONFLICT_CONTINUE),
check_modification: context._check_local_modification,
check_creation: context._check_local_creation,
check_deletion: context._check_local_deletion
check_deletion: context._check_local_deletion,
operation_amount: context._parallel_operation_amount
});
}
})
......@@ -981,7 +1046,8 @@
CONFLICT_CONTINUE),
check_modification: context._check_remote_modification,
check_creation: context._check_remote_creation,
check_deletion: context._check_remote_deletion
check_deletion: context._check_remote_deletion,
operation_amount: context._parallel_operation_amount
});
}
})
......@@ -997,19 +1063,19 @@
return context._signature_sub_storage.allDocs()
.push(function (result) {
var i,
repair_document_queue = new RSVP.Queue();
function repairDocument(id) {
repair_document_queue
.push(function () {
return repairDocumentAttachment(id);
});
}
argument_list = [],
len = result.data.total_rows;
for (i = 0; i < result.data.total_rows; i += 1) {
repairDocument(result.data.rows[i].id);
for (i = 0; i < len; i += 1) {
argument_list.push(
[undefined, result.data.rows[i].id]
);
}
return repair_document_queue;
return dispatchQueue(
repairDocument,
argument_list,
context._parallel_operation_attachment_amount
);
});
}
});
......
/*jslint nomen: true*/
/*global Blob*/
(function (jIO, QUnit, Blob) {
/*global Blob, console, RSVP*/
(function (jIO, QUnit, Blob, RSVP) {
"use strict";
var test = QUnit.test,
stop = QUnit.stop,
......@@ -92,6 +92,8 @@
deepEqual(jio.__storage._query_options, {});
equal(jio.__storage._use_remote_post, false);
equal(jio.__storage._conflict_handling, 0);
equal(jio.__storage._parallel_operation_attachment_amount, 1);
equal(jio.__storage._parallel_operation_amount, 1);
equal(jio.__storage._check_local_creation, true);
equal(jio.__storage._check_local_deletion, true);
equal(jio.__storage._check_local_modification, true);
......@@ -136,6 +138,8 @@
query: {query: 'portal_type: "Foo"', limit: [0, 1234567890]},
use_remote_post: true,
conflict_handling: 3,
parallel_operation_attachment_amount: 2713,
parallel_operation_amount: 2711,
check_local_creation: false,
check_local_deletion: false,
check_local_modification: false,
......@@ -156,6 +160,8 @@
);
equal(jio.__storage._use_remote_post, true);
equal(jio.__storage._conflict_handling, 3);
equal(jio.__storage._parallel_operation_attachment_amount, 2713);
equal(jio.__storage._parallel_operation_amount, 2711);
equal(jio.__storage._check_local_creation, false);
equal(jio.__storage._check_local_deletion, false);
equal(jio.__storage._check_local_modification, false);
......@@ -4328,6 +4334,93 @@
});
});
test("use 2 queue in parallel", function () {
stop();
expect(1);
var context = this,
start_sync = [],
sync_pause;
function Storage2711() {
this._sub_storage = jIO.createJIO({type: "memory"});
return this;
}
Storage2711.prototype.put = function (id, doc) {
this._sub_storage.put(id, doc);
return id;
};
Storage2711.prototype.get = function (id) {
var storage = this;
start_sync[id] = true;
return ((id === "0") ? RSVP.delay(500) : RSVP.delay(100))
.then(function () {
if (id === "2") {
sync_pause = start_sync.toString();
}
start_sync[id] = false;
return storage._sub_storage.get(id);
});
};
Storage2711.prototype.buildQuery = function () {
return this._sub_storage.buildQuery.apply(this._sub_storage, arguments);
};
Storage2711.prototype.bulk = function () {
return this._sub_storage.bulk.apply(this._sub_storage, arguments);
};
Storage2711.prototype.hasCapacity = function () {
return this._sub_storage.hasCapacity.apply(this._sub_storage, arguments);
};
jIO.addStorage(
'parallel',
Storage2711
);
this.jio = jIO.createJIO({
type: "replicate",
conflict_handling: 1,
check_local_creation: true,
check_local_modification: true,
parallel_operation_amount: 2,
local_sub_storage: {
type: "uuid",
sub_storage: {
type: "memory"
}
},
remote_sub_storage: {
type: "parallel"
}
});
return context.jio.put("0", {"title": "foo"})
.push(function () {
return context.jio.put("1", {"title": "foo1"});
})
.push(function () {
return context.jio.put("2", {"title": "foo2"});
})
.push(function () {
return context.jio.put("3", {"title": "foo3"});
})
.push(function () {
return context.jio.repair();
})
.then(function () {
equal(sync_pause, "true,false,true", "rigth order");
})
.fail(function (error) {
ok(false, error);
})
.always(function () {
start();
});
});
/////////////////////////////////////////////////////////////////
// attachment replication
/////////////////////////////////////////////////////////////////
......@@ -7410,4 +7503,123 @@
});
});
}(jIO, QUnit, Blob));
test("use 2 queue in parallel", function () {
stop();
expect(1);
var context = this,
start_sync = [],
sync_pause;
function Storage2713() {
this._sub_storage = jIO.createJIO({type: "memory"});
return this;
}
Storage2713.prototype.put = function (id, doc) {
this._sub_storage.put(id, doc);
return id;
};
Storage2713.prototype.get = function (id) {
console.log("get", id);
return this._sub_storage.get(id);
};
Storage2713.prototype.getAttachment = function (id) {
var storage = this,
argument_list = arguments;
start_sync[id] = true;
return ((id === "0") ? RSVP.delay(500) : RSVP.delay(100))
.then(function () {
if (id === "2") {
sync_pause = start_sync.toString();
}
start_sync[id] = false;
return storage._sub_storage.getAttachment.apply(
storage._sub_storage,
argument_list
);
});
};
Storage2713.prototype.putAttachment = function () {
return this._sub_storage.putAttachment.apply(this._sub_storage,
arguments);
};
Storage2713.prototype.removeAttachment = function () {
return this._sub_storage.removeAttachment.apply(this._sub_storage,
arguments);
};
Storage2713.prototype.allAttachments = function () {
return this._sub_storage.allAttachments.apply(this._sub_storage,
arguments);
};
Storage2713.prototype.buildQuery = function () {
return this._sub_storage.buildQuery.apply(this._sub_storage, arguments);
};
Storage2713.prototype.bulk = function () {
return this._sub_storage.bulk.apply(this._sub_storage, arguments);
};
Storage2713.prototype.hasCapacity = function () {
return this._sub_storage.hasCapacity.apply(this._sub_storage, arguments);
};
jIO.addStorage(
'parallel_attachment',
Storage2713
);
this.jio = jIO.createJIO({
type: "replicate",
conflict_handling: 1,
check_local_attachment_creation: true,
check_local_attachment_modification: true,
parallel_operation_attachment_amount: 2,
local_sub_storage: {
type: "uuid",
sub_storage: {
type: "memory"
}
},
remote_sub_storage: {
type: "parallel_attachment"
}
});
return context.jio.put("0", {"title": "foo"})
.push(function () {
return context.jio.putAttachment("0", "foo", new Blob(["0"]));
})
.push(function () {
return context.jio.put("1", {"title": "foo1"});
})
.push(function () {
return context.jio.putAttachment("1", "foo", new Blob(["1"]));
})
.push(function () {
return context.jio.put("2", {"title": "foo2"});
})
.push(function () {
return context.jio.putAttachment("2", "foo", new Blob(["2"]));
})
.push(function () {
return context.jio.put("3", {"title": "foo3"});
})
.push(function () {
return context.jio.putAttachment("3", "foo", new Blob(["3"]));
})
.push(function () {
return context.jio.repair();
})
.then(function () {
equal(sync_pause, "true,false,true", "rigth order");
})
.fail(function (error) {
ok(false, error);
})
.always(function () {
start();
});
});
}(jIO, QUnit, Blob, RSVP));
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment