From 34ecf9158812828c1e8a0c8ecc40528a9d93f276 Mon Sep 17 00:00:00 2001
From: Tristan Cavelier <tristan.cavelier@tiolive.com>
Date: Mon, 28 Apr 2014 10:52:59 +0200
Subject: [PATCH] replicate synchronizes on allDocs

---
 src/jio.storage/replicatestorage.js | 229 ++++++++++++++++++++--------
 1 file changed, 167 insertions(+), 62 deletions(-)

diff --git a/src/jio.storage/replicatestorage.js b/src/jio.storage/replicatestorage.js
index 283825f..e58f501 100644
--- a/src/jio.storage/replicatestorage.js
+++ b/src/jio.storage/replicatestorage.js
@@ -44,9 +44,7 @@
     resolve = require('rsvp').resolve,
     addStorageFunction = require('jio').addStorage,
     uniqueJSONStringify = require('jio').util.uniqueJSONStringify,
-    forEach = require('jio').util.forEach,
     chain = resolve(),
-    resolved = chain,
     cache = {};
 
   function logandreturn() {
@@ -65,6 +63,52 @@
     return promise.then(null, function (reason) { return reason; });
   }
 
+  //////////////////////////////////////////////////////////////////////
+
+  function RowFIFO() {
+    this.index = 0;
+    this.end = 0;
+    this.length = 0;
+    this.ids = {};
+  }
+
+  RowFIFO.prototype.extend = function (array) {
+    var i, l, value;
+    for (i = 0, l = array.length; i < l; i += 1) {
+      value = array[i];
+      if (this.ids[value.id]) { return; }
+      this.ids[value.id] = true;
+      this[this.end] = value;
+      this.end += 1;
+      this.length += 1;
+    }
+    return this;
+  };
+
+  RowFIFO.prototype.push = function () {
+    this.extend([].slice.call(arguments));
+    return this.length;
+  };
+
+  RowFIFO.prototype.shift = function () {
+    if (this.index >= this.end) { return; }
+    this.length -= 1;
+    var val = this[this.index];
+    delete this[this.index];
+    delete this.ids[val.id];
+    this.index += 1;
+    return val;
+  };
+
+  function exportAllDocsRowsToFIFO(this_storage, allDocs) {
+    var fifo;
+    fifo = this_storage._cache.rowsToSynchronize =
+      this_storage._cache.rowsToSynchronize || new RowFIFO();
+    fifo.extend(allDocs.data.rows);
+  }
+
+  //////////////////////////////////////////////////////////////////////
+
   /**
    *     firstFulfilled(promises): promises< last_fulfilment_value >
    *
@@ -151,6 +195,36 @@
     }, onCancel);
   }
 
+  function arrayShifter(array, callback) {
+    var cancelled, p1 = resolve(), p2;
+    return new Promise(function (done, fail, notify) {
+      var value;
+      function next() {
+        if (array.length) {
+          try {
+            value = callback.call(null, array.shift(), array);
+          } catch (e) {
+            return fail(e);
+          }
+          if (cancelled) { return; }
+          if (value && typeof value.then === "function") {
+            p1 = value;
+            p2 = value.then(next, fail, notify);
+          } else {
+            p2 = p2.then(next, fail, notify);
+          }
+          return;
+        }
+        done();
+      }
+      p2 = p1.then(next);
+    }, function () {
+      cancelled = true;
+      if (typeof p1.cancel === "function") { p1.cancel(); }
+      if (typeof p2.cancel === "function") { p2.cancel(); }
+    });
+  }
+
   // //////////////////////////////////////////////////////////////////////
 
   // /**
@@ -237,13 +311,12 @@
     }
   }
 
-  ReplicateStorage.prototype.syncAllDocs = function (command, alldocs) {
-    if (this._cache.syncAllDocs) {
-      return this._cache.syncAllDocs;
+  ReplicateStorage.prototype.syncRowFIFO = function (command) {
+    if (this._cache.syncRowFIFO) {
+      return this._cache.syncRowFIFO;
     }
-    console.log('syncing');
 
-    var storage_list = this._storage_list, it = this, cache_storage;
+    var storage_list = this._storage_list, it = this, cache_storage, p;
     if (this._cache_storage) {
       cache_storage = command.storage(this._cache_storage);
     }
@@ -252,18 +325,19 @@
       return command.storage(description);
     });
 
-
-    function returnThe404ReasonsElseNull(reason) {
-      if (reason.status === 404) {
-        return 404;
-      }
-      return null;
+    function doNothing() {
+      return;
     }
 
     function getSubStoragesDocument(id) {
       return all(storage_list.map(function (storage) {
-        return storage.get({"_id": id}).
-          then(null, returnThe404ReasonsElseNull);
+        return success(storage.get({"_id": id}));
+      }));
+    }
+
+    function removeSubStorageDocuments(id) {
+      return all(storage_list.map(function (storage) {
+        return success(storage.remove({"_id": id}));
       }));
     }
 
@@ -271,22 +345,34 @@
       return it.syncGetAnswerList(command, answers);
     }
 
+    function is404Answer(answer) {
+      return answer.status === 404;
+    }
+
+    function isSuccessAnswer(answer) {
+      return answer.result === "success";
+    }
+
     function checkAnswers(id, answers) {
       if (cache_storage) {
-        if (answers.every(function (answer) {
-          return answer.status === 404;
-        })) {
+        if (answers.every(is404Answer)) {
           cache_storage.remove({"_id": id});
-        } else if (answers.every(function (answer) {
-          return answer.result === "success";
-        })) {
+        } else if (answers.every(isSuccessAnswer)) {
           cache_storage.remove({"_id": id});
         }
       }
     }
 
     function deleteCache() {
-      delete it._cache.syncAllDocs;
+      delete it._cache.syncRowFIFO;
+    }
+
+    if (cache_storage) {
+      p = cache_storage.allDocs().then(function (answer) {
+        exportAllDocsRowsToFIFO(it, answer);
+      });
+    } else {
+      p = chain;
     }
 
     /*
@@ -300,33 +386,34 @@
      * - a ko > b ok
      * - a ko > b ko
      */
