Commit 455179ac authored by Tom Niget's avatar Tom Niget

more types

parent 033ce168
......@@ -34,3 +34,5 @@ nosetests.xml
.mr.developer.cfg
.project
.pydevproject
.idea/
......@@ -237,7 +237,7 @@ class ImportedNodeInterface(NSInterface):
On destruction, the interface will be restored to the original name space
and will try to restore the original state."""
def __init__(self, node: "nemu.Node", iface, migrate=True):
def __init__(self, node: "nemu.Node", iface: nemu.iproute.interface, migrate=True):
self._slave = None
self._migrate = migrate
if self._migrate:
......
......@@ -385,7 +385,7 @@ def create_if_pair(if1: interface, if2: interface) -> tuple[interface, interface
return interfaces[if1.name], interfaces[if2.name]
def del_if(iface):
def del_if(iface: interface | int | str):
ifname = _get_if_name(iface)
execute([IP_PATH, "link", "del", ifname])
......
......@@ -39,6 +39,7 @@ class Node(object):
_nodes: MutableMapping[int, "Node"] = weakref.WeakValueDictionary()
_nextnode = 0
_processes: MutableMapping[int, nemu.subprocess_.Subprocess]
_interfaces: MutableMapping[int, nemu.interface.Interface]
@staticmethod
def get_nodes() -> list["Node"]:
s = sorted(list(Node._nodes.items()), key = lambda x: x[0])
......@@ -98,7 +99,7 @@ class Node(object):
self._pid = self._slave = None
@property
def pid(self):
def pid(self) -> int:
return self._pid
# Subprocesses
......@@ -121,7 +122,7 @@ class Node(object):
return nemu.subprocess_.backticks_raise(self, *kargs, **kwargs)
# Interfaces
def _add_interface(self, interface):
def _add_interface(self, interface: nemu.interface.Interface):
self._interfaces[interface.index] = interface
def add_if(self, **kwargs):
......@@ -142,18 +143,18 @@ class Node(object):
setattr(i, k, v)
return i
def import_if(self, interface):
def import_if(self, interface: nemu.iproute.interface):
return nemu.interface.ImportedNodeInterface(self, interface)
def del_if(self, iface):
def del_if(self, iface: nemu.interface.Interface):
"""Doesn't destroy the interface if it wasn't created by us."""
del self._interfaces[iface.index]
iface.destroy()
def get_interface(self, name):
def get_interface(self, name: str):
return [i for i in self.get_interfaces() if i.name == name][0]
def get_interfaces(self):
def get_interfaces(self) -> list[nemu.interface.Interface]:
if not self._slave:
return []
ifaces = self._slave.get_if_data()
......
......@@ -29,7 +29,7 @@ import tempfile
import time
import traceback
from pickle import loads, dumps
from typing import Literal
from typing import Literal, Optional
import nemu.iproute
import nemu.subprocess_
......@@ -645,7 +645,7 @@ class Client(object):
raise
self._read_and_check_reply()
def spawn(self, argv, executable=None,
def spawn(self, argv: list[str], executable: str = None,
stdin=None, stdout=None, stderr=None,
cwd=None, env=None, user=None):
"""Start a subprocess in the slave; the interface resembles
......@@ -698,7 +698,7 @@ class Client(object):
return pid
def poll(self, pid):
def poll(self, pid: int) -> Optional[int]:
"""Equivalent to Popen.poll(), checks if the process has finished.
Returns the exitcode if finished, None otherwise."""
self._send_cmd("PROC", "POLL", pid)
......@@ -711,7 +711,7 @@ class Client(object):
else:
raise RuntimeError("Error on command: %d %s" % (code, text))
def wait(self, pid):
def wait(self, pid: int) -> int:
"""Equivalent to Popen.wait(). Waits for the process to finish and
returns the exitcode."""
self._send_cmd("PROC", "WAIT", pid)
......@@ -719,7 +719,7 @@ class Client(object):
exitcode = int(text.split()[0])
return exitcode
def signal(self, pid, sig=signal.SIGTERM):
def signal(self, pid: int, sig=signal.SIGTERM):
"""Equivalent to Popen.send_signal(). Sends a signal to the child
process; signal defaults to SIGTERM."""
if sig:
......@@ -728,7 +728,7 @@ class Client(object):
self._send_cmd("PROC", "KILL", pid)
self._read_and_check_reply()
def get_if_data(self, ifnr=None):
def get_if_data(self, ifnr=None) -> dict[int, nemu.iproute.interface] | nemu.iproute.interface:
if ifnr:
self._send_cmd("IF", "LIST", ifnr)
else:
......@@ -736,7 +736,7 @@ class Client(object):
data = self._read_and_check_reply()
return loads(_db64(data.partition("\n")[2]))
def set_if(self, interface):
def set_if(self, interface: nemu.iproute.interface):
cmd = ["IF", "SET", interface.index]
for k in interface.changeable_attributes:
v = getattr(interface, k)
......@@ -746,15 +746,15 @@ class Client(object):
self._send_cmd(*cmd)
self._read_and_check_reply()
def del_if(self, ifnr):
def del_if(self, ifnr: int):
self._send_cmd("IF", "DEL", ifnr)
self._read_and_check_reply()
def change_netns(self, ifnr, netns):
def change_netns(self, ifnr: int, netns: int):
self._send_cmd("IF", "RTRN", ifnr, netns)
self._read_and_check_reply()
def get_addr_data(self, ifnr=None):
def get_addr_data(self, ifnr: int = None):
if ifnr:
self._send_cmd("ADDR", "LIST", ifnr)
else:
......@@ -775,15 +775,15 @@ class Client(object):
self._send_cmd("ADDR", "DEL", ifnr, address.address, address.prefix_len)
self._read_and_check_reply()
def get_route_data(self):
def get_route_data(self) -> list[nemu.iproute.route]:
self._send_cmd("ROUT", "LIST")
data = self._read_and_check_reply()
return loads(_db64(data.partition("\n")[2]))
def add_route(self, route):
def add_route(self, route: nemu.iproute.route):
self._add_del_route("ADD", route)
def del_route(self, route):
def del_route(self, route: nemu.iproute.route):
self._add_del_route("DEL", route)
def _add_del_route(self, action: Literal["ADD", "DEL"], route: nemu.iproute.route):
......
......@@ -27,9 +27,9 @@ import signal
import sys
import time
import traceback
import typing
from typing import TYPE_CHECKING, Optional
if typing.TYPE_CHECKING:
if TYPE_CHECKING:
from nemu import Node
from nemu import compat
from nemu.environ import eintr_wrapper
......@@ -106,14 +106,14 @@ class Subprocess(object):
"""The real process ID of this subprocess."""
return self._pid
def poll(self):
def poll(self) -> Optional[int]:
"""Checks status of program, returns exitcode or None if still running.
See Popen.poll."""
if self._returncode is None:
self._returncode = self._slave.poll(self._pid)
return self.returncode
def wait(self):
def wait(self) -> int:
"""Waits for program to complete and returns the exitcode.
See Popen.wait"""
if self._returncode is None:
......@@ -266,21 +266,21 @@ class Popen(Subprocess):
return (out, err)
def system(node, args):
def system(node: "Node", args: str | list[str]) -> Optional[int]:
"""Emulates system() function, if `args' is an string, it uses `/bin/sh' to
exexecute it, otherwise is interpreted as the argv array to call execve."""
shell = isinstance(args, str)
return Popen(node, args, shell=shell).wait()
def backticks(node, args):
def backticks(node: "Node", args: str | list[str]) -> str:
"""Emulates shell backticks, if `args' is an string, it uses `/bin/sh' to
exexecute it, otherwise is interpreted as the argv array to call execve."""
shell = isinstance(args, str)
return Popen(node, args, shell=shell, stdout=PIPE).communicate()[0].decode("utf-8")
def backticks_raise(node, args: str | list[str]) -> str:
def backticks_raise(node: "Node", args: str | list[str]) -> str:
"""Emulates shell backticks, if `args' is an string, it uses `/bin/sh' to
exexecute it, otherwise is interpreted as the argv array to call execve.
Raises an RuntimeError if the return value is not 0."""
......@@ -299,8 +299,8 @@ def backticks_raise(node, args: str | list[str]) -> str:
#
# Server-side code, called from nemu.protocol.Server
def spawn(executable, argv=None, cwd=None, env=None, close_fds=False,
stdin=None, stdout=None, stderr=None, user=None):
def spawn(executable: str, argv=None, cwd=None, env=None, close_fds: bool | list[int] = False,
stdin=None, stdout=None, stderr=None, user=None) -> int:
"""Internal function that performs all the dirty work for Subprocess, Popen
and friends. This is executed in the slave process, directly from the
protocol.Server class.
......@@ -324,7 +324,7 @@ def spawn(executable, argv=None, cwd=None, env=None, close_fds=False,
userfd[i] = userfd[i].fileno() # pragma: no cover
# Verify there is no clash
assert not (set([0, 1, 2]) & set(filtered_userfd))
assert not ({0, 1, 2} & set(filtered_userfd))
if user is not None:
user, uid, gid = get_user(user)
......@@ -337,7 +337,7 @@ def spawn(executable, argv=None, cwd=None, env=None, close_fds=False,
(r, w) = compat.pipe()
pid = os.fork()
if pid == 0: # pragma: no cover
if pid == 0: # pragma: no cover
# coverage doesn't seem to understand fork
try:
# Set up stdio piping
......@@ -352,14 +352,14 @@ def spawn(executable, argv=None, cwd=None, env=None, close_fds=False,
flags = fcntl.fcntl(w, fcntl.F_GETFD)
fcntl.fcntl(w, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
if close_fds == True:
if close_fds is True:
for i in range(3, MAXFD):
if i != w:
try:
os.close(i)
except:
pass
elif close_fds != False:
elif close_fds is not False:
for i in close_fds:
os.close(i)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment