import asyncio
import time
import enum
import random
import re
import string
import logging
from datetime import datetime
from fnmatch import fnmatchcase
import attr
from .generated import labgrid_coordinator_pb2
# Public names exported by a star-import of this module.
__all__ = [
"TAG_KEY",
"TAG_VAL",
"ResourceEntry",
"ResourceMatch",
"Place",
"ReservationState",
"Reservation",
]
# Pattern for tag keys: one lowercase letter followed by at least one more
# lowercase letter, digit or underscore (i.e. two characters minimum).
TAG_KEY = re.compile(r"[a-z][a-z0-9_]+")
# Pattern for tag values.
# NOTE(review): as written this matches at most ONE character out of
# [a-z0-9_] (or the empty string). Used with ``.match()`` it succeeds on any
# input, because an empty prefix always matches. Presumably a repetition
# such as ``*`` combined with ``fullmatch`` was intended -- confirm against
# the callers before changing, since they are not visible from this module.
TAG_VAL = re.compile(r"[a-z0-9_]?")
def set_map_from_dict(m, d):
    """Copy the plain dict *d* into the message map *m*.

    Each value is stored in the field of ``m[key]`` that matches its Python
    type:

    * ``None``  -> the entry is reset via ``Clear()``
    * ``bool``  -> ``bool_value``
    * ``int``   -> ``int_value`` when negative, ``uint_value`` otherwise
    * ``float`` -> ``float_value``
    * ``str``   -> ``string_value``

    Raises:
        ValueError: if a value has any other type. The entry ``m[key]`` is
            not accessed in that case.
    """
    for key, value in d.items():
        assert isinstance(key, str)

        if value is None:
            m[key].Clear()
            continue

        # bool has to be tested before int: bool is a subclass of int.
        if isinstance(value, bool):
            field = "bool_value"
        elif isinstance(value, int):
            field = "int_value" if value < 0 else "uint_value"
        elif isinstance(value, float):
            field = "float_value"
        elif isinstance(value, str):
            field = "string_value"
        else:
            raise ValueError(f"cannot translate {repr(value)} to MapValue")

        setattr(m[key], field, value)
def build_dict_from_map(m):
    """Convert the message map *m* into a plain dict.

    For every entry the populated member of its ``kind`` oneof is looked up
    with ``WhichOneof("kind")``; the corresponding attribute value is used,
    or ``None`` when no member is set.
    """

    def _unwrap(entry):
        # Name of the oneof member that is currently set, or None.
        selected = entry.WhichOneof("kind")
        return None if selected is None else getattr(entry, selected)

    return {key: _unwrap(entry) for key, entry in m.items()}
