Commit 902c76b3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5dda5ec4
Application (master)
.em EventManager
.nm NodeManager
.tm TransactionManager
.pt PartitionTable
.backup_app BackupApplication
.admin_handler AdministrationHandler
.secondary_master_handler SecondaryMasterHandler
.client_service_handler ClientServiceHandler
.storage_service_handler StorageServiceHandler
.listening_conn ListeningConnection
Handlers
--------
AdministrationHandler < MasterHandler
.connectionLost(...)
.setClusterSate(...)
.setNodeState(...)
.addPendingNodes(...)
.tweakPartitionTable(...)
.truncate(conn, tid)
.checkReplicas(...)
SecondaryMasterHandler < MasterHandler
.connectionLost(...)
.announcePrimary(...)
.reelectPrimary(...)
ClientServiceHandler < MasterHandler
.connectionLost(...)
.askNodeInformation(conn) # send informations about master and storages only
.askBeginTransaction(conn, tid)
.askNewOIDs(conn, num_oids)
.askFinalTID(conn, ttid)
.askPack(conn, tid)
.abortTransaction(conn, tid)
StorageServiceHandler < BaseServiceHandler
.connectionCompleted(conn, new)
.connectionLost(conn, new_state)
.askUnfinishedTransactions(conn)
.answerInformationLocked(conn, ttid)
.notifyPartitionCorrupted(conn, partition, cell_list)
.notifyReplicationDone(conn, offset, tid)
.answerPack(conn, status)
MasterHandler < EventHandler
.connectionCompleted(...)
.requestIdentification(...)
.askClusterState(...)
.askRecovery(...)
.askLastIDs(...)
.askLastTransaction(...)
.askNodeInformation(...)
.askPartitionTable(...)
BaseServiceHandler < MasterHandler
.connectionCompleted(...)
.connectionLost(...)
.notifyReady(conn)
EventHandler
.packetReceived(conn, pkt, **kw)
-> .dispatch(conn, pkt, **kw)
-> conn.setPeerId(pkt.getId())
-> getattr(self, pkt.handler_method_name) (conn, *pkt.decode(), **kw)
Connections
-----------
ListeningConnection < BaseConnection
.__init__(self, app, handler, addr):
connector = self.ConnectorClass(addr)
BaseConnection.__init__(self, app, connector, addr)
connector.makeListeningConnection() # ~= self.socket.bind(addr) + listen()
self.em.register(self)
.readable(self):
connector, addr = self.connector.accept()
conn = ServerConnection(self.em, self._handlers.getHandler(), connector, addr)
conn._connected()
BaseConnection
.ConnectorClass = SocketConnector # by default
.em EventManager
.connector self.ConnectorClass() # by default ?
.addr
._handlers HandlerSwitcher
HandlerSwitcher
# pending handlers and related requests
._pending [] of [{}(msg_id -> answer_class, timeout, on_timeout, kw), handler]
.emit(request, timeout, on_timeout, kw)
.handle(connection, pkt)
-> ._handle(connection, pkt)
._handle(connection, pkt):
msg_id = pkt.getId()
request_dict, handler = ._pending[0]
if !pkt.isResponse():
handler.packetReceived(connection, pkt)
klass, _, _, kw = request_pop(msg_id)
if klass && isinstance(pkt, klass) || pkt.isError():
handler.packetReceived(connection, packet, kw)
else:
# unexpected answer
# skip all intermediate handlers and apply to "latest" (fff6cf45)
while len(._pending) > 1 && !._pending[0].handler:
del ._pending[0]
ServerConnection <Connection
KEEP_ALICE = Connection.KEEP_ALIVE + 5
server = True
# main worker class for handling data exchanging
Connection <BaseConnection
.read_buf ReadBuffer ([][]byte = iovec)
._queue []
._parser_state ParserState
.writable()
-> .connector.send()
.readable()
-> .connector.receive(.read_buf)
while .read_buf:
pkt = Packets.parse(.read_buf, ._parser_state)
._queue.append(pkt)
return bool(._queue)
# called from under EventManager.poll() # incoming messages
.process()
-> ._handlers.handle(self, ._queue.pop(0))
._addPacket(pkt):
.connector.queue(pkt.encode())
.em.addWriter(self)
.notify(pkt):
pkt.setId(next-msg-id)
._addPacket(pkt)
.ask(pkt, timeout=30, on_timeout=None, **kw):
.notify(pkt)
._handlers.emit(pkt, timeout, on_timeout, kw)
.answer(pkt, msg_id=None):
pkt.setId(msgid ?: self.getPeerId())
._addPacket(pkt)
------------
EventManager
.connection_dict {} fd -> Connection
.reader_set set
.writer_set set
.epoll
._pending_processing []Connection # fetched from .epoll
.register(conn)
.unregister(conn, close=False)
.poll(blocking=1):
._poll(blocking)
to_process = ._pending_processing.pop(0)
to_process.process()
._poll(blocking):
event_list = .epoll.poll(blocking)
for fd, event in event_list:
conn = .connection_dict[fd]
if event & EPOLLIN:
if conn.readable():
._addPendingConnection(conn) # queues to ._pending_processing
# EPOLLOUT -> wlist -> conn.writable()
# EPOLLERR -> elist -> conn.readable() -> ._addPendingConnection(conn)
.addReader(conn)
.removeReader(conn)
.addWriter(conn)
.removeWriter(conn)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment