# pylint: disable=no-member
import contextlib
import json
import subprocess
import time
import attr
from .common import Driver
from ..factory import target_factory
from ..step import step
from ..util.helper import processwrapper
from ..util.managedfile import ManagedFile
from ..util.timeout import Timeout
from ..resource.common import NetworkResource
[docs]
@target_factory.reg_driver
@attr.s(eq=False)
class RawNetworkInterfaceDriver(Driver):
bindings = {
"iface": {"NetworkInterface", "RemoteNetworkInterface", "USBNetworkInterface"},
}
[docs]
def __attrs_post_init__(self):
super().__attrs_post_init__()
self._record_handle = None
self._replay_handle = None
[docs]
def on_activate(self):
self._set_interface("up")
self._wait_state("up")
[docs]
def on_deactivate(self):
self._set_interface("down")
self._wait_state("down")
def _wrap_command(self, args):
wrapper = ["sudo", "labgrid-raw-interface"]
if self.iface.command_prefix:
# add ssh prefix, convert command passed via ssh (including wrapper) to single argument
return self.iface.command_prefix + [" ".join(wrapper + args)]
else:
# keep wrapper and args as-is
return wrapper + args
@step(args=["state"])
def _set_interface(self, state):
"""Set interface to given state."""
cmd = ["ip", self.iface.ifname, state]
cmd = self._wrap_command(cmd)
subprocess.check_call(cmd)
[docs]
@Driver.check_active
def set_interface_up(self):
"""Set bound interface up."""
self._set_interface("up")
[docs]
@Driver.check_active
def set_interface_down(self):
"""Set bound interface down."""
self._set_interface("down")
def _get_state(self):
"""Returns the bound interface's operstate."""
if_state = self.iface.extra.get("state")
if if_state:
return if_state
cmd = self.iface.command_prefix + ["cat", f"/sys/class/net/{self.iface.ifname}/operstate"]
output = processwrapper.check_output(cmd).decode("ascii")
if_state = output.strip()
return if_state
[docs]
@Driver.check_active
def get_state(self):
"""Returns the bound interface's operstate."""
return self._get_state()
@step(title="wait_state", args=["expected_state", "timeout"])
def _wait_state(self, expected_state, timeout=60):
"""Wait until the expected state is reached or the timeout expires."""
timeout = Timeout(float(timeout))
while True:
if self._get_state() == expected_state:
return
if timeout.expired:
raise TimeoutError(
f"exported interface {self.iface.ifname} did not go {expected_state} within {timeout.timeout} seconds"
)
time.sleep(1)
[docs]
@Driver.check_active
def wait_state(self, expected_state, timeout=60):
"""Wait until the expected state is reached or the timeout expires."""
self._wait_state(expected_state, timeout=timeout)
def _stop(self, proc, *, timeout=None):
assert proc is not None
try:
_, err = proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
proc.terminate()
_, err = proc.communicate()
raise
if proc.returncode:
raise subprocess.CalledProcessError(
returncode=proc.returncode,
cmd=proc.args,
stderr=err,
)
[docs]
@Driver.check_active
@step(args=["filename", "count", "timeout"])
def start_record(self, filename, *, count=None, timeout=None):
"""
Starts tcpdump on bound network interface resource.
Args:
filename (str): name of a file to record to, or None to record to stdout
count (int): optional, exit after receiving this many number of packets
timeout (int): optional, number of seconds to capture packets before tcpdump exits
Returns:
Popen object of tcpdump process
"""
assert self._record_handle is None
cmd = ["tcpdump", self.iface.ifname]
if count is not None:
cmd.append(str(count))
if timeout is not None:
cmd.append("--timeout")
cmd.append(str(timeout))
cmd = self._wrap_command(cmd)
if filename is None:
self._record_handle = subprocess.Popen(cmd, stdout=subprocess.PIPE)
else:
with open(filename, "wb") as outdata:
self._record_handle = subprocess.Popen(cmd, stdout=outdata, stderr=subprocess.PIPE)
return self._record_handle
[docs]
@Driver.check_active
@step(args=["timeout"])
def stop_record(self, *, timeout=None):
"""
Stops previously started tcpdump on bound network interface resource.
Args:
timeout (int): optional, maximum number of seconds to wait for the tcpdump process to
terminate
"""
try:
self._stop(self._record_handle, timeout=timeout)
except subprocess.TimeoutExpired:
# If live streaming packets, there is no reason to wait for tcpdump
# to finish, so expect a timeout if piping to stdout
if self._record_handle.stdout is None:
raise
finally:
self._record_handle = None
[docs]
@contextlib.contextmanager
def record(self, filename, *, count=None, timeout=None):
"""
Context manager to start/stop tcpdump on bound network interface resource.
Either count or timeout must be specified.
Args:
filename (str): name of a file to record to, or None to live stream packets
count (int): optional, exit after receiving this many number of packets
timeout (int): optional, number of seconds to capture packets before tcpdump exits
Returns:
Popen object of tcpdump process. If filename is None, packets can be read from stdout
"""
assert count or timeout
try:
yield self.start_record(filename, count=count, timeout=timeout)
finally:
self.stop_record(timeout=0 if filename is None else None)
[docs]
@Driver.check_active
@step(args=["filename"])
def start_replay(self, filename):
"""
Starts tcpreplay on bound network interface resource.
Args:
filename (str): name of a file to replay from
Returns:
Popen object of tcpreplay process
"""
assert self._replay_handle is None
if isinstance(self.iface, NetworkResource):
mf = ManagedFile(filename, self.iface)
mf.sync_to_resource()
cmd = self._wrap_command([f"tcpreplay {self.iface.ifname} < {mf.get_remote_path()}"])
self._replay_handle = subprocess.Popen(cmd, stderr=subprocess.PIPE)
else:
cmd = self._wrap_command(["tcpreplay", self.iface.ifname])
with open(filename, "rb") as indata:
self._replay_handle = subprocess.Popen(cmd, stdin=indata)
return self._replay_handle
[docs]
@Driver.check_active
@step(args=["timeout"])
def stop_replay(self, *, timeout=None):
"""
Stops previously started tcpreplay on bound network interface resource.
Args:
timeout (int): optional, maximum number of seconds to wait for the tcpreplay process to
terminate
"""
try:
self._stop(self._replay_handle, timeout=timeout)
finally:
self._replay_handle = None
[docs]
@contextlib.contextmanager
def replay(self, filename, *, timeout=None):
"""
Context manager to start/stop tcpreplay on bound network interface resource.
Args:
filename (str): name of a file to replay from
timeout (int): optional, maximum number of seconds to wait for the tcpreplay process to
terminate
"""
try:
yield self.start_replay(filename)
finally:
self.stop_replay(timeout=timeout)
[docs]
@Driver.check_active
@step()
def get_statistics(self):
"""
Returns basic interface statistics of bound network interface resource.
"""
cmd = self.iface.command_prefix + ["ip", "--json", "-stats", "-stats", "link", "show", self.iface.ifname]
output = processwrapper.check_output(cmd)
return json.loads(output)[0]
[docs]
@Driver.check_active
def get_address(self):
"""
Returns the MAC address of the bound network interface resource.
"""
return self.get_statistics()["address"]