@attr.s(eq=False)
class ResourceEntry:
    """Thin wrapper around the ``data`` dict describing a single resource.

    ``data`` must be a dict. The keys read by this class are ``cls``,
    ``params`` (which may contain an ``extra`` sub-dict), ``acquired`` and
    ``avail``. The latter two are filled in with ``None`` / ``False`` when
    missing.
    """

    data = attr.ib()  # cls, params

    def __attrs_post_init__(self):
        """Check the type of ``data`` and fill in the optional keys."""
        assert isinstance(self.data, dict)
        self.data.setdefault("acquired", None)
        self.data.setdefault("avail", False)

    @property
    def acquired(self):
        """value stored by :meth:`acquire`, or None"""
        return self.data["acquired"]

    @property
    def avail(self):
        """the ``avail`` entry of ``data``"""
        return self.data["avail"]

    @property
    def cls(self):
        """the ``cls`` entry of ``data``"""
        return self.data["cls"]

    @property
    def params(self):
        """the ``params`` entry of ``data`` (including ``extra``, if any)"""
        return self.data["params"]

    @property
    def args(self):
        """arguments for resource construction"""
        # Work on a copy so that dropping "extra" does not modify self.data.
        args = self.data["params"].copy()
        args.pop("extra", None)
        return args

    @property
    def extra(self):
        """extra resource information"""
        return self.data["params"].get("extra", {})

    def asdict(self):
        """Return the four known entries of ``data`` as a new dict."""
        return {
            "cls": self.cls,
            "params": self.params,
            "acquired": self.acquired,
            "avail": self.avail,
        }

    def update(self, data):
        """apply updated information from the exporter on the coordinator"""
        # Copy first: the caller's dict must not receive the defaults.
        data = data.copy()
        data.setdefault("acquired", None)
        data.setdefault("avail", False)
        self.data = data

    def acquire(self, place_name):
        """Record *place_name* as the holder; the entry must not be acquired yet."""
        assert self.data["acquired"] is None
        self.data["acquired"] = place_name

    def release(self):
        """Clear the ``acquired`` entry."""
        # ignore repeated releases
        self.data["acquired"] = None

    def as_pb2(self):
        """Build a ``labgrid_coordinator_pb2.Resource`` message from this entry.

        ``params["extra"]`` is written to the separate ``extra`` map of the
        message, all remaining params go to ``params``. ``acquired`` is only
        set on the message when it is not ``None``.
        """
        msg = labgrid_coordinator_pb2.Resource()
        msg.cls = self.cls
        # Copy so that popping "extra" leaves self.data untouched.
        params = self.params.copy()
        extra = params.pop("extra", {})
        set_map_from_dict(msg.params, params)
        set_map_from_dict(msg.extra, extra)
        if self.acquired is not None:
            msg.acquired = self.acquired
        msg.avail = self.avail
        return msg

    @staticmethod
    def data_from_pb2(pb2):
        """Return the ``data`` dict for a ``labgrid_coordinator_pb2.Resource``.

        A falsy ``acquired`` field is mapped to ``None``; the ``extra`` map of
        the message is stored under ``params["extra"]``.
        """
        assert isinstance(pb2, labgrid_coordinator_pb2.Resource)
        data = {
            "cls": pb2.cls,
            "params": build_dict_from_map(pb2.params),
            "acquired": pb2.acquired or None,
            "avail": pb2.avail,
        }
        data["params"]["extra"] = build_dict_from_map(pb2.extra)
        return data

    @classmethod
    def from_pb2(cls, pb2):
        """Create an entry from a ``labgrid_coordinator_pb2.Resource`` message."""
        # This used to assert ``Place``, which contradicts the ``Resource``
        # assertion in data_from_pb2() and made the method fail for every
        # possible input.
        assert isinstance(pb2, labgrid_coordinator_pb2.Resource)
        return cls(cls.data_from_pb2(pb2))
@attr.s(eq=True, repr=False, str=False)
# This class requires eq=True, since we put the matches into a list and require
# the cmp functions to be able to remove the matches from the list later on.
class ResourceMatch:
    """Pattern ``exporter/group/cls[/name]`` used to select resources.

    The individual components are compared with ``fnmatchcase`` in
    :meth:`ismatch`.
    """

    exporter = attr.ib()
    group = attr.ib()
    cls = attr.ib()
    name = attr.ib(default=None)
    # rename is just metadata, so don't use it for comparing matches
    rename = attr.ib(default=None, eq=False)

    @classmethod
    def fromstr(cls, pattern):
        """Parse ``exporter/group/cls`` or ``exporter/group/cls/name``.

        Raises:
            ValueError: if *pattern* does not contain two or three slashes.
        """
        separators = pattern.count("/")
        if separators < 2 or separators > 3:
            raise ValueError(f"invalid pattern format '{pattern}' (use 'exporter/group/cls/name')")
        components = pattern.split("/")
        return cls(*components)

    def __repr__(self):
        """Return the pattern as ``exporter/group/cls[/name]``."""
        components = [self.exporter, self.group, self.cls]
        if self.name is not None:
            components.append(self.name)
        return "/".join(f"{component}" for component in components)

    def __str__(self):
        """Return the pattern, followed by `` -> rename`` when a rename is set."""
        text = repr(self)
        if not self.rename:
            return text
        return text + " -> " + self.rename

    def ismatch(self, resource_path):
        """Return True if this matches the given resource"""
        # A path has either four components or three (without a name).
        try:
            exporter, group, cls, name = resource_path
        except ValueError:
            exporter, group, cls = resource_path
            name = None

        required = (
            (exporter, self.exporter),
            (group, self.group),
            (cls, self.cls),
        )
        for value, pattern in required:
            if not fnmatchcase(value, pattern):
                return False

        # The name is only compared when both sides actually have one.
        if name and self.name:
            return bool(fnmatchcase(name, self.name))
        return True

    def as_pb2(self):
        """Build the ``labgrid_coordinator_pb2.ResourceMatch`` message."""
        fields = {
            "exporter": self.exporter,
            "group": self.group,
            "cls": self.cls,
            "name": self.name,
            "rename": self.rename,
        }
        return labgrid_coordinator_pb2.ResourceMatch(**fields)

    @classmethod
    def from_pb2(cls, pb2):
        """Create a match from a ``labgrid_coordinator_pb2.ResourceMatch``."""
        assert isinstance(pb2, labgrid_coordinator_pb2.ResourceMatch)
        # An unset name is represented as None rather than the field default.
        name = pb2.name if pb2.HasField("name") else None
        return cls(
            exporter=pb2.exporter,
            group=pb2.group,
            cls=pb2.cls,
            name=name,
            rename=pb2.rename,
        )
