Commit 11bb5b72 authored by zhifan huang's avatar zhifan huang

test: add new module to create network

add new module to easily create network. Only need to define points and
edges, and than, each node's ip address and route are auto-configed.

add a network typo, where the send and receive of packets use different
routes.

add 3 test, 2 is ping test. 1 is new speed test.
speed test: send 1Mb message, and test the cost of time.
parent 09ea6992
table inet limit_demo {
limit lim_1mbps { rate 256 kbytes/second}
chain limit_chain {
type filter hook postrouting priority 0; policy drop;
limit name "lim_1mbps" accept
}
}
import logging
import socket
import struct
import subprocess
import weakref
from collections import namedtuple, defaultdict
from functools import partial
from pathlib2 import Path
from threading import Timer
from subprocess import PIPE, CalledProcessError
Edge = namedtuple("Edge", ["dst", "cost"])
LIMIT_SPEED = str(Path(__file__).resolve().parent / "limit.rules")
def destroy_process(proc, timeout=5):
proc.terminate()
def kill_process():
if proc.poll() is None:
proc.kill()
proc.wait()
if proc.poll() is None:
t = Timer(timeout, kill_process)
t.start()
class Graph(object):
def __init__(self):
self.edges = []
self.points = []
@property
def vertices(self):
"""amount of points"""
return len(self.points)
def add_point(self, point):
self.edges.append([])
self.points.append(point)
def get_point(self, name):
"""return first point with same name"""
for p in self.points:
if p.name == name:
return p
return None
def add_edge(self, p1, p2, cost=1.0):
"""
p1, p2: int, index of vertex
cost: float
"""
self.edges[p1].append(Edge(p2, cost))
def dijkstra(self, src):
"""use dijkstra method to find the route of src point
return:
a list containing the next hop to reaching each point
"""
Path = namedtuple("Path", ["cost", "prev"])
dis = [Path(0xFF, -1)] * self.vertices
vis = [None] * self.vertices
dis[src] = Path(0, src)
for _ in range(self.vertices):
u = -1
dis_min = 0xFF
for i in range(self.vertices):
if vis[i] is None and dis[i].cost < dis_min:
u = i
dis_min = dis[i].cost
vis[u] = dis[u].prev
for ed in self.edges[u]:
dst = ed.dst
cost = ed.cost + dis[u].cost
if dis[dst].cost > cost:
dis[dst] = Path(cost, u)
# find next hop
def find(x):
if vis[x] != src and vis[vis[x]] != src:
vis[x] = find(vis[x])
return vis[x]
map(find, xrange(self.vertices))
for i in xrange(self.vertices):
if vis[i] == src:
vis[i] = i
return vis
class Network(Graph):
"""used to define a topology of a network
Using the topology of the network, each node's IP addresses and routes are
automatically setup.
NAT is not considered
"""
_ip = 0
def __init__(self, net="10.0.0.0"):
super(Network, self).__init__()
self.routes = {}
self.net = net
self.net_int = struct.unpack("!I", socket.inet_aton(net))[0]
@property
def next_ip(self):
self._ip += 1
return socket.inet_ntoa(struct.pack("!I", self.net_int + self._ip))
def add_switch(self):
pass
def add_node(self, name, node=None):
node = node or Node()
node.name = name
self.add_point(node)
return node
def connect(self, node1, node2, cost1=1, cost2=None):
"""create p2p link(veth) between node1 and node2
cost1: 1 -> 2
cost2: 2 -> 1
"""
cost2 = cost2 or cost1
i1, i2 = map(self.points.index, (node1, node2))
self.add_edge(i1, i2, cost1)
self.add_edge(i2, i1, cost2)
dev1 = Device("veth")
dev2 = Device("veth")
subprocess.check_call(['ip', 'link', 'add', dev1.name, 'netns', str(node1.pid),
'type', 'veth', 'peer', dev2.name, 'netns', str(node2.pid)])
node1.add_device(dev1)
node2.add_device(dev2)
node1.neighs[node2] = dev1
node2.neighs[node1] = dev2
dev1.up = dev2.up = True
dev1.add_ip4(self.next_ip, 32)
dev2.add_ip4(self.next_ip, 32)
return dev1, dev2
def compute_route(self):
self.routes = {i: self.dijkstra(i) for i in xrange(self.vertices)}
def config_route(self):
"""add route for each node"""
all_node = set(range(self.vertices))
all_ips = []
for i in all_node:
all_ips.append(self.points[i].ips)
for i in all_node:
route_rule = defaultdict(list)
for j in all_node - {i,}:
next_hop = self.routes[i][j]
if next_hop < 0 or next_hop >= self.vertices:
continue
next_hop = self.points[next_hop]
route_rule[next_hop].extend(all_ips[j])
node = self.points[i]
# TODO aggregate ip address
for next_hop, ips in route_rule.iteritems():
dev = node.neighs[next_hop]
for ip in ips:
# logging.warning("%s, %s", ip, dev.name)
node.add_route(ip, "dev", dev.name)
def connectable_test(self):
"""test each node can ping to their registry
Raise:
CalledProcessError
"""
for src in self.points:
for dst in self.points:
if dst == src:
continue
try:
src.run(["ping", "-c", "1", dst.ips[0]], stdout=PIPE)
except CalledProcessError as e:
logging.error("%s -> %s ping test failed", src.name, dst.name)
raise e
logging.debug("each node can ping to the other")
class Device(object):
"""class for network device"""
_id = 0
@classmethod
def get_id(cls):
cls._id += 1
return cls._id
def __init__(self, dev_type, name=None):
"""
type: device type, str
name: device name, str. if not set, generate one name
"""
# name if name else .....
self.type = dev_type
self.name = name or "{}-{}".format(dev_type, self.get_id())
self.ips = []
self._up = False
self.node = None
@property
def up(self):
"bool value control device up or not"
return self._up
@up.setter
def up(self, value):
"""value: Bool"""
if value == self._up:
return
self._up = value
value = "up" if value else "down"
self.node.run(["ip", "link", "set", "up", self.name])
def add_ip4(self, address, prefix):
ip = "{}/{}".format(address, prefix)
self.ips.append(address)
self.node.run(["ip", "addr", "add", ip, "dev", self.name])
class Node(object):
"""a network namespace"""
def __init__(self):
self.devices = []
self.procs = []
self.neighs = {}
self.name = None
self.procs.append(subprocess.Popen(["unshare", "-n"], stdin=PIPE))
self.pid = self.procs[0].pid
self.add_device_lo()
self.run(["sysctl", "-q", "net.ipv4.conf.all.proxy_arp=1"])
self.run(["sysctl", "-q", "net.ipv4.ip_forward=1"])
self.run(["sysctl", "-q", "net.ipv6.conf.default.forwarding=1"])
self.run(["sysctl", "-q", "net.ipv4.icmp_echo_ignore_broadcasts=0"])
@property
def iface(self):
return self.devices[-1]
@property
def ip(self):
return self.iface.ips[-1]
@property
def ips(self):
res = []
for dev in self.devices:
res.extend(dev.ips)
return res
def Popen(self, cmd, **kw):
"""wrapper for subprocess.Popen"""
proc = subprocess.Popen(["nsenter", "-t", str(self.pid), "-n"] + cmd, **kw)
proc.destroy = partial(destroy_process, proc)
return proc
def run(self, cmd, **kw):
"""wrapper for subprocess.checkout"""
return subprocess.check_call(["nsenter", "-t", str(self.pid), "-n"] + cmd, **kw)
def add_device(self, dev):
self.devices.append(dev)
dev.node = weakref.proxy(self)
def add_device_lo(self):
lo = Device("lo", name="lo")
self.add_device(lo)
lo.up = True
def add_route(self, net, *args):
self.run(["ip", "route", "add", net] + list(args))
def __del__(self):
for proc in self.procs:
destroy_process(proc)
class Switch(Node):
pass
def ring_net(number):
"""create a ring typo network, use first node as registry
args:
number: number of nodes
"""
if number < 2:
raise ValueError("node number should greater than 1")
net = Network()
for i in xrange(number):
net.add_node("n{}".format(i))
for i in xrange(number - 1):
net.connect(net.points[i], net.points[i - 1])
if number > 2:
net.connect(net.points[0], net.points[-1])
net.compute_route()
net.config_route()
net.connectable_test()
net.registries = {net.points[0]: net.points[1:]}
return net
def complete_net(number):
"""use complete graph as network typo, use first node as registry"""
if number < 2:
raise ValueError("node number should greater than 1")
net = Network()
for i in xrange(number):
net.add_node("n{}".format(i))
for i in xrange(number):
for j in xrange(i + 1, number):
net.connect(net.points[i], net.points[j])
net.compute_route()
net.config_route()
net.connectable_test()
net.registries = {net.points[0]: net.points[1:]}
return net
def diff_route():
"""
typo:
----- n2 -----
| (bad) |
n1 ------- n3 ------|-- n4
| (good) |
----- reg ------
(good)
bad route will limit traffic speed
Here we set default route of n1 -> n4 is n1 -> n3 -> n4, but the return
route will be n4 -> n2 -> n1, which is slower. So we can simulate difference
network speed between upload and download
"""
net = Network()
n1 = net.add_node("n1")
n2 = net.add_node("n2")
n3 = net.add_node("n3")
n4 = net.add_node("n4")
reg = net.add_node("reg")
net.connect(n1, n2, 2, 0.1)
net.connect(n4, n2, 0.1, 2)
net.connect(n1, n3)
net.connect(n4, n3)
net.connect(n1, reg, 1.5)
net.connect(n4, reg, 1.5)
net.compute_route()
net.config_route()
net.connectable_test()
net.registries = {reg: [n1, n2, n3, n4]}
n2.run(["nft", "-f", LIMIT_SPEED])
# will see speed is limited
# proc = n1.Popen(['python3', 'speed_test_server.py'])
# n4.run(['python3', 'speed_test_client.py', n1.ip])
# proc.wait()
return net
if __name__ == "__main__":
diff_route()
#!/bin/python3
import argparse
import socket
HOST = "127.0.0.1"
PORT = 50000
REPEAT = 1 << 8
TESTDATA = b"miku" * 1024
parser = argparse.ArgumentParser()
parser.add_argument("host", type=str, metavar="ip address", default=HOST)
parser.add_argument("port", default=PORT, nargs="?")
parser.add_argument("-6", "--ipv6", action="store_true")
args = parser.parse_args()
if args.ipv6:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((args.host, PORT))
for i in range(REPEAT):
sock.send(TESTDATA)
sock.close()
import argparse
import logging
import socket
import time
logger = logging.getLogger(__name__)
HOST_4 = "0.0.0.0"
HOST_6 = "::"
PORT = 50000
BUFFER = 4096
parser = argparse.ArgumentParser()
parser.add_argument("-6", "--ipv6", action="store_true")
args = parser.parse_args()
if args.ipv6:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind((HOST_6, PORT))
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((HOST_4, PORT))
sock.listen()
client, addr = sock.accept()
start = time.perf_counter()
count = 0
while True:
data = client.recv(BUFFER)
if data:
count += len(data)
continue
client.close()
end = time.perf_counter()
delta = end - start
logger.info("time use %.2fs, speed %.2fMB/s"
, delta, count / delta / (1 << 20))
break
print(delta)
"""contain ping-test for re6set net"""
import os
import unittest
import time
import psutil
import logging
import os
import random
import time
import unittest
from pathlib2 import Path
from subprocess import PIPE
import psutil
from re6st.tests.test_network import my_net, network_build, re6st_wrap
PING_PATH = str(Path(__file__).parent.resolve() / "ping.py")
SPEED_TEST_CLIENT = str(Path(__file__).parent.resolve() / "speed_test_client.py")
SPEED_TEST_SERVER = str(Path(__file__).parent.resolve() / "speed_test_server.py")
def deploy_re6st(nm, recreate=False):
net = nm.registries
......@@ -30,6 +35,34 @@ def deploy_re6st(nm, recreate=False):
nodes.append(node)
return nodes, registries
def find_node(nodes, name):
for node in nodes:
if node.node.name == name:
return node
def speed_test(client, server, protocol=6):
"""client sent 1MB message to server
return:
the time cost of server to receive all packet.
"""
server_cmd = ['python3', SPEED_TEST_SERVER]
client_cmd = ['python3', SPEED_TEST_CLIENT]
if protocol == 6:
ip_addr = server.ip6 + '1'
server_cmd.append('-6')
client_cmd.append('-6')
else:
ip_addr = server.ip
client_cmd.append(ip_addr)
proc = server.node.Popen(server_cmd, stdout=PIPE)
time.sleep(3)
client.node.run(client_cmd)
out, _ = proc.communicate()
return float(out)
def wait_stable(nodes, timeout=240):
"""try use ping6 from each node to the other until ping success to all the
other nodes
......@@ -89,6 +122,49 @@ class TestNet(unittest.TestCase):
# except:
# pass
def test_switch_route(self):
"""A network containing 3 routes from n4 to n1, the default limit
bandwidth at 256kb/s. The others don't have a limit. test speed from n4
to n1
This test is not sufficient. It proves re6stnet can choose a different
route than the default. It doesn't guarantee re6stnet will choose an
optimized route."""
nm = my_net.diff_route()
nodes, _ = deploy_re6st(nm)
wait_stable(nodes, 40)
n1 = find_node(nodes, 'n1')
n2 = find_node(nodes, 'n2')
n4 = find_node(nodes, 'n4')
self.assertTrue(speed_test(n2, n1) > 1,
"route config fault, limit on bandwidth not work.")
self.assertTrue(speed_test(n4, n1) < 1,
"use the default route, not a optimized route")
time.sleep(5)
# speed)
def test_ping_complete_net(self):
"""ping test a complete network"""
nm = my_net.complete_net(8)
nodes, _ = deploy_re6st(nm, True)
wait_stable(nodes, 50)
time.sleep(10)
self.assertTrue(wait_stable(nodes, 40), " ping test failed")
def test_ping_ring_net(self):
"""ping test a ring net"""
nm = my_net.ring_net(8)
nodes, _ = deploy_re6st(nm, True)
wait_stable(nodes, 50)
time.sleep(10)
self.assertTrue(wait_stable(nodes, 40), " ping test failed")
def test_ping_router(self):
"""create a network in a net segment, test the connectivity by ping
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment