Commit 09ecdfa0 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Remove PartitionTable.setRow() as it is meaningless. Fix more bugs with...

Remove PartitionTable.setRow() as it is meaningless. Fix more bugs with pyflakes. Add FIXME comments to neo/client/handler.py. The real fix needs a confirmation by aurel.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@80 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f6938688
from Queue import Queue
from threading import Lock
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.utils import p64, u64, cp, z64
from thread import get_ident
from neo.client.dispatcher import Dispatcher
from neo.event import EventManager
import logging
class NEOStorageError(POSException.StorageError):
pass
......
import logging
import os
from time import time
from threading import Lock, Condition, Thread, local
from threading import Lock, local
from cPickle import dumps, loads
from zlib import compress, adler32, decompress
from Queue import Queue, Empty
from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode
from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import ListeningConnection, ClientConnection
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, CLIENT_NODE_TYPE, \
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
from neo.client.handler import ClientEventHandler
from neo.client.NEOStorage import NEOStorageConflictError, NEOStorageNotFoundError
from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError
from neo.client.multithreading import ThreadingMixIn
from ZODB.POSException import UndoError
from ZODB.POSException import UndoError, StorageTransactionError
class ConnectionManager(object):
......@@ -337,7 +340,7 @@ class Application(ThreadingMixIn, object):
# Uncompress data
if compression:
data = decompressed(data)
data = decompress(data)
# Put in cache only when using load
if cache:
......@@ -370,7 +373,7 @@ class Application(ThreadingMixIn, object):
return self._load(oid, serial)[:2], None
def loadBefore(oid, tid):
def loadBefore(self, oid, tid):
"""Load an object for a given oid before tid committed."""
# Do not try in cache as it managed only up-to-date object
return self._load(oid, tid)
......@@ -403,7 +406,7 @@ class Application(ThreadingMixIn, object):
def store(self, oid, serial, data, version, transaction):
"""Store object."""
if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction)
raise StorageTransactionError(self, transaction)
# Find which storage node to use
partition_id = oid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
......@@ -431,7 +434,7 @@ class Application(ThreadingMixIn, object):
# remove from dict and raise ConflictError, don't care of
# previous node which already store data as it would be resent
# again if conflict is resolved or txn will be aborted
self.txn_data_dict.pop(oid)
del self.txn_data_dict[oid]
raise NEOStorageConflictError(self.object_stored[1])
# Store object in tmp cache
......@@ -442,10 +445,11 @@ class Application(ThreadingMixIn, object):
def tpc_vote(self, transaction):
"""Store current transaction."""
if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction)
raise StorageTransactionError(self, transaction)
user = transaction.user
desc = transaction.description
ext = dumps(transaction._extension)
oid_list = self.txn_data_dict.keys()
# Store data on each node
partition_id = self.tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
......@@ -468,7 +472,7 @@ class Application(ThreadingMixIn, object):
"""Clear some transaction parameters."""
self.tid = None
self.txn = None
self.txn_data_dict = {}
self.txn_data_dict.clear()
self.txn_voted = 0
self.txn_finished = 0
......@@ -480,7 +484,7 @@ class Application(ThreadingMixIn, object):
# Abort txn in node where objects were stored
aborted_node = {}
for oid in self.txn_oid_list:
for oid in self.self.txn_data_dict.iterkeys():
partition_id = oid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
......@@ -527,12 +531,12 @@ class Application(ThreadingMixIn, object):
# Wait for answer
self._waitMessage()
if self.txn_finished != 1:
raise NEOStorateError('tpc_finish failed')
raise NEOStorageError('tpc_finish failed')
# Update cache
self.cache_lock_acquire()
try:
for oid in self.txn_data_dict.keys:
for oid in self.txn_data_dict.iterkeys():
ddata = self.txn_data_dict[oid]
# Now serial is same as tid
self.cache[oid] = self.tid, ddata
......@@ -543,8 +547,8 @@ class Application(ThreadingMixIn, object):
def undo(self, transaction_id, txn, wrapper):
if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction)
if transaction_id is not self.txn:
raise StorageTransactionError(self, transaction_id)
# First get transaction information from master node
partition_id = transaction_id % self.num_paritions
......@@ -555,7 +559,7 @@ class Application(ThreadingMixIn, object):
continue
msg_id = conn.getNextId()
p = Packet()
p.askTransactionInformation(msg_id, tid)
p.askTransactionInformation(msg_id, transaction_id)
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
# Wait for answer
......@@ -596,14 +600,13 @@ class Application(ThreadingMixIn, object):
self.store(oid, self.tid, data, None, txn)
except NEOStorageConflictError, serial:
if serial <= self.tid:
new_data = wrapper.tryToResolveConflict(oid, self.tid, serial,
data)
new_data = wrapper.tryToResolveConflict(oid, self.tid,
serial, data)
if new_data is not None:
self.store(oid, self.tid, new_data, None, txn)
continue
raise POSException.ConflictError(oid=oid,
serials=(self.tid,
serial),data=data)
raise ConflictError(oid = oid, serials = (self.tid, serial),
data = data)
self.tpc_vote(txn)
self.tpc_finish(txn)
......@@ -710,7 +713,7 @@ class Application(ThreadingMixIn, object):
# Now that we have object informations, get txn informations
history_list = []
for serial, size in self.local_var.hisory[1]:
partition_id = tid % self.num_paritions
partition_id = serial % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
......
......@@ -2,11 +2,13 @@ import logging
from neo.handler import EventHandler
from neo.connection import ClientConnection
from neo.protocol import Packet, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
from neo.protocol import Packet, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, TEMPORARILY_DOWN_STATE, BROKEN_STATE
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable
from neo.client.NEOStorage import NEOStorageError
from neo.exception import ElectionFailure
from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64
......@@ -214,6 +216,8 @@ class ClientEventHandler(EventHandler):
or app.primary_master_node.getUUID() != uuid:
return
# FIXME this part requires a serious fix. Look at
# neo/storage/verification.py for details.
for offset, node in row_list:
app.pt.setRow(offset, row)
else:
......@@ -284,6 +288,8 @@ class ClientEventHandler(EventHandler):
or app.primary_master_node.getUUID() != uuid:
return
# FIXME this part requires a serious fix. Look at
# neo/storage/verification.py for details.
for cell in cell_list:
app.pt.addNode(cell)
else:
......
......@@ -242,9 +242,6 @@ class PartitionTable(object):
return ()
return [(cell.getUUID(), cell.getState()) for cell in row]
def setRow(self, offset, row):
self.partition_list[offset] = row
def tweak(self):
"""Test if nodes are distributed uniformly. Otherwise, correct the partition
table."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment