#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
import os, sys
from time import time, gmtime
from struct import pack, unpack
from neo.config import ConfigurationManager
from neo import protocol
from neo.protocol import Packet, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
PENDING_STATE, INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \
CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
UUID_NAMESPACES, ADMIN_NODE_TYPE, BOOTING
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \
OperationFailure
from neo.master.identification import IdentificationEventHandler
from neo.master.administration import AdministrationEventHandler
from neo.master.election import ClientElectionEventHandler, ServerElectionEventHandler
from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler
from neo.master.service import ClientServiceEventHandler, StorageServiceEventHandler
from neo.master.secondary import PrimaryMasterEventHandler, SecondaryMasterEventHandler
# NOTE: shutdown() below references ShutdownEventHandler, which was never
# imported in this module; the module path here is an assumption, adjust it
# to wherever the handler is actually defined.
from neo.master.shutdown import ShutdownEventHandler
from neo.master.pt import PartitionTable
from neo.util import dump
from neo.connector import getConnectorHandler
from neo.master import ENABLE_PENDING_NODES
class Application(object):
"""The master node application."""
def __init__(self, file, section):
config = ConfigurationManager(file, section)
self.connector_handler = getConnectorHandler(config.getConnector())
self.name = config.getName()
if len(self.name) == 0:
raise RuntimeError, 'cluster name must be non-empty'
self.server = config.getServer()
logging.debug('IP address is %s, port is %d', *(self.server))
# Exclude itself from the list.
self.master_node_list = [n for n in config.getMasterNodeList() if n != self.server]
logging.debug('master nodes are %s', self.master_node_list)
# Internal attributes.
self.em = EventManager()
self.nm = NodeManager()
# Partition table
replicas, partitions = config.getReplicas(), config.getPartitions()
        if replicas < 0:
            raise RuntimeError, 'replicas must be a non-negative integer'
        if partitions <= 0:
            raise RuntimeError, 'partitions must be a positive integer'
self.pt = PartitionTable(partitions, replicas)
logging.debug('the number of replicas is %d, the number of partitions is %d, the name is %s',
replicas, partitions, self.name)
self.primary = None
self.primary_master_node = None
        # Generate a UUID for self.
self.uuid = self.getNewUUID(MASTER_NODE_TYPE)
# The last OID.
self.loid = INVALID_OID
# The last TID.
self.ltid = INVALID_TID
# The target node's uuid to request next.
self.target_uuid = None
def run(self):
"""Make sure that the status is sane and start a loop."""
for server in self.master_node_list:
self.nm.add(MasterNode(server = server))
# Make a listening port.
self.listening_conn = ListeningConnection(self.em, None,
addr = self.server, connector_handler = self.connector_handler)
self.cluster_state = BOOTING
# Start the election of a primary master node.
self.electPrimary()
# Start a normal operation.
while 1:
try:
if self.primary:
self.playPrimaryRole()
else:
self.playSecondaryRole()
raise RuntimeError, 'should not reach here'
except (ElectionFailure, PrimaryFailure):
# Forget all connections.
for conn in self.em.getConnectionList():
if not conn.isListeningConnection():
conn.close()
# Reelect a new primary master.
self.electPrimary(bootstrap = False)
def electPrimary(self, bootstrap = True):
"""Elect a primary master node.
The difficulty is that a master node must accept connections from
others while attempting to connect to other master nodes at the
        same time. Note that storage and client nodes may connect to self
        as well, not only other master nodes."""
logging.info('begin the election of a primary master')
self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set()
self.listening_conn.setHandler(ServerElectionEventHandler(self))
client_handler = ClientElectionEventHandler(self)
em = self.em
nm = self.nm
while 1:
t = 0
self.primary = None
self.primary_master_node = None
for node in nm.getMasterNodeList():
self.unconnected_master_node_set.add(node.getServer())
                # For now, assume that every node is available, since down or
                # broken nodes may already have been repaired.
node.setState(RUNNING_STATE)
self.negotiating_master_node_set.clear()
try:
while 1:
em.poll(1)
current_time = time()
if current_time >= t + 1:
t = current_time
# Expire temporarily down nodes. For now, assume that a node
# which is down for 60 seconds is really down, if this is a
# bootstrap. 60 seconds may sound too long, but this is reasonable
# when rebooting many cluster machines. Otherwise, wait for only
# 10 seconds, because stopping the whole cluster for a long time
# is a bad idea.
if bootstrap:
expiration = 60
else:
expiration = 10
for node in nm.getMasterNodeList():
if node.getState() == TEMPORARILY_DOWN_STATE \
and node.getLastStateChange() + expiration < current_time:
logging.info('%s is down' % (node, ))
node.setState(DOWN_STATE)
self.unconnected_master_node_set.discard(node.getServer())
# Try to connect to master nodes.
if self.unconnected_master_node_set:
for addr in list(self.unconnected_master_node_set):
ClientConnection(em, client_handler, addr = addr,
connector_handler = self.connector_handler)
if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0:
break
# Now there are three situations:
# - I am the primary master
# - I am secondary but don't know who is primary
# - I am secondary and know who is primary
if self.primary is None:
# I am the primary.
self.primary = True
logging.info('I am the primary, so sending an announcement')
for conn in em.getConnectionList():
if isinstance(conn, ClientConnection):
conn.notify(protocol.announcePrimaryMaster())
conn.abort()
closed = False
t = time()
while not closed:
em.poll(1)
closed = True
for conn in em.getConnectionList():
if isinstance(conn, ClientConnection):
closed = False
break
if t + 10 < time():
for conn in em.getConnectionList():
if isinstance(conn, ClientConnection):
conn.close()
closed = True
else:
                    # Wait for an announcement. If this takes too long, the
                    # primary master is probably down.
t = time()
while self.primary_master_node is None:
em.poll(1)
if t + 10 < time():
raise ElectionFailure, 'no primary master elected'
# Now I need only a connection to the primary master node.
primary = self.primary_master_node
addr = primary.getServer()
                    for conn in em.getConnectionList():
                        # Keep only the client connection to the primary:
                        # close server connections, and client connections
                        # to any other address.
                        if isinstance(conn, ServerConnection) or (
                                isinstance(conn, ClientConnection) and
                                addr != conn.getAddress()):
                            conn.close()
# But if there is no such connection, something wrong happened.
for conn in em.getConnectionList():
if isinstance(conn, ClientConnection) \
and addr == conn.getAddress():
break
else:
raise ElectionFailure, 'no connection remains to the primary'
return
except ElectionFailure, m:
logging.error('election failed; %s' % m)
# Ask all connected nodes to reelect a single primary master.
for conn in em.getConnectionList():
if isinstance(conn, ClientConnection):
conn.notify(protocol.reelectPrimaryMaster())
conn.abort()
# Wait until the connections are closed.
self.primary = None
self.primary_master_node = None
closed = False
t = time()
while not closed:
try:
em.poll(1)
except ElectionFailure:
pass
closed = True
for conn in em.getConnectionList():
if isinstance(conn, ClientConnection):
# Still not closed.
closed = False
break
if time() > t + 10:
# If too long, do not wait.
break
# Close all connections.
for conn in em.getConnectionList():
if not conn.isListeningConnection():
conn.close()
bootstrap = False
def broadcastNodeInformation(self, node):
"""Broadcast a Notify Node Information packet."""
logging.debug('broadcasting node information')
node_type = node.getNodeType()
state = node.getState()
uuid = node.getUUID()
# The server address may be None.
addr = node.getServer()
if addr is None:
ip_address, port = None, None
else:
ip_address, port = addr
if ip_address is None:
ip_address = '0.0.0.0'
if port is None:
port = 0
if node_type == CLIENT_NODE_TYPE:
            # Only to master, storage and admin nodes.
for c in self.em.getConnectionList():
if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID())
if n.getNodeType() in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
node_list = [(node_type, ip_address, port, uuid, state)]
c.notify(protocol.notifyNodeInformation(node_list))
        elif node_type in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
for c in self.em.getConnectionList():
if c.getUUID() is not None:
node_list = [(node_type, ip_address, port, uuid, state)]
c.notify(protocol.notifyNodeInformation(node_list))
        elif node_type != ADMIN_NODE_TYPE:
raise RuntimeError('unknown node type')
def broadcastPartitionChanges(self, ptid, cell_list):
"""Broadcast a Notify Partition Changes packet."""
logging.info('broadcastPartitionChanges')
self.pt.log()
for c in self.em.getConnectionList():
if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID())
if n.getNodeType() in (CLIENT_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
# Split the packet if too big.
size = len(cell_list)
start = 0
while size:
amt = min(10000, size)
p = protocol.notifyPartitionChanges(ptid,
cell_list[start:start+amt])
c.notify(p)
size -= amt
start += amt
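    # The chunking above caps each Notify Partition Changes packet at 10000
    # cells: for example, 25000 changed cells go out as three packets of
    # 10000, 10000 and 5000 cells, all carrying the same ptid.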
def sendPartitionTable(self, conn):
""" Send the partition table through the given connection """
row_list = []
for offset in xrange(self.pt.getPartitions()):
row_list.append((offset, self.pt.getRow(offset)))
# Split the packet if too huge.
if len(row_list) == 1000:
conn.notify(protocol.sendPartitionTable( self.pt.getID(), row_list))
del row_list[:]
if row_list:
conn.notify(protocol.sendPartitionTable(self.pt.getID(), row_list))
def sendNodesInformations(self, conn):
""" Send informations on all nodes through the given connection """
node_list = []
for n in self.nm.getNodeList():
if n.getNodeType() != ADMIN_NODE_TYPE:
try:
ip_address, port = n.getServer()
except TypeError:
ip_address, port = '0.0.0.0', 0
node_list.append((n.getNodeType(), ip_address, port,
n.getUUID() or INVALID_UUID, n.getState()))
# Split the packet if too huge.
if len(node_list) == 10000:
conn.notify(protocol.notifyNodeInformation(node_list))
del node_list[:]
if node_list:
conn.notify(protocol.notifyNodeInformation(node_list))
def recoverStatus(self):
"""Recover the status about the cluster. Obtain the last OID, the last TID,
and the last Partition Table ID from storage nodes, then get back the latest
partition table or make a new table from scratch, if this is the first time."""
logging.info('begin the recovery of the status')
self.changeClusterState(protocol.RECOVERING)
em = self.em
nm = self.nm
self.loid = INVALID_OID
self.ltid = INVALID_TID
self.pt.setID(INVALID_PTID)
while 1:
self.target_uuid = None
self.pt.clear()
if self.pt.getID() != INVALID_PTID:
# I need to retrieve last ids again.
logging.info('resending Ask Last IDs')
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE \
and node.getState() == RUNNING_STATE:
conn.ask(protocol.askLastIDs())
# Wait for at least one storage node to appear.
while self.target_uuid is None:
em.poll(1)
            # Wait a bit: 1 second is too short for the ZODB test running on
            # a dedibox.
t = time()
while time() < t + 5:
em.poll(1)
# Now I have at least one to ask.
prev_lptid = self.pt.getID()
node = nm.getNodeByUUID(self.target_uuid)
if node is None or node.getState() != RUNNING_STATE:
# Weird. It's dead.
logging.info('the target storage node is dead')
continue
for conn in em.getConnectionList():
if conn.getUUID() == self.target_uuid:
break
else:
# Why?
logging.info('no connection to the target storage node')
continue
if self.pt.getID() == INVALID_PTID:
# This looks like the first time. So make a fresh table.
logging.debug('creating a new partition table')
self.pt.setID(pack('!Q', 1)) # ptid != INVALID_PTID
self.pt.make(nm.getStorageNodeList())
else:
# Obtain a partition table. It is necessary to split this
# message, because the packet size can be huge.
                logging.debug('asking %s for the partition table', node)
start = 0
size = self.pt.getPartitions()
while size:
amt = min(1000, size)
conn.ask(protocol.askPartitionTable(range(start, start + amt)))
size -= amt
start += amt
t = time()
while 1:
em.poll(1)
if node.getState() != RUNNING_STATE:
# Dead.
break
if self.pt.filled() or t + 30 < time():
break
if self.pt.getID() != prev_lptid or not self.pt.filled():
# I got something newer or the target is dead.
logging.debug('lptid = %s, prev_lptid = %s',
dump(self.pt.getID()), dump(prev_lptid))
self.pt.log()
continue
# Wait until the cluster gets operational or the Partition
# Table ID turns out to be not the latest.
logging.info('waiting for the cluster to be operational')
self.pt.log()
while 1:
em.poll(1)
if self.pt.operational():
break
if self.pt.getID() != prev_lptid:
break
if self.pt.getID() != prev_lptid:
# I got something newer.
continue
break
def verifyTransaction(self, tid):
em = self.em
uuid_set = set()
        # Determine which nodes I should ask.
partition = self.getPartition(tid)
transaction_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, readable=True)]
if len(transaction_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(transaction_uuid_list)
# Gather OIDs.
self.asking_uuid_dict = {}
self.unfinished_oid_set = set()
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in transaction_uuid_list:
self.asking_uuid_dict[uuid] = False
conn.ask(protocol.askTransactionInformation(tid))
if len(self.asking_uuid_dict) == 0:
raise VerificationFailure
while 1:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
if self.unfinished_oid_set is None or len(self.unfinished_oid_set) == 0:
            # Not committable.
return None
else:
# Verify that all objects are present.
for oid in self.unfinished_oid_set:
self.asking_uuid_dict.clear()
partition = self.getPartition(oid)
object_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, readable=True)]
if len(object_uuid_list) == 0:
raise VerificationFailure
uuid_set.update(object_uuid_list)
self.object_present = True
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in object_uuid_list:
self.asking_uuid_dict[uuid] = False
conn.ask(protocol.askObjectPresent(oid, tid))
while 1:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
if not self.object_present:
                    # Not committable.
return None
return uuid_set
def verifyData(self):
"""Verify the data in storage nodes and clean them up, if necessary."""
logging.info('start to verify data')
em = self.em
nm = self.nm
        # Wait for the ask/answer-primary-master exchange with the last
        # storage node to complete, because that node has to be in the
        # verification state.
t = time()
while time() < t + 1:
em.poll(1)
self.changeClusterState(protocol.VERIFYING)
        # FIXME: this part has a potential problem: the write buffers can
        # grow very large. Thus it would be better to flush the buffers from
        # time to time _without_ reading packets.
# Send the current partition table to storage and admin nodes, so that
# all nodes share the same view.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
self.sendPartitionTable(conn)
# Gather all unfinished transactions.
#
# FIXME this part requires more brainstorming. Currently, this deals with
# only unfinished transactions. But how about finished transactions?
        # Suppose that A and B have an unfinished transaction. First, A and B
        # are asked to commit the transaction. Then, A succeeds, but B goes
        # down. Now, A believes that the transaction has been committed, while
        # B still believes that it is unfinished. If B comes back while A is
        # working, there is no problem, because B's unfinished transaction will
        # be committed correctly. However, if B comes back while A is down,
        # what happens? If the state is not very good, B may be asked to abort
        # the transaction!
#
# This situation won't happen frequently, and B shouldn't be asked to drop
# the transaction, if the cluster is not ready. However, there might be
# some corner cases where this may happen. That's why more brainstorming
# is required.
self.asking_uuid_dict = {}
self.unfinished_tid_set = set()
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE:
self.asking_uuid_dict[uuid] = False
conn.ask(protocol.askUnfinishedTransactions())
while 1:
em.poll(1)
if not self.pt.operational():
raise VerificationFailure
if False not in self.asking_uuid_dict.values():
break
        # Gather OIDs for each unfinished TID, and verify whether each
        # transaction can be finished or must be aborted. In theory this could
        # be done in parallel, but that is not so easy, so do it one by one
        # for the moment.
for tid in self.unfinished_tid_set:
uuid_set = self.verifyTransaction(tid)
if uuid_set is None:
# Make sure that no node has this transaction.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE:
conn.notify(protocol.deleteTransaction(tid))
else:
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in uuid_set:
conn.ask(protocol.commitTransaction(tid))
# If possible, send the packets now.
em.poll(0)
# At this stage, all non-working nodes are out-of-date.
cell_list = self.pt.outdate()
# Tweak the partition table, if the distribution of storage nodes
# is not uniform.
cell_list.extend(self.pt.tweak())
# If anything changed, send the changes.
if cell_list:
self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)
def provideService(self):
"""This is the normal mode for a primary master node. Handle transactions
and stop the service only if a catastrophy happens or the user commits
a shutdown."""
logging.info('provide service')
em = self.em
nm = self.nm
self.changeClusterState(protocol.RUNNING)
# This dictionary is used to hold information on transactions being finished.
self.finishing_transaction_dict = {}
# Now everything is passive.
        expiration = 10
        t = 0
        while 1:
            try:
                em.poll(1)
                # Implement expiration of temporarily down nodes. If a
                # temporarily down storage node expires, it moves to the down
                # state, the partition table must drop it, and thus
                # repartitioning must be performed.
current_time = time()
if current_time >= t + 1:
t = current_time
for node in nm.getStorageNodeList():
if node.getState() == TEMPORARILY_DOWN_STATE \
and node.getLastStateChange() + expiration < current_time:
logging.info('%s is down' % (node, ))
node.setState(DOWN_STATE)
self.broadcastNodeInformation(node)
cell_list = self.pt.dropNode(node)
self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)
if not self.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
except OperationFailure:
# If not operational, send Stop Operation packets to storage nodes
# and client nodes. Abort connections to client nodes.
logging.critical('No longer operational, so stopping the service')
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE):
conn.notify(protocol.stopOperation())
if node.getNodeType() == CLIENT_NODE_TYPE:
conn.abort()
# Then, go back, and restart.
return
def playPrimaryRole(self):
logging.info('play the primary role with %s (%s:%d)',
dump(self.uuid), *(self.server))
# all incoming connections identify through this handler
self.listening_conn.setHandler(IdentificationEventHandler(self))
        # If I know of any storage nodes, make sure they are not in the
        # running state, because they are not connected at this stage.
for node in self.nm.getStorageNodeList():
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
while 1:
recovering = True
while recovering:
self.recoverStatus()
recovering = False
try:
self.verifyData()
except VerificationFailure:
recovering = True
self.provideService()
def playSecondaryRole(self):
"""I play a secondary role, thus only wait for a primary master to fail."""
logging.info('play the secondary role with %s (%s:%d)',
dump(self.uuid), *(self.server))
handler = PrimaryMasterEventHandler(self)
em = self.em
nm = self.nm
# Make sure that every connection has the secondary event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
while 1:
em.poll(1)
def changeClusterState(self, state):
""" Change the cluster state and apply right handler on each connections """
if self.cluster_state == state:
return
nm, em = self.nm, self.em
# select the storage handler
        if state in (protocol.BOOTING, protocol.RECOVERING):
            storage_handler = RecoveryEventHandler
        elif state == protocol.VERIFYING:
            storage_handler = VerificationEventHandler
        elif state == protocol.RUNNING:
            storage_handler = StorageServiceEventHandler
        else:
            raise RuntimeError('unexpected cluster state')
# change handlers
for conn in em.getConnectionList():
node = nm.getNodeByUUID(conn.getUUID())
if conn.isListeningConnection() or node is None:
# not identified or listening, keep the identification handler
continue
node_type = node.getNodeType()
if node_type in (ADMIN_NODE_TYPE, MASTER_NODE_TYPE):
# those node types keep their own handler
continue
if node_type == CLIENT_NODE_TYPE:
            if state != protocol.RUNNING:
# FIXME: cut the connection ?
pass
handler = ClientServiceEventHandler
elif node_type == STORAGE_NODE_TYPE:
handler = storage_handler
handler = handler(self)
conn.setHandler(handler)
handler.connectionCompleted(conn)
self.cluster_state = state
def getNextOID(self):
if self.loid is None:
raise RuntimeError, 'I do not know the last OID'
oid = unpack('!Q', self.loid)[0]
self.loid = pack('!Q', oid + 1)
return self.loid
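    # A note on the OID arithmetic above: OIDs are 8-byte big-endian counters,
    # so if self.loid is pack('!Q', 41), getNextOID() stores and returns
    # pack('!Q', 42), i.e. '\x00\x00\x00\x00\x00\x00\x00\x2a'.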
def getNextTID(self):
tm = time()
gmt = gmtime(tm)
upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon - 1) * 31 \
+ gmt.tm_mday - 1) * 24 + gmt.tm_hour) * 60 + gmt.tm_min
lower = int((gmt.tm_sec % 60 + (tm - int(tm))) / (60.0 / 65536.0 / 65536.0))
tid = pack('!LL', upper, lower)
if tid <= self.ltid:
upper, lower = unpack('!LL', self.ltid)
if lower == 0xffffffff:
# This should not happen usually.
from datetime import timedelta, datetime
d = datetime(gmt.tm_year, gmt.tm_mon, gmt.tm_mday,
gmt.tm_hour, gmt.tm_min) \
+ timedelta(0, 60)
upper = ((((d.year - 1900) * 12 + d.month - 1) * 31 \
+ d.day - 1) * 24 + d.hour) * 60 + d.minute
lower = 0
else:
lower += 1
tid = pack('!LL', upper, lower)
self.ltid = tid
return tid
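    # A worked example of the TID layout above (it mirrors ZODB's timestamp
    # encoding): the upper 4 bytes count minutes since 1900 on an idealized
    # 12x31-day calendar, and the lower 4 bytes split one minute into 2**32
    # ticks of about 14 nanoseconds each. For 2009-01-01 00:00:00.0 UTC:
    #   upper = ((((2009 - 1900) * 12 + 0) * 31 + 0) * 24 + 0) * 60 + 0
    #         = 58389120
    #   lower = 0
    #   tid   = pack('!LL', 58389120, 0)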
def getPartition(self, oid_or_tid):
return unpack('!Q', oid_or_tid)[0] % self.pt.getPartitions()
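    # For example, with 12 partitions, the OID pack('!Q', 13) maps to
    # partition 13 % 12 = 1.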
def getNewOIDList(self, num_oids):
return [self.getNextOID() for i in xrange(num_oids)]
def getNewUUID(self, node_type):
        # Build a UUID.
uuid = os.urandom(15)
while uuid == INVALID_UUID[1:]:
uuid = os.urandom(15)
# look for the prefix
prefix = UUID_NAMESPACES.get(node_type, None)
if prefix is None:
raise RuntimeError, 'No UUID namespace found for this node type'
return prefix + uuid
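    # The resulting UUID is the 1-byte namespace prefix for the node type
    # followed by 15 random bytes, 16 bytes in total. The retry loop above
    # only guards against the (astronomically unlikely) case where the random
    # part matches the tail of INVALID_UUID.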
def isValidUUID(self, uuid, addr):
node = self.nm.getNodeByUUID(uuid)
if node is not None and node.getServer() is not None and node.getServer() != addr:
return False
return uuid != self.uuid and uuid != INVALID_UUID
def getClusterState(self):
return self.cluster_state
def shutdown(self):
"""Close all connections and exit"""
# change handler
handler = ShutdownEventHandler(self)
for c in self.em.getConnectionList():
c.setHandler(handler)
        # Wait for all transactions to be finished.
        while 1:
            self.em.poll(1)
            if len(self.finishing_transaction_dict) == 0:
                if self.cluster_state == protocol.RUNNING:
                    sys.exit("Application has been asked to shut down")
                else:
                    # No more transactions; ask clients to shut down.
                    logging.info("asking all clients to shutdown")
                    for c in self.em.getConnectionList():
                        node = self.nm.getNodeByUUID(c.getUUID())
                        if node.getNodeType() == CLIENT_NODE_TYPE:
                            ip_address, port = node.getServer()
                            node_list = [(node.getNodeType(), ip_address, port, node.getUUID(), DOWN_STATE)]
                            c.notify(protocol.notifyNodeInformation(node_list))
                    # Then ask storage and master nodes to shut down.
                    logging.info("asking all remaining nodes to shutdown")
                    for c in self.em.getConnectionList():
                        node = self.nm.getNodeByUUID(c.getUUID())
                        if node.getNodeType() in (STORAGE_NODE_TYPE, MASTER_NODE_TYPE):
                            ip_address, port = node.getServer()
                            node_list = [(node.getNodeType(), ip_address, port, node.getUUID(), DOWN_STATE)]
                            c.notify(protocol.notifyNodeInformation(node_list))
                    # Then shut down.
                    sys.exit("Cluster has been asked to shut down")
def identifyStorageNode(self, uuid, node):
        # TODO: check all cases here, when the server address changes...
        # In the verification and running states, if the node is unknown but
        # uuid != INVALID_UUID, we have to give it a new uuid, but in recovery
        # the node must keep its UUID.
state = protocol.RUNNING_STATE
handler = None
if self.cluster_state == protocol.RECOVERING:
            # TODO: enable empty node rejection when manual startup is OK:
if False and uuid == protocol.INVALID_UUID:
logging.info('reject empty storage node')
raise protocol.NotReadyError
handler = RecoveryEventHandler
elif self.cluster_state == protocol.VERIFYING:
if uuid == INVALID_UUID or node is None:
                # If the node is unknown, it was forgotten when the current
                # partition table was validated by the admin.
uuid = INVALID_UUID
state = protocol.PENDING_STATE
handler = VerificationEventHandler
elif self.cluster_state == protocol.RUNNING:
if uuid == INVALID_UUID or node is None:
# same as for verification
uuid = INVALID_UUID
state = protocol.PENDING_STATE
handler = StorageServiceEventHandler
elif self.cluster_state == protocol.STOPPING:
# FIXME: raise a ShutdowningError ?
raise protocol.NotReadyError
else:
raise RuntimeError('unhandled cluster state')
return (uuid, state, handler)
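    # Summary of the mapping implemented above (for unknown nodes, uuid is
    # reset to INVALID_UUID so that a fresh one gets allocated):
    #   RECOVERING -> RUNNING_STATE, RecoveryEventHandler
    #   VERIFYING  -> PENDING_STATE for unknown nodes, VerificationEventHandler
    #   RUNNING    -> PENDING_STATE for unknown nodes, StorageServiceEventHandler
    #   STOPPING   -> rejected with NotReadyError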
def identifyNode(self, node_type, uuid, node):
state = protocol.RUNNING_STATE
handler = IdentificationEventHandler
if node_type == protocol.ADMIN_NODE_TYPE:
# always accept admin nodes
klass = AdminNode
handler = AdministrationEventHandler
logging.info('Accept an admin %s' % dump(uuid))
elif node_type == protocol.MASTER_NODE_TYPE:
# always put other master in waiting state
klass = MasterNode
handler = SecondaryMasterEventHandler
logging.info('Accept a master %s' % dump(uuid))
elif node_type == protocol.CLIENT_NODE_TYPE:
# refuse any client before running
if self.cluster_state != protocol.RUNNING:
logging.info('reject a connection from a client')
raise protocol.NotReadyError
klass = ClientNode
            # FIXME: apply a handler dedicated to client nodes.
handler = ClientServiceEventHandler
logging.info('Accept a client %s' % dump(uuid))
elif node_type == protocol.STORAGE_NODE_TYPE:
klass = StorageNode
(uuid, state, handler) = self.identifyStorageNode(uuid, node)
return (uuid, node, state, handler, klass)