"""Utility functions."""
import os
from configparser import ConfigParser
from contextlib import contextmanager
from copy import deepcopy
from pathlib import Path
from subprocess import PIPE, Popen
from typing import Any, Dict, List, Optional, Tuple, Union
from packaging.specifiers import SpecifierSet
from pkg_resources import parse_requirements
from tomlkit import TOMLDocument, load
from tomlkit.container import Container
from tomlkit.items import Array, Item, String, Table
from .logger import get_logger
LOG = get_logger(__name__)
def _run_command(*args) -> Tuple[str, int]:
"""Run a command using ``subprocess.Popen``.
Parameters
----------
*args
Arguments for the command.
Returns
-------
str
The output
int
The exit code
Raises
------
RuntimeError
Error raised when the command is not successfully executed.
"""
LOG.debug(f"Running the following command: \n\n {' '.join(args)}")
popen = Popen(args, stdout=PIPE, universal_newlines=True)
out, _ = popen.communicate()
if popen.returncode:
raise RuntimeError(
f"Unable to run the following command: \n\n {' '.join(args)}"
)
return out, popen.returncode
[docs]@contextmanager
def pushd(new_dir: str):
"""Create a context manager for running commands in sub-directories.
Parameters
----------
new_dir : str
The relative directory to run the command in.
"""
curr_dir = Path.cwd()
os.chdir(curr_dir / new_dir)
try:
yield
finally:
os.chdir(curr_dir)
def _convert_toml_array_to_string(item: Union[Item, Any]) -> str:
if isinstance(item, Array):
return "\n".join(item)
elif isinstance(item, String):
return str(item)
else:
raise ValueError
[docs]def convert_requirements(requirements: str, conf: Optional[Dict] = None) -> Dict:
"""Generate environments for a newline-separate list of package requirements.
This function will generate one environment per entry with an additional environment
that upgrades all requirements simultaneously.
Parameters
----------
requirements : str
The requirements string.
conf : dict, optional (default None)
An existing configuration to edit.
Returns
-------
Dict
A configuration dictionary.
"""
conf = {"envs": []} if conf is None else conf
pkgs = [pkg.project_name for pkg in parse_requirements(requirements)]
for pkg in pkgs:
conf["envs"].append({})
conf["envs"][-1]["name"] = pkg
conf["envs"][-1]["upgrade"] = pkg
# Create an environment with all requirements upgraded
conf["envs"].append({})
conf["envs"][-1]["name"] = "all-requirements"
conf["envs"][-1]["upgrade"] = "\n".join(pkgs)
return conf
[docs]def gen_requirements_config(fname_or_buf: str, **options) -> Dict:
"""Generate a configuration file from package requirements.
This function will convert the package installation requirements to a configuration
file with one environment per requirement.
Parameters
----------
fname_or_buf : str
Path to the requirements file to parse using ``pkg_resources.parse_requirements``
or the string representing the requirements file.
**options
Options to apply to each test environment.
Returns
-------
Dict
The configuration file.
"""
# First, get the requirements
if Path(fname_or_buf).is_file():
with open(fname_or_buf) as infile:
cfg = infile.read()
else:
cfg = fname_or_buf
output = convert_requirements(requirements=cfg)
for index in range(len(output["envs"])):
output["envs"][index].update(options)
return output
[docs]def parse_cfg(filename: str = "setup.cfg", requirements: Optional[str] = None) -> Dict:
"""Generate a configuration from a ``.ini`` style file.
This function can operate in two ways. First, it can look for sections that
start with ``edgetest`` and build a configuration. Suppose
you have ``setup.cfg`` as follows:
.. code-block:: ini
[edgetest.envs.pandas]
upgrade =
pandas
This will result in a configuration that has one testing environment, named
``pandas``, that upgrades the ``pandas`` package.
If you don't have any sections that start with ``edgetest.envs``, we will look for
the PEP 517-style ``setup.cfg`` install requirements (the ``install_requires`` key
within the ``options`` section). To set global defaults for you environments, use
the ``edgetest`` section:
.. code-block:: ini
[edgetest]
extras =
tests
command =
pytest tests -m "not integration"
[edgetest.envs.pandas]
upgrade =
pandas
For this single environment file, the above configuration is equivalent to
.. code-block:: ini
[edgetest.envs.pandas]
extras =
tests
command =
pytest tests -m "not integration"
upgrade =
pandas
Parameters
----------
filename : str, optional (default "setup.cfg")
The name of the configuration file to read. Defaults to ``setup.cfg``.
requirements : str, optional (default None)
An optional path to the requirements text file. If there are no PEP-517
style dependencies or coded environments in the edgetest configuration, this
function will look for dependencies in the requirements file.
Returns
-------
Dict
A configuration dictionary for ``edgetest``.
"""
# Read in the configuration file
config = ConfigParser()
config.read(filename)
# Parse
output: Dict = {"envs": []}
# Get any global options if necessary
options = dict(config["edgetest"]) if "edgetest" in config else {}
# Next, create the sections
for section in config.sections():
if not section.startswith("edgetest."):
continue
# Look for the special ``envs`` key
section_name = section.split(".")
if section_name[1] == "envs":
output["envs"].append(dict(config[section]))
output["envs"][-1]["name"] = section_name[2]
if (
"lower" in output["envs"][-1]
and "options" in config
and "install_requires" in config["options"]
):
output["envs"][-1]["lower"] = get_lower_bounds(
config.get("options", "install_requires"),
output["envs"][-1]["lower"],
)
else:
output[section_name[1]] = dict(config[section])
if len(output["envs"]) == 0:
if config.get("options", "install_requires"):
output = convert_requirements(
requirements=config["options"]["install_requires"], conf=output
)
elif requirements:
req_conf = gen_requirements_config(fname_or_buf=requirements)
output["envs"] = req_conf["envs"]
else:
raise ValueError("Please supply a valid list of environments to create.")
# Apply global environment options (without overwriting)
for idx in range(len(output["envs"])):
output["envs"][idx] = dict(
list(options.items()) + list(output["envs"][idx].items())
)
return output
[docs]def parse_toml(
filename: str = "pyproject.toml", requirements: Optional[str] = None
) -> Dict:
"""Generate a configuration from a ``.toml`` style file.
This function can operate in two ways. First, it will look for tables that
start with ``edgetest`` and build a configuration. Suppose
you have ``pyproject.toml`` as follows:
.. code-block:: toml
[edgetest.envs.pandas]
upgrade = [
"pandas"
]
This will result in a configuration that has one testing environment, named
``pandas``, that upgrades the ``pandas`` package.
If you don't have any tables that start with ``edgetest.envs``, we will look for
the installation requirements (the ``dependencies`` key within the ``project`` section).
To set global defaults for you environments, use the ``edgetest`` table:
.. code-block:: toml
[edgetest]
extras = [
"tests"
]
command = "pytest tests -m 'not integration'"
[edgetest.envs.pandas]
upgrade = [
"pandas"
]
For this single environment file, the above configuration is equivalent to
.. code-block:: toml
[edgetest.envs.pandas]
extras = [
"tests"
]
command = "pytest tests -m 'not integration'"
upgrade = [
"pandas"
]
Parameters
----------
filename : str, optional (default "pyproject.toml")
The name of the toml file to read. Defaults to ``pyproject.toml``.
requirements : str, optional (default None)
An optional path to the requirements text file. If there are no TOML
style dependencies or coded environments in the edgetest configuration, this
function will look for dependencies in the requirements file.
Returns
-------
Dict
A configuration dictionary for ``edgetest``.
"""
options: Union[Item, Container, dict]
# Read in the configuration file
config: TOMLDocument = load(open(filename))
# Parse
output: Dict = {"envs": []}
# Get any global options if necessary. First scan through and pop out any Tables
temp_config = deepcopy(config)
if "edgetest" in config:
for j in config["edgetest"].items(): # type: ignore
if isinstance(config["edgetest"][j[0]], Table): # type: ignore
_ = temp_config["edgetest"].pop( # type: ignore
j[0], None
) # remove Tables from the temp config
else:
temp_config["edgetest"][j[0]] = _convert_toml_array_to_string( # type: ignore
temp_config["edgetest"][j[0]] # type: ignore
)
options = temp_config["edgetest"]
else:
options = {}
# Check envs exists and any other Tables
if "edgetest" in config:
for section in config["edgetest"]: # type: ignore
if section == "envs":
for env in config["edgetest"]["envs"]: # type: ignore
for item in config["edgetest"]["envs"][env]: # type: ignore
# If an Array then decompose to a string format
config["edgetest"]["envs"][env][ # type: ignore
item
] = _convert_toml_array_to_string(
config["edgetest"]["envs"][env][item] # type: ignore
)
output["envs"].append(dict(config["edgetest"]["envs"][env])) # type: ignore
output["envs"][-1]["name"] = env
if (
"lower" in output["envs"][-1]
and "project" in config
and "dependencies" in config["project"] # type: ignore
):
output["envs"][-1]["lower"] = get_lower_bounds(
dict(config["project"])["dependencies"], # type: ignore
output["envs"][-1]["lower"],
)
elif isinstance(config["edgetest"][section], Table): # type: ignore
output[section] = dict(config["edgetest"][section]) # type: ignore
if len(output["envs"]) == 0:
if config.get("project").get("dependencies"): # type: ignore
output = convert_requirements(
requirements="\n".join(config["project"]["dependencies"]), # type: ignore
conf=output, # type: ignore # noqa: E501
)
elif requirements:
req_conf = gen_requirements_config(fname_or_buf=requirements)
output["envs"] = req_conf["envs"]
else:
raise ValueError("Please supply a valid list of environments to create.")
# Apply global environment options (without overwriting)
for idx in range(len(output["envs"])):
output["envs"][idx] = dict(
list(options.items()) + list(output["envs"][idx].items()) # type: ignore
)
return output
[docs]def upgrade_requirements(
fname_or_buf: str, upgraded_packages: List[Dict[str, str]]
) -> str:
"""Create an upgraded requirements file.
Parameters
----------
fname_or_buf : str
Path to the requirements file to parse using ``pkg_resources.parse_requirements``
or the string representing the requirements file.
upgraded_packages : list
A list of packages upgraded in the testing procedure.
Returns
-------
str
The string file representing the new requirements file.
"""
# Get the existing file
try:
if Path(fname_or_buf).is_file():
with open(fname_or_buf) as infile:
cfg = infile.read()
else:
cfg = fname_or_buf
except OSError:
# Filename too long for the is_file() function
cfg = fname_or_buf
pkgs = [pkg for pkg in parse_requirements(cfg)]
upgrades = {pkg["name"]: pkg["version"] for pkg in upgraded_packages}
for pkg in pkgs:
if pkg.project_name not in upgrades:
continue
# Replace the spec
specs = deepcopy(pkg.specs)
for index, value in enumerate(specs):
if value[0] == "<=":
pkg.specs[index] = ("<=", upgrades[pkg.project_name])
elif value[0] == "<":
pkg.specs[index] = ("!=", value[1])
pkg.specs.append(("<=", upgrades[pkg.project_name]))
elif value[0] == "==":
pkg.specs = [(">=", value[1]), ("<=", upgrades[pkg.project_name])]
pkg.specifier = SpecifierSet(",".join("".join(spec) for spec in pkg.specs)) # type: ignore
return "\n".join(str(pkg) for pkg in pkgs)
[docs]def upgrade_setup_cfg(
upgraded_packages: List[Dict[str, str]], filename: str = "setup.cfg"
) -> ConfigParser:
"""Upgrade the ``setup.cfg`` file.
Parameters
----------
upgraded_packages : List[Dict[str, str]]
A list of packages upgraded in the testing procedure.
filename : str, optional (default "setup.cfg")
The name of the configuration file to read. Defaults to ``setup.cfg``.
Returns
-------
ConfigParser
The updated configuration file.
"""
parser = ConfigParser()
parser.read(filename)
if "options" in parser and parser.get("options", "install_requires"):
LOG.info(f"Updating the requirements in {filename}")
upgraded = upgrade_requirements(
fname_or_buf=parser["options"]["install_requires"].lstrip(),
upgraded_packages=upgraded_packages,
)
parser["options"]["install_requires"] = "\n" + upgraded
# Update the extras, if necessary
if "options.extras_require" in parser:
for extra, dependencies in parser.items("options.extras_require"):
upgraded = upgrade_requirements(
fname_or_buf=dependencies,
upgraded_packages=upgraded_packages,
)
parser["options.extras_require"][extra] = "\n" + upgraded
return parser
[docs]def upgrade_pyproject_toml(
upgraded_packages: List[Dict[str, str]], filename: str = "pyproject.toml"
) -> TOMLDocument:
"""Upgrade the ``pyproject.toml`` file.
Parameters
----------
upgraded_packages : List[Dict[str, str]]
A list of packages upgraded in the testing procedure.
filename : str, optional (default "pyproject.toml")
The name of the configuration file to read. Defaults to ``pyproject.toml``.
Returns
-------
TOMLDocument
The updated TOMLDocument.
"""
parser: TOMLDocument = load(open(filename))
if "project" in parser and parser.get("project").get("dependencies"): # type: ignore
LOG.info(f"Updating the requirements in {filename}")
upgraded = upgrade_requirements(
fname_or_buf="\n".join(parser["project"]["dependencies"]), # type: ignore
upgraded_packages=upgraded_packages,
)
parser["project"]["dependencies"] = upgraded.split("\n") # type: ignore
# Update the extras, if necessary
if parser.get("project").get("optional-dependencies"): # type: ignore
for extra, dependencies in parser["project"]["optional-dependencies"].items(): # type: ignore # noqa: E501
upgraded = upgrade_requirements(
fname_or_buf="\n".join(dependencies),
upgraded_packages=upgraded_packages,
)
parser["project"]["optional-dependencies"][extra] = upgraded.split("\n") # type: ignore
return parser
def _isin_case_dashhyphen_ins(a: str, vals: List[str]) -> bool:
"""Run isin check that is case and dash/hyphen insensitive.
Paramaters
----------
a : str
String value to check for membership against ``vals``.
vals : list of str
List of strings to check ``a`` against.
Returns
-------
bool
Return ``True`` if ``a`` in vals, otherwise ``False``.
"""
for b in vals:
if a.replace("_", "-").lower() == b.replace("_", "-").lower():
return True
return False
[docs]def get_lower_bounds(requirements: str, lower: str) -> str:
r"""Get lower bounds of requested packages from installation requirements.
Parses through the project ``requirements`` and the newline-delimited
packages requested in ``lower`` to find the lower bounds.
Parameters
----------
requirements : str
Project setup requirements,
e.g. ``"pandas>=1.5.1,<=1.4.2\nnumpy>=1.22.1,<=1.25.4"``
lower : str
Newline-delimited packages requested,
e.g. ``"pandas\nnumpy"``.
Returns
-------
str
The packages along with the lower bound, e.g. ``"pandas==1.5.1\nnumpy==1.22.1"``.
"""
all_lower_bounds = {
pkg.project_name + (f"[{','.join(pkg.extras)}]" if pkg.extras else ""): dict(
pkg.specs
).get(">=")
for pkg in parse_requirements(requirements)
}
lower_with_bounds = ""
for pkg_name, lower_bound in all_lower_bounds.items():
# TODO: Parse through extra requirements as well to get lower bounds
if lower_bound is None:
LOG.warning(
"Requested %s lower bound, but did not find in install requirements.",
pkg_name,
)
elif _isin_case_dashhyphen_ins(pkg_name, lower.split("\n")):
lower_with_bounds += f"{pkg_name}=={lower_bound}\n"
return lower_with_bounds