@attr.s(eq=False)
class Place:
    """A named place with its matches, tags and acquisition state.

    ``acquired_resources`` holds resource objects (anything with a ``path``
    attribute) in the coordinator and plain path lists/tuples elsewhere.
    A resource path has the structure (exporter, group, cls, name).
    """

    name = attr.ib()
    aliases = attr.ib(default=attr.Factory(set), converter=set)
    comment = attr.ib(default="")
    tags = attr.ib(default=attr.Factory(dict))
    matches = attr.ib(default=attr.Factory(list))
    acquired = attr.ib(default=None)
    acquired_resources = attr.ib(default=attr.Factory(list))
    allowed = attr.ib(default=attr.Factory(set), converter=set)
    created = attr.ib(default=attr.Factory(time.time))
    changed = attr.ib(default=attr.Factory(time.time))
    reservation = attr.ib(default=None)

    @staticmethod
    def _resource_path(resource):
        """Return the path of *resource*, which is either a path or an object with ``.path``."""
        # in the coordinator, we have resource objects, otherwise just a path
        if isinstance(resource, (tuple, list)):
            return resource
        return resource.path

    def asdict(self):
        """Return the place (without its name) as a dict of plain values."""
        return {
            "aliases": list(self.aliases),
            "comment": self.comment,
            "tags": self.tags,
            "matches": [attr.asdict(x) for x in self.matches],
            "acquired": self.acquired,
            "acquired_resources": [self._resource_path(r) for r in self.acquired_resources],
            "allowed": list(self.allowed),
            "created": self.created,
            "changed": self.changed,
            "reservation": self.reservation,
        }

    def update_from_pb2(self, place_pb2):
        """Overwrite this place's attributes with the values from *place_pb2*."""
        # FIXME untangle this...
        place = Place.from_pb2(place_pb2)
        fields = attr.fields_dict(type(self))
        for k, v in place.asdict().items():
            assert k in fields
            if k == "name":
                # we cannot rename places
                assert v == self.name
                continue
            if k == "matches":
                # asdict() flattened the matches into dicts, so rebuild the
                # ResourceMatch objects from the message instead.
                self.matches = [ResourceMatch.from_pb2(m) for m in place_pb2.matches]
                continue
            setattr(self, k, v)

    def show(self, level=0):
        """Print a human-readable description, indented by *level* spaces."""
        indent = " " * level
        if self.aliases:
            print(indent + f"aliases: {', '.join(sorted(self.aliases))}")
        if self.comment:
            print(indent + f"comment: {self.comment}")
        if self.tags:
            print(indent + f"tags: {', '.join(k + '=' + v for k, v in sorted(self.tags.items()))}")
        print(indent + "matches:")
        for match in sorted(self.matches):
            print(indent + f" {match}")
        print(indent + f"acquired: {self.acquired}")
        print(indent + "acquired resources:")
        for resource in sorted(self.acquired_resources):
            resource_path = self._resource_path(resource)
            match = self.getmatch(resource_path)
            # getmatch() returns None when no match covers the resource (for
            # example after the matches were changed); print the bare path
            # then instead of failing on ``None.rename``.
            if match is not None and match.rename:
                print(indent + f" {'/'.join(resource_path)} -> {match.rename}")
            else:
                print(indent + f" {'/'.join(resource_path)}")
        if self.allowed:
            # sorted like the aliases above, so the output is deterministic
            print(indent + f"allowed: {', '.join(sorted(self.allowed))}")
        print(indent + f"created: {datetime.fromtimestamp(self.created)}")
        print(indent + f"changed: {datetime.fromtimestamp(self.changed)}")
        if self.reservation:
            print(indent + f"reservation: {self.reservation}")

    def getmatch(self, resource_path):
        """Return the ResourceMatch object for the given resource path or None if not found.

        A resource_path has the structure (exporter, group, cls, name).
        """
        for match in self.matches:
            if match.ismatch(resource_path):
                return match
        return None

    def hasmatch(self, resource_path):
        """Return True if this place has a ResourceMatch object for the given resource path.

        A resource_path has the structure (exporter, group, cls, name).
        """
        return self.getmatch(resource_path) is not None

    def unmatched(self, resource_paths):
        """Return the first match which matches none of the given resource paths.

        Returns None if every match of this place matches at least one path.
        A resource_path has the structure (exporter, group, cls, name).
        """
        for match in self.matches:
            if not any(match.ismatch(resource) for resource in resource_paths):
                return match
        return None

    def touch(self):
        """Set ``changed`` to the current time."""
        self.changed = time.time()

    def as_pb2(self):
        """Build the ``labgrid_coordinator_pb2.Place`` message for this place.

        Only supported when ``acquired_resources`` contains resource objects
        with a four-component ``path``. A ``TypeError`` raised during the
        conversion is logged and re-raised.
        """
        try:
            acquired_resources = []
            for resource in self.acquired_resources:
                assert not isinstance(resource, (tuple, list)), "as_pb2() only implemented for coordinator"
                assert len(resource.path) == 4
                acquired_resources.append("/".join(resource.path))

            place = labgrid_coordinator_pb2.Place()
            place.name = self.name
            place.aliases.extend(self.aliases)
            place.comment = self.comment
            place.matches.extend(m.as_pb2() for m in self.matches)
            # "not acquired" is transported as an empty string
            place.acquired = self.acquired or ""
            place.acquired_resources.extend(acquired_resources)
            place.allowed.extend(self.allowed)
            place.changed = self.changed
            place.created = self.created
            if self.reservation:
                place.reservation = self.reservation
            for key, value in self.tags.items():
                place.tags[key] = value
            return place
        except TypeError:
            logging.exception("failed to convert place %s to protobuf", self)
            raise

    @classmethod
    def from_pb2(cls, pb2):
        """Create a place from a ``labgrid_coordinator_pb2.Place`` message.

        Acquired resources are stored as four-component path lists; an unset
        or empty ``acquired`` field and an unset ``reservation`` field become
        ``None``.
        """
        assert isinstance(pb2, labgrid_coordinator_pb2.Place)
        acquired_resources = []
        for path in pb2.acquired_resources:
            path = path.split("/")
            assert len(path) == 4
            acquired_resources.append(path)
        return cls(
            name=pb2.name,
            aliases=pb2.aliases,
            comment=pb2.comment,
            tags=dict(pb2.tags),
            matches=[ResourceMatch.from_pb2(m) for m in pb2.matches],
            acquired=pb2.acquired if pb2.HasField("acquired") and pb2.acquired else None,
            acquired_resources=acquired_resources,
            allowed=pb2.allowed,
            created=pb2.created,
            changed=pb2.changed,
            reservation=pb2.reservation if pb2.HasField("reservation") else None,
        )
