Commit 596050eb authored by Julien Muchembled's avatar Julien Muchembled

Experimental support of RINA

parent 674cad32
...@@ -271,7 +271,8 @@ def main(): ...@@ -271,7 +271,8 @@ def main():
remote_gateway, config.disable_proto, config.neighbour) remote_gateway, config.disable_proto, config.neighbour)
config.babel_args += tunnel_manager.new_iface_list config.babel_args += tunnel_manager.new_iface_list
else: else:
tunnel_manager = tunnel.BaseTunnelManager(cache, cert, address) tunnel_manager = tunnel.BaseTunnelManager(control_socket,
cache, cert, address)
cleanup.append(tunnel_manager.sock.close) cleanup.append(tunnel_manager.sock.close)
try: try:
...@@ -279,7 +280,7 @@ def main(): ...@@ -279,7 +280,7 @@ def main():
ipv4 = getattr(cache, 'ipv4', None) ipv4 = getattr(cache, 'ipv4', None)
if ipv4: if ipv4:
serial = int(cert.cert.get_subject().serialNumber) serial = cert.subject_serial
if cache.ipv4_sublen <= 16 and serial < 1 << cache.ipv4_sublen: if cache.ipv4_sublen <= 16 and serial < 1 << cache.ipv4_sublen:
dot4 = lambda x: socket.inet_ntoa(struct.pack('!I', x)) dot4 = lambda x: socket.inet_ntoa(struct.pack('!I', x))
ip4('route', 'unreachable', ipv4, 'proto', 'static') ip4('route', 'unreachable', ipv4, 'proto', 'static')
......
import errno, glob, logging, os, select
import socket, struct, threading, time, weakref
from . import utils
# Experimental support for RINA (Recursive InterNetwork Architecture).
# https://github.com/IRATI/stack (revision b7a7552 or later)
# This relies on pull requests #996 and #997.
DEFAULT_DIF = "default.dif"
IPCM_PROMPT = "IPCM >>> "
IPCM_SOCK = '/run/ipcm-console.sock'
IPCP_NAME = 're6st'
NORMAL_DIF = "normal.DIF"
PORT = 3359
resolve_thread = None
shim = None
def ap_name(prefix):
# : and - are already used to separate the name from the instance number.
# Also not using / because the IPCP log path is named after the IPCP name.
return "%s.%s.%s" % (IPCP_NAME, int(prefix, 2), len(prefix))
def ap_prefix(name):
a, b, c = name.split('.')
if a == IPCP_NAME:
return utils.binFromSubnet(b + '/' + c)
@apply
class ipcm(object):
def __call__(self, *args):
try:
try:
s = self._socket
except AttributeError:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(IPCM_SOCK)
if s.recv(len(IPCM_PROMPT) + 1) != IPCM_PROMPT:
return
self._socket = s
x = ' '.join(map(str, args))
logging.debug("%s%s", IPCM_PROMPT, x)
s.send(x + '\n')
r = []
last = ''
while 1:
d = s.recv(4096)
if not d:
break
r += (last + d).split('\n')
last = r.pop()
if last == IPCM_PROMPT:
for x in r:
logging.debug("%s", x)
return r
except socket.error, e:
logging.info("%s", e)
del self._socket
def iterIpcp(self):
i = iter(self("list-ipcps") or ())
for line in i:
if line.startswith("Current IPC processes"):
l = lambda x: () if x == '-' else map(str.strip, x.split(','))
for line in i:
if not line:
return
id, name, type, state, reg_apps, ports = map(
str.strip, line.split('|'))
yield (int(id), name.replace(':', '-'), type, state,
l(reg_apps), map(int, l(ports)))
def queryRib(self, *args):
r = self("query-rib", *args)
if r:
i = iter(r)
for r in i:
if r:
name, class_, instance = r.split('; ')
assert name.startswith('Name: '), name
assert class_.startswith('Class: '), class_
assert instance.startswith('Instance: '), instance
r = next(i)
assert r.startswith('Value: '), r
value = [r[7:]]
while True:
r = next(i)
if not r:
break
value.append(r)
value = '\n'.join(value)
yield (name[6:], class_[7:], instance[10:],
None if value == '-' else value)
def iterNeigh(self, ipcp_id):
for x in self.queryRib(ipcp_id, "Neighbor",
"/difManagement/enrollment/neighbors/"):
x = dict(map(str.strip, x.split(':')) for x in x[3].split(';'))
yield (x['Name'],
int(x['Address']),
int(x['Enrolled']),
x['Supporting DIF Name'],
int(x['Underlying port-id']),
int(x['Number of enroll. attempts']))
class Shim(object):
normal_id = None
def __init__(self, ipcp_id, dif):
self.ipcp_id = ipcp_id
self.dif = dif
self._enabled = weakref.WeakValueDictionary()
def _kernel(self, **kw):
fd = os.open("/sys/rina/ipcps/%s/config" % self.ipcp_id, os.O_WRONLY)
try:
os.write(fd, ''.join("%s\0%s\0" % x for x in kw.iteritems()))
finally:
os.close(fd)
def _enroll(self, tm, prefix):
# This condition is only optimization, since the kernel may already
# have an entry for this neighbour.
if prefix not in self._enabled:
ap = ap_name(prefix)
ip = utils.ipFromBin(tm._network + prefix, '1')
port = str(PORT)
self._kernel(dirEntry="1:%s:%s0:%s:%s%s:%s" % (
len(ap), ap, len(ip), ip, len(port), port))
self._enabled[prefix] = tm._getPeer(prefix)
def update(self, tm):
global resolve_thread
if resolve_thread is None:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.bind("\0rina.resolve_ipcp_address")
s.listen(5)
resolve_thread = threading.Thread(target=self._resolve, args=(s,))
resolve_thread.daemon = True
resolve_thread.start()
prefix = tm._prefix
self._enabled[prefix] = tm.cert
self._asking_info = {prefix: float('inf')}
def update(tm):
ap = ap_name(tm._prefix)
name = ap + "-1--"
step = 0 # do never retry immediately
while 1:
normal_id = None
for ipcp in ipcm.iterIpcp():
if ipcp[0] == self.ipcp_id:
registered = name in ipcp[4]
elif ipcp[1] == name:
normal_id = ipcp[0]
normal_status = ipcp[3]
if registered:
break
if normal_id is None:
if step or not ipcm("create-ipcp", ap, 1, "normal-ipc"):
return
step = 1
elif normal_status == "INITIALIZED":
if step > 1 or not ipcm("assign-to-dif", normal_id,
NORMAL_DIF, DEFAULT_DIF):
return
step = 2
elif normal_status.startswith("ASSIGNED TO DIF"):
if step > 2 or not ipcm("register-at-dif",
normal_id, self.dif):
return
step = 3
else:
return
asking_info = self._asking_info
enabled = self._enabled
enrolled = set(ap_prefix(neigh[0].split('-', 1)[0])
for neigh in ipcm.iterNeigh(normal_id))
now = time.time()
for neigh_routes in tm.ctl.neighbours.itervalues():
for prefix in neigh_routes[1]:
if not prefix or prefix in enrolled:
continue
if prefix in enabled:
# Avoid enrollment to a neighbour
# that does not know our address.
if prefix not in asking_info:
r = ipcm("enroll-to-dif", normal_id,
NORMAL_DIF, self.dif, ap_name(prefix), 1)
if r and 'failed' in r[0]:
del enabled[prefix]
# Enrolling may take a while
# so don't block for too long.
if now + 1 < time.time():
return
continue
if asking_info.get(prefix, 0) < now and tm.askInfo(prefix):
self._enroll(tm, prefix)
self._asking_info[prefix] = now + 60
ap = ap_name(prefix)
port = str(PORT)
self._kernel(hostname=utils.ipFromBin(tm._network + prefix, '1'),
expReg="1:%s:%s0:%s:%s" % (len(ap), ap, len(port), port))
self.update = update
update(tm)
def enabled(self, tm, prefix, enroll):
logging.debug("RINA: enabled(%s, %s)", prefix, enroll)
try:
asking_info = self._asking_info
except AttributeError:
return
if enroll:
asking_info.pop(prefix, None)
self._enroll(tm, prefix)
else:
asking_info[prefix] = float('inf')
@staticmethod
def _resolve(sock):
clients = []
try:
while True:
try:
s = select.select([sock] + clients, (), ())
except select.error as e:
if e.args[0] != errno.EINTR:
raise
continue
for s in s[0]:
if s is sock:
clients.append(s.accept()[0])
continue
try:
d = s.recv(4096)
if d:
try:
address = 0
dif, name, instance = d.split('\n')
if dif == NORMAL_DIF and instance == "1":
prefix = ap_prefix(name)
if prefix:
try:
address = 1 + (
shim._enabled[prefix]
.subject_serial)
except KeyError:
pass
except:
logging.info("RINA: resolve(%r)", d)
raise
logging.debug("RINA: resolve(%r) -> %r", d, address)
s.send(struct.pack('=I', address))
continue
except Exception, e:
logging.info("RINA: %s", e)
clients.remove(s)
s.close()
finally:
global resolve_thread
resolve_thread = None
sock.close()
for s in clients:
s.close()
def sysfs_read(path):
fd = os.open(path, os.O_RDONLY)
try:
return os.read(fd, 4096).strip()
finally:
os.close(fd)
update = dummy_update = lambda tunnel_manager, route_dumped: False
if os.path.isdir("/sys/rina"):
def update(tunnel_manager, route_dumped):
global shim, update
try:
for ipcp in glob.glob("/sys/rina/ipcps/*"):
if sysfs_read(ipcp + "/type") != "shim-tcp-udp" or \
sysfs_read(ipcp + "/name") != IPCP_NAME + "/1//":
continue
if not os.access(ipcp + "/config", os.W_OK):
logging.exception("RINA: This kernel does not support"
" dynamic updates of shim-tcp-udp configuration.")
update = dummy_update
return False
dif = sysfs_read(ipcp + "/dif")
if dif.endswith("///"):
host_name = sysfs_read(ipcp + "/host_name")
if not host_name:
shim = Shim(int(ipcp.rsplit("/", 1)[1]), dif[:-3])
if route_dumped:
shim.update(tunnel_manager)
return True
shim = None
except Exception, e:
logging.info("%s", e)
return False
def enabled(*args):
if shim:
try:
shim.enabled(*args)
except Exception, e:
logging.info("%s", e)
import errno, logging, os, random, socket, subprocess, struct, time, weakref import errno, json, logging, os, random
import socket, subprocess, struct, time, weakref
from collections import defaultdict, deque from collections import defaultdict, deque
from bisect import bisect, insort from bisect import bisect, insort
from OpenSSL import crypto from OpenSSL import crypto
from . import ctl, plib, utils, version, x509 from . import ctl, plib, rina, utils, version, x509
PORT = 326 PORT = 326
...@@ -172,8 +173,9 @@ class BaseTunnelManager(object): ...@@ -172,8 +173,9 @@ class BaseTunnelManager(object):
'ipv4', 'ipv4_sublen')) 'ipv4', 'ipv4_sublen'))
_forward = None _forward = None
_next_rina = True
def __init__(self, cache, cert, address=()): def __init__(self, control_socket, cache, cert, address=()):
self.cert = cert self.cert = cert
self._network = cert.network self._network = cert.network
self._prefix = cert.prefix self._prefix = cert.prefix
...@@ -200,6 +202,8 @@ class BaseTunnelManager(object): ...@@ -200,6 +202,8 @@ class BaseTunnelManager(object):
self._peers = [p] self._peers = [p]
self._timeouts = [(p.stop_date, self.invalidatePeers)] self._timeouts = [(p.stop_date, self.invalidatePeers)]
self.ctl = ctl.Babel(control_socket, weakref.proxy(self), self._network)
# Only to check routing cache. Should go back to # Only to check routing cache. Should go back to
# TunnelManager when we don't need to check it anymore. # TunnelManager when we don't need to check it anymore.
self._next_refresh = time.time() self._next_refresh = time.time()
...@@ -209,11 +213,19 @@ class BaseTunnelManager(object): ...@@ -209,11 +213,19 @@ class BaseTunnelManager(object):
t += self._timeouts t += self._timeouts
if self._next_refresh: # same comment as in __init__ if self._next_refresh: # same comment as in __init__
t.append((self._next_refresh, self.refresh)) t.append((self._next_refresh, self.refresh))
self.ctl.select(r, w, t)
def refresh(self): def refresh(self):
self._next_refresh = time.time() + 5 if self._next_rina and rina.update(self, False):
self._next_rina = False
self.ctl.request_dump()
self._next_refresh = time.time() + self.cache.hello
self.checkRoutingCache() self.checkRoutingCache()
def babel_dump(self):
rina.update(self, True)
self._next_rina = True
def selectTimeout(self, next, callback, force=True): def selectTimeout(self, next, callback, force=True):
t = self._timeouts t = self._timeouts
for i, x in enumerate(t): for i, x in enumerate(t):
...@@ -403,7 +415,17 @@ class BaseTunnelManager(object): ...@@ -403,7 +415,17 @@ class BaseTunnelManager(object):
if code == 3 and tunnel_killer.state == 'locked': # response if code == 3 and tunnel_killer.state == 'locked': # response
self._kill(peer) self._kill(peer)
elif code == 4: # node information elif code == 4: # node information
if not msg: if msg:
if not peer:
return
try:
ask, ver, protocol, rina_enabled = json.loads(msg)
except ValueError:
ask = rina_enabled = False
rina.enabled(self, peer, rina_enabled)
if ask:
return self._info(False)
else:
return version.version return version.version
elif code == 5: elif code == 5:
# the registry wants to know the topology for debugging purpose # the registry wants to know the topology for debugging purpose
...@@ -417,6 +439,16 @@ class BaseTunnelManager(object): ...@@ -417,6 +439,16 @@ class BaseTunnelManager(object):
if peer and self._prefix == self.cache.registry_prefix: if peer and self._prefix == self.cache.registry_prefix:
logging.info("%s/%s: %s", int(peer, 2), len(peer), msg) logging.info("%s/%s: %s", int(peer, 2), len(peer), msg)
def askInfo(self, prefix):
return self.sendto(prefix, '\4' + self._info(True))
def _info(self, ask):
return json.dumps((ask,
version.version,
version.protocol,
rina.shim is not None,
))
@staticmethod @staticmethod
def _restart(): def _restart():
raise utils.ReexecException( raise utils.ReexecException(
...@@ -542,8 +574,8 @@ class TunnelManager(BaseTunnelManager): ...@@ -542,8 +574,8 @@ class TunnelManager(BaseTunnelManager):
def __init__(self, control_socket, cache, cert, openvpn_args, def __init__(self, control_socket, cache, cert, openvpn_args,
timeout, client_count, iface_list, address, ip_changed, timeout, client_count, iface_list, address, ip_changed,
remote_gateway, disable_proto, neighbour_list=()): remote_gateway, disable_proto, neighbour_list=()):
super(TunnelManager, self).__init__(cache, cert, address) super(TunnelManager, self).__init__(control_socket,
self.ctl = ctl.Babel(control_socket, weakref.proxy(self), self._network) cache, cert, address)
self.ovpn_args = openvpn_args self.ovpn_args = openvpn_args
self.timeout = timeout self.timeout = timeout
self._read_sock, self.write_sock = socket.socketpair( self._read_sock, self.write_sock = socket.socketpair(
...@@ -609,7 +641,6 @@ class TunnelManager(BaseTunnelManager): ...@@ -609,7 +641,6 @@ class TunnelManager(BaseTunnelManager):
def select(self, r, w, t): def select(self, r, w, t):
super(TunnelManager, self).select(r, w, t) super(TunnelManager, self).select(r, w, t)
r[self._read_sock] = self.handleClientEvent r[self._read_sock] = self.handleClientEvent
self.ctl.select(r, w, t)
def refresh(self): def refresh(self):
logging.debug('Checking tunnels...') logging.debug('Checking tunnels...')
...@@ -651,6 +682,7 @@ class TunnelManager(BaseTunnelManager): ...@@ -651,6 +682,7 @@ class TunnelManager(BaseTunnelManager):
#if remove and len(self._connecting) < len(self._free_iface_list): #if remove and len(self._connecting) < len(self._free_iface_list):
# self._tuntap(self._free_iface_list.pop()) # self._tuntap(self._free_iface_list.pop())
self._next_refresh = time.time() + 5 self._next_refresh = time.time() + 5
rina.update(self, True)
def _cleanDeads(self): def _cleanDeads(self):
disconnected = False disconnected = False
......
...@@ -32,7 +32,7 @@ if dirty: ...@@ -32,7 +32,7 @@ if dirty:
# they are intended to the network admin. # they are intended to the network admin.
# Only 'protocol' is important and it must be increased whenever they would be # Only 'protocol' is important and it must be increased whenever they would be
# a wish to force an update of nodes. # a wish to force an update of nodes.
protocol = 2 protocol = 3
min_protocol = 1 min_protocol = 1
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -105,6 +105,10 @@ class Cert(object): ...@@ -105,6 +105,10 @@ class Cert(object):
def network(self): def network(self):
return networkFromCa(self.ca) return networkFromCa(self.ca)
@property
def subject_serial(self):
return int(self.cert.get_subject().serialNumber)
@property @property
def openvpn_args(self): def openvpn_args(self):
return ('--ca', self.ca_path, return ('--ca', self.ca_path,
...@@ -197,6 +201,8 @@ class Peer(object): ...@@ -197,6 +201,8 @@ class Peer(object):
def connected(self): def connected(self):
return self._last is None or time.time() < self._last + 60 return self._last is None or time.time() < self._last + 60
subject_serial = Cert.subject_serial
def __ne__(self, other): def __ne__(self, other):
raise AssertionError raise AssertionError
__eq__ = __ge__ = __le__ = __ne__ __eq__ = __ge__ = __le__ = __ne__
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment