Patrick Kelley 8fd444092b initial
2025-05-07 15:35:15 -04:00

180 lines
5.3 KiB
Python

"""
A module for zkg's notion of "user variables": named values required
by packages that the user can provide in a variety of ways, including
responses to zkg's input prompting.
"""
import os
import re
import readline
def slugify(string):
"""Returns file-system-safe, lower-case version of the input string.
Any character sequence outside of ``[a-zA-Z0-9_]+`` gets replaced by a
single underscore. If the variable has no value or the value is an
empty string, returns the given default.
"""
return re.sub(r"[^\w]+", "_", string, flags=re.ASCII).lower()
def _rlinput(prompt, prefill=""):
"""Variation of input() that supports pre-filling a value."""
readline.set_startup_hook(lambda: readline.insert_text(prefill))
try:
return input(prompt)
finally:
readline.set_startup_hook()
class UserVar:
"""A class representing a single user variable.
User variables have a name and an optional description. They
resolve to a value using a cascade of mechanisms, including
command-line arguments (via --user-var), environment variables,
cached previous values, and user input. They may come with a
default value.
"""
def __init__(self, name, val=None, default=None, desc=None):
self._name = name
self._desc = desc or ""
self._val = val
self._default = default if default is not None else val
def name(self):
return self._name
def desc(self):
return self._desc
def set(self, val):
self._val = val
def val(self, fallback=None):
return self._val if self._val is not None else fallback
def default(self):
return self._default
def resolve(self, name, config, user_var_args=None, force=False):
"""Populates user variables with updated values and returns them.
This function resolves the variable in the following order:
(1) Use any value provided on the command line via --user-var
(2) If force is not used, prompt the user for input
(3) use an environment variables of the same name,
(4) retrieve from the provided config parser's "user_vars" section,
(5) use the default value of the user variable.
The resolved value is stored with the instance (to be
retrieved via .val() in the future) and also returned.
Args:
name (str): the requesting entity, e.g. a package name
config (configparser.ConfigParser): the zkg configuration
user_var_args (list of UserVar): user-var instances
provided via command line
force (bool): whether to skip prompting for input
Returns:
str: the resulting variable value
Raises:
ValueError: when we couldn't produce a value for the variable
"""
val = None
source = None
if user_var_args:
for uvar in user_var_args:
if uvar.name() == self._name:
val = uvar.val()
source = "command line"
break
if val is None:
val = os.environ.get(self._name)
if val:
source = "environment"
if source:
print(
f'"{name}" will use value of "{self._name}" ({self._desc}) from {source}: {val}',
)
self._val = val
return val
if val is None:
# Try to re-use a cached value in the subsequent prompt
val = config.get("user_vars", self._name, fallback=self._default)
if force:
if val is None:
raise ValueError(self._name)
self._val = val
return val
desc = " (" + self._desc + ")" if self._desc else ""
print(f'"{name}" requires a "{self._name}" value{desc}: ')
self._val = _rlinput(self._name + ": ", val)
return self._val
@staticmethod
def parse_arg(arg):
"""Parser for NAME=VAL format string used in command-line args."""
try:
name, val = arg.split("=", 1)
return UserVar(name, val=val)
except ValueError as error:
raise ValueError(
f'invalid user var argument "{arg}", must be NAME=VAR',
) from error
@staticmethod
def parse_dict(metadata_dict):
"""Returns list of UserVars from the metadata's 'user_vars' field.
Args:
metadata_dict (dict of str->str): the input metadata, e.g. from
a configparser entry value.
Returns:
list of UserVar. If the 'user_vars' field is not present,
an empty list is returned. If malformed, returns None.
"""
text = metadata_dict.get("user_vars")
if not text:
return []
rval = []
text = text.strip()
# Example: LIBRDKAFKA_ROOT [/usr] "Path to librdkafka installation"
entries = re.split(r'(\w+\s+\[.*\]\s+".*")\s+', text)
entries = list(filter(None, entries))
for entry in entries:
mob = re.match(r'(\w+)\s+\[(.*)\]\s+"(.*)"', entry)
if not mob:
return None
groups = mob.groups()
if len(groups) != 3:
return None
rval.append(UserVar(groups[0], val=groups[1], desc=groups[2]))
return rval