-    this._cache.syncAllDocs =
-      forEach(alldocs.data.rows, function (row) {
+    p = p.then(function () {
+      return arrayShifter(it._cache.rowsToSynchronize, function (row) {
+        if (cache_storage) {
+          return cache_storage.get({"_id": row.id}).then(function (answer) {
+            if (answer.data.state === "Deleted") {
+              return removeSubStorageDocuments(row.id).
+                then(checkAnswers.bind(null, row.id), doNothing);
+            }
+            return getSubStoragesDocument(row.id).
+              then(synchronizeDocument).
+              then(checkAnswers.bind(null, row.id), doNothing);
+          }, function (reason) {
+            if (reason.status === 404) {
+              return getSubStoragesDocument(row.id).
+                then(synchronizeDocument).
+                then(checkAnswers.bind(null, row.id), doNothing);
+            }
+            throw reason;
+          });
+        }
         return getSubStoragesDocument(row.id).
           then(synchronizeDocument).
-          then(checkAnswers.bind(null, row.id));
+          then(null, doNothing);
       });
-    if (cache_storage) {
-      this._cache.syncAllDocs = this._cache.syncAllDocs.then(function () {
-        return cache_storage.allDocs({"include_docs": true});
-      }).then(function (answers) {
-        console.log(answers);
-        forEach(answers.data.rows, function (row) {
-          if (row.doc.state === "Deleted") {
-            return it._remove(command, {"_id": row.id}).
-              then(null, returnThe404ReasonsElseNull).
-              then(function () {
-                cache_storage.remove({"_id": row.id});
-              }).then(null, function () { return; }); // ignore error
-          }
-          return getSubStoragesDocument(row.id).
-            then(synchronizeDocument).
-            then(checkAnswers.bind(null, row.id));
-        });
-      }).then(null, function () { return; }); // ignore error
-    }
-    this._cache.syncAllDocs.then(deleteCache, deleteCache);
-    return this._cache.syncAllDocs;
+    });
+    p.then(deleteCache, deleteCache);
+    this._cache.syncRowFIFO = p;
+    return p;
   };
 
   ReplicateStorage.prototype.syncGetAnswerList = function (command,
@@ -336,7 +423,7 @@
     /*jslint continue: true */
     for (i = 0, l = answer_list.length; i < l; i += 1) {
       answer = answer_list[i];
-      if (!answer || answer === 404) { continue; }
+      if (!answer || answer.result !== "success") { continue; }
       if (!winner) {
         winner = answer;
         winner_index = i;
@@ -361,9 +448,15 @@
     // document synchronisation
     for (i = 0, l = answer_list.length; i < l; i += 1) {
       answer = answer_list[i];
-      if (!answer) { continue; }
-      if (i === winner_index) { continue; }
-      if (answer === 404) {
+      if (!answer) {
+        promise_list.push(resolve({"status": 0}));
+        continue;
+      }
+      if (i === winner_index) {
+        promise_list.push(resolve({"result": "success"}));
+        continue;
+      }
+      if (answer.status === 404) {
         delete winner._id;
         promise_list.push(success(
           command.storage(this._storage_list[i]).post(winner)
@@ -373,14 +466,18 @@
         // resolving the get method.
         continue;
       }
-      delete answer._attachments;
-      if (uniqueJSONStringify(answer.data) !== winner_str) {
-        promise_list.push(success(
-          command.storage(this._storage_list[i]).put(winner)
-        ));
+      if (answer.result === "success") {
+        delete answer._attachments;
+        if (uniqueJSONStringify(answer.data) !== winner_str) {
+          promise_list.push(success(
+            command.storage(this._storage_list[i]).put(winner)
+          ));
+          continue;
+        }
+        promise_list.push(resolve({"result": "success"}));
         continue;
       }
-      promise_list.push(resolved);
+      promise_list.push(resolve({"status": 0}));
     }
     return all(promise_list);
     // XXX .then synchronize attachments
@@ -404,6 +501,13 @@
           if (typeof metadata._id !== "string" || metadata._id === "") {
             metadata._id = a.id;
           }
+          if (thiz._cache_storage && cache_promise === 0) {
+            // the metadata is set, but the cache needs to be updated
+            cache_promise = command.storage(thiz._cache_storage).put({
+              "_id": metadata._id,
+              "state": "Updated"
+            });
+          }
           done(a);
           return a;
         }, function (e) {
@@ -413,6 +517,8 @@
               "_id": metadata._id,
               "state": "Updated"
             });
+          } else {
+            cache_promise = 0;
           }
           error_count += 1;
           if (error_count === promises.length) {
@@ -660,7 +766,7 @@
   };
 
   ReplicateStorage.prototype._allDocs = function (command, param, option) {
-    var promise_list = [], index, length = this._storage_list.length;
+    var promise_list = [], index, me = this, length = me._storage_list.length;
     for (index = 0; index < length; index += 1) {
       promise_list[index] =
         command.storage(this._storage_list[index]).allDocs(option);
@@ -696,16 +802,15 @@
         }
       }
       return {"data": {"total_rows": (rows || []).length, "rows": rows || []}};
+    }).then(function (answer) {
+      exportAllDocsRowsToFIFO(me, answer);
+      me.syncRowFIFO(command);
+      return answer;
     });
   };
 
   ReplicateStorage.prototype.allDocs = function (command, param, option) {
-    var this_ = this;
     return this._allDocs(command, param, option).
-      then(function (answer) {
-        this_.syncAllDocs(command, answer);
-        return answer;
-      }).
       then(command.success, command.error, command.notify);
   };
 
-- 
2.30.9