@enum.unique
class ReservationState(enum.Enum):
    """States a :class:`Reservation` can be in.

    The member *names* are what ``Reservation.asdict()`` reports and what the
    ``state`` converter accepts; the integer *values* are what is written to
    and read from the protobuf ``state`` field.
    """

    waiting = 0
    allocated = 1
    acquired = 2
    expired = 3
    invalid = 4
def _new_reservation_token():
    """Return a random token of ten uppercase letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    return "".join(random.choice(alphabet) for _ in range(10))


def _to_reservation_state(value):
    """Pass ReservationState members through, look anything else up by name."""
    if isinstance(value, ReservationState):
        return value
    return ReservationState[value]


def _default_reservation_timeout():
    """Return the default timeout: 60 seconds from now."""
    return time.time() + 60


@attr.s(eq=False)
class Reservation:
    """A reservation held by ``owner`` and identified by ``token``."""

    owner = attr.ib(validator=attr.validators.instance_of(str))
    token = attr.ib(default=attr.Factory(_new_reservation_token))
    state = attr.ib(
        default="waiting",
        converter=_to_reservation_state,
        validator=attr.validators.instance_of(ReservationState),
    )
    prio = attr.ib(default=0.0, validator=attr.validators.instance_of(float))
    # a dictionary of name -> filter dicts
    filters = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
    # a dictionary of name -> place names
    allocations = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict))
    created = attr.ib(default=attr.Factory(time.time))
    timeout = attr.ib(default=attr.Factory(_default_reservation_timeout))

    def asdict(self):
        """Return the reservation (without its token) as a dict of plain values."""
        summary = {"owner": self.owner}
        summary["state"] = self.state.name
        summary["prio"] = self.prio
        summary["filters"] = self.filters
        summary["allocations"] = self.allocations
        summary["created"] = self.created
        summary["timeout"] = self.timeout
        return summary

    def refresh(self, delta=60):
        """Move the timeout to *delta* seconds from now, never moving it backwards."""
        candidate = time.time() + delta
        self.timeout = max(self.timeout, candidate)

    @property
    def expired(self):
        """True once the timeout lies in the past"""
        return self.timeout < time.time()

    def show(self, level=0):
        """Print a human-readable description, indented by *level* spaces."""
        indent = " " * level

        def emit(text):
            # Lines are printed one at a time, in order.
            print(indent + text)

        emit(f"owner: {self.owner}")
        emit(f"token: {self.token}")
        emit(f"state: {self.state.name}")
        if self.prio:
            emit(f"prio: {self.prio}")
        emit("filters:")
        for name, fltr in self.filters.items():
            emit(f" {name}: {' '.join([(k + '=' + v) for k, v in fltr.items()])}")
        if self.allocations:
            emit("allocations:")
            for name, allocation in self.allocations.items():
                emit(f" {name}: {', '.join(allocation)}")
        emit(f"created: {datetime.fromtimestamp(self.created)}")
        emit(f"timeout: {datetime.fromtimestamp(self.timeout)}")

    def as_pb2(self):
        """Build the ``labgrid_coordinator_pb2.Reservation`` message.

        Allocations are only supported in the form ``{"main": [place_name]}``.
        """
        res = labgrid_coordinator_pb2.Reservation()
        res.owner = self.owner
        res.token = self.token
        res.state = self.state.value
        res.prio = self.prio
        for name, fltr in self.filters.items():
            filter_msg = labgrid_coordinator_pb2.Reservation.Filter(filter=fltr)
            res.filters[name].CopyFrom(filter_msg)
        if self.allocations:
            # TODO: refactor to have only one place per filter group
            assert len(self.allocations) == 1
            assert "main" in self.allocations
            allocation = self.allocations["main"]
            assert len(allocation) == 1
            res.allocations.update({"main": allocation[0]})
        res.created = self.created
        res.timeout = self.timeout
        return res

    @classmethod
    def from_pb2(cls, pb2: labgrid_coordinator_pb2.Reservation):
        """Create a reservation from a ``labgrid_coordinator_pb2.Reservation``."""
        filters = {name: dict(fltr_pb2.filter) for name, fltr_pb2 in pb2.filters.items()}
        # The message carries one place name per filter; store it as a
        # one-element list.
        allocations = {fltr_name: [place_name] for fltr_name, place_name in pb2.allocations.items()}
        return cls(
            owner=pb2.owner,
            token=pb2.token,
            state=ReservationState(pb2.state),
            prio=pb2.prio,
            filters=filters,
            allocations=allocations,
            created=pb2.created,
            timeout=pb2.timeout,
        )
async def queue_as_aiter(q):
    """Expose the queue *q* as an async iterator.

    Items are yielded in the order they are taken from the queue. Once the
    consumer resumes the generator after an item, ``q.task_done()`` is called
    for it and a debug message is logged. Iteration ends when ``None`` is
    taken from the queue or when waiting on the queue is cancelled. Any other
    exception is logged and re-raised.
    """
    try:
        while True:
            try:
                message = await q.get()
            except asyncio.CancelledError:
                # gRPC doesn't like to receive exceptions from the request_iterator
                return
            if message is None:
                # None is the end-of-stream marker
                break
            yield message
            q.task_done()
            logging.debug("sent message %s", message)
    except Exception:
        logging.exception("error in queue_as_aiter")
        raise