Commit b2bad94a authored by ben's avatar ben

Changed IterTreeReducer so other code uses it by subclassing

(instead of by passing various functions to its initializer)


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@64 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent c4e8d74c
...@@ -196,148 +196,85 @@ class IterTreeReducer: ...@@ -196,148 +196,85 @@ class IterTreeReducer:
iterator nature of the connection between hosts and the temporal iterator nature of the connection between hosts and the temporal
order in which the files are processed. order in which the files are processed.
The elements of the iterator are required to have a tuple-style There are three stub functions below: start_process, end_process,
.index, called "indexed elem" below. and branch_process. A class that subclasses this one should fill
in these functions with real values.
""" It is important that this class be pickable, so keep that in mind
def __init__(self, base_init, branch_reducer, when subclassing (this is used to resume failed sessions).
branch_base, base_final, initial_state = None):
"""ITR initializer
base_init is a function of one argument, an indexed elem. It
is called immediately on any elem in the iterator. It should
return some value type A.
branch_reducer and branch_base are used to form a value on a
bunch of reduced branches, in the way that a linked list of
type C can be folded to form a value type B.
base_final is called when leaving a tree. It takes three
arguments, the indexed elem, the output (type A) of base_init,
the output of branch_reducer on all the branches (type B) and
returns a value type C.
"""
self.base_init = base_init
self.branch_reducer = branch_reducer
self.base_final = base_final
self.branch_base = branch_base
if initial_state: self.setstate(initial_state)
else:
self.state = IterTreeReducerState(branch_base)
self.subreducer = None
def setstate(self, state):
"""Update with new state, recursive if necessary"""
self.state = state
if state.substate: self.subreducer = self.newinstance(state.substate)
else: self.subreducer = None
def getstate(self): return self.state """
def __init__(self, *args):
def getresult(self): """ITR initializer"""
"""Return results of calculation""" self.init_args = args
if not self.state.calculated: self.calculate_final_val() self.index = None
return self.state.final_val self.subinstance = None
self.finished = None
def intree(self, index): def intree(self, index):
"""Return true if index is still in current tree""" """Return true if index is still in current tree"""
return self.state.base_index == index[:len(self.state.base_index)] return self.base_index == index[:len(self.base_index)]
def newinstance(self, state = None): def set_subinstance(self):
"""Return reducer of same type as self """Return subinstance of same type as self"""
self.subinstance = self.__class__(*self.init_args)
If state is None, sets substate of self.state, otherwise
assume this is already set. def process_w_subinstance(self, args):
"""Give object to subinstance, if necessary update branch_val"""
""" if not self.subinstance: self.set_subinstance()
new = self.__class__(self.base_init, self.branch_reducer, if not self.subinstance(*args):
self.branch_base, self.base_final, state) self.branch_process(self.subinstance)
if state is None: self.state.substate = new.state self.set_subinstance()
return new assert self.subinstance(*args)
def process_w_subreducer(self, indexed_elem): def start_process(self, *args):
"""Give object to subreducer, if necessary update branch_val""" """Do some initial processing (stub)"""
if not self.subreducer: pass
self.subreducer = self.newinstance()
if not self.subreducer(indexed_elem): def end_process(self):
self.state.branch_val = self.branch_reducer(self.state.branch_val, """Do any final processing before leaving branch (stub)"""
self.subreducer.getresult()) pass
self.subreducer = self.newinstance()
assert self.subreducer(indexed_elem) def branch_process(self, subinstance):
"""Process a branch right after it is finished (stub)"""
def calculate_final_val(self): pass
"""Set final value"""
if self.subreducer: def Finish(self):
self.state.branch_val = self.branch_reducer(self.state.branch_val, """Call at end of sequence to tie everything up"""
self.subreducer.getresult()) assert not self.finished
if self.state.current_index is None: if self.subinstance:
# No input, set None as default value self.subinstance.Finish()
self.state.final_val = None self.branch_process(self.subinstance)
else: self.end_process()
self.state.final_val = self.base_final(self.state.base_elem, self.finished = 1
self.state.base_init_val,
self.state.branch_val) def __call__(self, *args):
self.state.calculated = 1 """Process args, where args[0] is current position in iterator
def __call__(self, indexed_elem): Returns true if args successfully processed, false if index is
"""Process elem, current position in iterator
Returns true if elem successfully processed, false if elem is
not in the current tree and thus the final result is not in the current tree and thus the final result is
available. available.
Also note below we set self.index after doing the necessary
start processing, in case there is a crash in the middle.
""" """
index = indexed_elem.index index = args[0]
assert type(index) is types.TupleType assert type(index) is types.TupleType
if self.state.current_index is None: # must be at base if self.index is None:
self.state.base_init_val = self.base_init(indexed_elem) self.start_process(*args)
# Do most crash-prone op first, so we don't leave inconsistent self.index = self.base_index = index
self.state.current_index = index
self.state.base_index = index
self.state.base_elem = indexed_elem
return 1 return 1
elif not index > self.state.current_index:
Log("Warning: oldindex %s >= newindex %s" % if index <= self.index:
(self.state.current_index, index), 2) Log("Warning: oldindex %s >= newindex %s" % (self.index, index), 2)
if not self.intree(index): if not self.intree(index):
self.calculate_final_val() self.Finish()
return None return None
else: else:
self.process_w_subreducer(indexed_elem) self.process_w_subinstance(args)
self.state.current_index = index self.index = index
return 1 return 1
class IterTreeReducerState:
"""Holds the state for IterTreeReducers
An IterTreeReducer cannot be pickled directly because it holds
some anonymous functions. This class contains the relevant data
that is likely to be picklable, so the ITR can be saved and loaded
if the associated functions are known.
"""
def __init__(self, branch_base):
"""ITRS initializer
Class variables:
self.current_index - last index processing started on, or None
self.base_index - index of first element processed
self.base_elem - first element processed
self.branch_val - default branch reducing value
self.calculated - true iff the final value has been calculated
self.base_init_val - return value of base_init function
self.final_val - Final value once it's calculated
self.substate - IterTreeReducerState when subreducer active
"""
self.current_index = None
self.calculated = None
self.branch_val = branch_base
self.substate = None
...@@ -196,148 +196,85 @@ class IterTreeReducer: ...@@ -196,148 +196,85 @@ class IterTreeReducer:
iterator nature of the connection between hosts and the temporal iterator nature of the connection between hosts and the temporal
order in which the files are processed. order in which the files are processed.
The elements of the iterator are required to have a tuple-style There are three stub functions below: start_process, end_process,
.index, called "indexed elem" below. and branch_process. A class that subclasses this one should fill
in these functions with real values.
""" It is important that this class be pickable, so keep that in mind
def __init__(self, base_init, branch_reducer, when subclassing (this is used to resume failed sessions).
branch_base, base_final, initial_state = None):
"""ITR initializer
base_init is a function of one argument, an indexed elem. It
is called immediately on any elem in the iterator. It should
return some value type A.
branch_reducer and branch_base are used to form a value on a
bunch of reduced branches, in the way that a linked list of
type C can be folded to form a value type B.
base_final is called when leaving a tree. It takes three
arguments, the indexed elem, the output (type A) of base_init,
the output of branch_reducer on all the branches (type B) and
returns a value type C.
"""
self.base_init = base_init
self.branch_reducer = branch_reducer
self.base_final = base_final
self.branch_base = branch_base
if initial_state: self.setstate(initial_state)
else:
self.state = IterTreeReducerState(branch_base)
self.subreducer = None
def setstate(self, state):
"""Update with new state, recursive if necessary"""
self.state = state
if state.substate: self.subreducer = self.newinstance(state.substate)
else: self.subreducer = None
def getstate(self): return self.state """
def __init__(self, *args):
def getresult(self): """ITR initializer"""
"""Return results of calculation""" self.init_args = args
if not self.state.calculated: self.calculate_final_val() self.index = None
return self.state.final_val self.subinstance = None
self.finished = None
def intree(self, index): def intree(self, index):
"""Return true if index is still in current tree""" """Return true if index is still in current tree"""
return self.state.base_index == index[:len(self.state.base_index)] return self.base_index == index[:len(self.base_index)]
def newinstance(self, state = None): def set_subinstance(self):
"""Return reducer of same type as self """Return subinstance of same type as self"""
self.subinstance = self.__class__(*self.init_args)
If state is None, sets substate of self.state, otherwise
assume this is already set. def process_w_subinstance(self, args):
"""Give object to subinstance, if necessary update branch_val"""
""" if not self.subinstance: self.set_subinstance()
new = self.__class__(self.base_init, self.branch_reducer, if not self.subinstance(*args):
self.branch_base, self.base_final, state) self.branch_process(self.subinstance)
if state is None: self.state.substate = new.state self.set_subinstance()
return new assert self.subinstance(*args)
def process_w_subreducer(self, indexed_elem): def start_process(self, *args):
"""Give object to subreducer, if necessary update branch_val""" """Do some initial processing (stub)"""
if not self.subreducer: pass
self.subreducer = self.newinstance()
if not self.subreducer(indexed_elem): def end_process(self):
self.state.branch_val = self.branch_reducer(self.state.branch_val, """Do any final processing before leaving branch (stub)"""
self.subreducer.getresult()) pass
self.subreducer = self.newinstance()
assert self.subreducer(indexed_elem) def branch_process(self, subinstance):
"""Process a branch right after it is finished (stub)"""
def calculate_final_val(self): pass
"""Set final value"""
if self.subreducer: def Finish(self):
self.state.branch_val = self.branch_reducer(self.state.branch_val, """Call at end of sequence to tie everything up"""
self.subreducer.getresult()) assert not self.finished
if self.state.current_index is None: if self.subinstance:
# No input, set None as default value self.subinstance.Finish()
self.state.final_val = None self.branch_process(self.subinstance)
else: self.end_process()
self.state.final_val = self.base_final(self.state.base_elem, self.finished = 1
self.state.base_init_val,
self.state.branch_val) def __call__(self, *args):
self.state.calculated = 1 """Process args, where args[0] is current position in iterator
def __call__(self, indexed_elem): Returns true if args successfully processed, false if index is
"""Process elem, current position in iterator
Returns true if elem successfully processed, false if elem is
not in the current tree and thus the final result is not in the current tree and thus the final result is
available. available.
Also note below we set self.index after doing the necessary
start processing, in case there is a crash in the middle.
""" """
index = indexed_elem.index index = args[0]
assert type(index) is types.TupleType assert type(index) is types.TupleType
if self.state.current_index is None: # must be at base if self.index is None:
self.state.base_init_val = self.base_init(indexed_elem) self.start_process(*args)
# Do most crash-prone op first, so we don't leave inconsistent self.index = self.base_index = index
self.state.current_index = index
self.state.base_index = index
self.state.base_elem = indexed_elem
return 1 return 1
elif not index > self.state.current_index:
Log("Warning: oldindex %s >= newindex %s" % if index <= self.index:
(self.state.current_index, index), 2) Log("Warning: oldindex %s >= newindex %s" % (self.index, index), 2)
if not self.intree(index): if not self.intree(index):
self.calculate_final_val() self.Finish()
return None return None
else: else:
self.process_w_subreducer(indexed_elem) self.process_w_subinstance(args)
self.state.current_index = index self.index = index
return 1 return 1
class IterTreeReducerState:
"""Holds the state for IterTreeReducers
An IterTreeReducer cannot be pickled directly because it holds
some anonymous functions. This class contains the relevant data
that is likely to be picklable, so the ITR can be saved and loaded
if the associated functions are known.
"""
def __init__(self, branch_base):
"""ITRS initializer
Class variables:
self.current_index - last index processing started on, or None
self.base_index - index of first element processed
self.base_elem - first element processed
self.branch_val - default branch reducing value
self.calculated - true iff the final value has been calculated
self.base_init_val - return value of base_init function
self.final_val - Final value once it's calculated
self.substate - IterTreeReducerState when subreducer active
"""
self.current_index = None
self.calculated = None
self.branch_val = branch_base
self.substate = None
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment