import os
from pathlib import Path
from typing import Dict, List, Callable, Generator, Union, Literal
from datasail.parsers import parse_datasail_args
from datasail.reader.utils import DATA_INPUT, MATRIX_INPUT
from datasail.routine import datasail_main
from datasail.settings import *
def error(msg: str, error_code: int, cli: bool) -> None:
    """
    Log an error message and abort the current run.

    The message is always sent to ``LOGGER.error`` first. How the run is aborted depends on how the program
    was started.

    Args:
        msg: Error message
        error_code: Code of the error to identify it; used as the process exit status in CLI mode
        cli: boolean flag indicating that this program has been started from commandline

    Raises:
        SystemExit: If ``cli`` is true, carrying ``error_code`` as exit status.
        ValueError: If ``cli`` is false, carrying ``msg``.
    """
    LOGGER.error(msg)
    if cli:
        # Raise SystemExit directly instead of calling the builtin ``exit``: that helper is injected by the
        # ``site`` module for interactive use and is missing e.g. under ``python -S`` or in frozen builds.
        raise SystemExit(error_code)
    raise ValueError(msg)
def validate_args(**kwargs) -> Dict[str, object]:
    """
    Validate the arguments given to the program.

    Besides pure validation, this function has side effects: it creates the output, log, and cache
    directories if needed, sets the level of ``LOGGER`` and of its first handler, and attaches a file handler
    writing to ``<output>/logs/general.log`` when an output directory is given.

    Notes:
        next error code: 27
        Error code 6 is shared by the delta and the epsilon check.

    Args:
        **kwargs: Arguments in kwargs-format

    Returns:
        The kwargs in case something has been adjusted, e.g. splits normalization or naming

    Raises:
        SystemExit or ValueError: Through ``error`` when a check fails; which one depends on ``kwargs[KW_CLI]``.
    """
    # create output directory
    out_dir = kwargs[KW_OUTDIR]
    output_created = False
    if out_dir is not None and not out_dir.is_dir():
        output_created = True
        out_dir.mkdir(parents=True, exist_ok=True)

    # configure logging before anything is reported
    level = VERB_MAP[kwargs[KW_VERBOSE]]
    LOGGER.setLevel(level)
    LOGGER.handlers[0].setLevel(level=level)

    if out_dir is not None:
        kwargs[KW_LOGDIR] = out_dir / "logs"
        kwargs[KW_LOGDIR].mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(kwargs[KW_LOGDIR] / "general.log")
        file_handler.setLevel(level=level)
        file_handler.setFormatter(FORMATTER)
        LOGGER.addHandler(file_handler)
    else:
        kwargs[KW_LOGDIR] = None

    # the warning is postponed until here so that it also reaches the file handler added above
    if output_created:
        LOGGER.warning("Output directory does not exist, DataSAIL creates it automatically")

    LOGGER.info("Validating arguments")

    # check splits to be more than 1 and their fractions sum up to 1 and check the names
    if len(kwargs[KW_SPLITS]) < 2:
        error("Less then two splits required. This is no useful input, please check the input again.", 1,
              kwargs[KW_CLI])
    if kwargs[KW_NAMES] is None:
        kwargs[KW_NAMES] = [f"Split{x:03d}" for x in range(len(kwargs[KW_SPLITS]))]
    elif len(kwargs[KW_SPLITS]) != len(kwargs[KW_NAMES]):
        error("Different number of splits and names. You have to give the same number of splits and names for "
              "them.", 2, kwargs[KW_CLI])
    elif len(kwargs[KW_NAMES]) != len(set(kwargs[KW_NAMES])):
        error("At least two splits will have the same name. Please check the naming of the splits again to have "
              "unique names", 24, kwargs[KW_CLI])
    # normalize the split sizes so that they sum up to 1
    total = sum(kwargs[KW_SPLITS])
    kwargs[KW_SPLITS] = [x / total for x in kwargs[KW_SPLITS]]

    # check search termination criteria
    if kwargs[KW_MAX_SEC] < 1:
        error("The maximal search time must be a positive integer.", 3, kwargs[KW_CLI])
    if kwargs[KW_THREADS] < 0:
        error("The number of threads to use has to be a non-negative integer.", 23, kwargs[KW_CLI])
    # os.cpu_count() may return None if the number of CPUs cannot be determined; fall back to one thread
    available_cpus = os.cpu_count() or 1
    if kwargs[KW_THREADS] == 0:
        # zero requests all available CPUs
        kwargs[KW_THREADS] = available_cpus
    else:
        kwargs[KW_THREADS] = min(kwargs[KW_THREADS], available_cpus)

    # check the interaction file
    if isinstance(kwargs[KW_INTER], Path) and not kwargs[KW_INTER].is_file():
        error("The interaction filepath is not valid.", 5, kwargs[KW_CLI])

    # check the delta value
    if 1 < kwargs[KW_DELTA] or kwargs[KW_DELTA] < 0:
        error("The delta value has to be a real value between 0 and 1.", 6, kwargs[KW_CLI])

    # check the epsilon value
    if 1 < kwargs[KW_EPSILON] or kwargs[KW_EPSILON] < 0:
        error("The epsilon value has to be a real value between 0 and 1.", 6, kwargs[KW_CLI])

    # check number of runs to be a positive integer
    if kwargs[KW_RUNS] < 1:
        error("The number of runs cannot be lower than 1.", 25, kwargs[KW_CLI])

    # check the input regarding the caching
    if kwargs[KW_CACHE] and kwargs[KW_CACHE_DIR] is not None:
        kwargs[KW_CACHE_DIR] = Path(kwargs[KW_CACHE_DIR])
        if not kwargs[KW_CACHE_DIR].is_dir():
            LOGGER.warning("Cache directory does not exist, DataSAIL creates it automatically")
            kwargs[KW_CACHE_DIR].mkdir(parents=True, exist_ok=True)

    # check the linkage method; the message names exactly the accepted values
    if kwargs[KW_LINKAGE] not in ["average", "single", "complete"]:
        error("The linkage method has to be one of 'average', 'single', or 'complete'.", 26, kwargs[KW_CLI])

    # syntactically parse the input data for the E-dataset
    # (only Path inputs are checked on disk; other input kinds are passed through untouched)
    if isinstance(kwargs[KW_E_DATA], Path) and not kwargs[KW_E_DATA].exists():
        error("The filepath to the E-data is invalid.", 7, kwargs[KW_CLI])
    if isinstance(kwargs[KW_E_WEIGHTS], Path) and not kwargs[KW_E_WEIGHTS].is_file():
        error("The filepath to the weights of the E-data is invalid.", 8, kwargs[KW_CLI])
    if isinstance(kwargs[KW_E_STRAT], Path) and not kwargs[KW_E_STRAT].is_file():
        error("The filepath to the stratification of the E-data is invalid.", 11, kwargs[KW_CLI])
    if isinstance(kwargs[KW_E_SIM], Path) and not kwargs[KW_E_SIM].is_file():
        error("The similarity metric for the E-data seems to be a file-input but the filepath is invalid.", 9,
              kwargs[KW_CLI])
    if isinstance(kwargs[KW_E_DIST], Path) and not kwargs[KW_E_DIST].is_file():
        error("The distance metric for the E-data seems to be a file-input but the filepath is invalid.", 10,
              kwargs[KW_CLI])
    if kwargs[KW_E_CLUSTERS] < 1:
        error("The number of clusters to find in the E-data has to be a positive integer.", 12, kwargs[KW_CLI])

    # syntactically parse the input data for the F-dataset
    if isinstance(kwargs[KW_F_DATA], Path) and not kwargs[KW_F_DATA].exists():
        error("The filepath to the F-data is invalid.", 13, kwargs[KW_CLI])
    if isinstance(kwargs[KW_F_WEIGHTS], Path) and not kwargs[KW_F_WEIGHTS].is_file():
        error("The filepath to the weights of the F-data is invalid.", 14, kwargs[KW_CLI])
    # validate the F stratification here (the E stratification is already covered in the E-section)
    if isinstance(kwargs[KW_F_STRAT], Path) and not kwargs[KW_F_STRAT].is_file():
        error("The filepath to the stratification of the F-data is invalid.", 20, kwargs[KW_CLI])
    if isinstance(kwargs[KW_F_SIM], Path) and not kwargs[KW_F_SIM].is_file():
        error("The similarity metric for the F-data seems to be a file-input but the filepath is invalid.", 15,
              kwargs[KW_CLI])
    if isinstance(kwargs[KW_F_DIST], Path) and not kwargs[KW_F_DIST].is_file():
        error("The distance metric for the F-data seems to be a file-input but the filepath is invalid.", 16,
              kwargs[KW_CLI])
    if kwargs[KW_F_CLUSTERS] < 1:
        error("The number of clusters to find in the F-data has to be a positive integer.", 17, kwargs[KW_CLI])

    return kwargs
def to_path(x):
    """
    Convert a string into a ``Path`` unless it is one of the reserved option names.

    Args:
        x: Any input value

    Returns:
        ``Path(x)`` if ``x`` is a string that is not contained in ``ALGOS + FP_OPTIONS``, otherwise ``x`` itself.
    """
    # anything that is not a string is handed back untouched
    if not isinstance(x, str):
        return x
    # reserved option names stay plain strings
    if x in ALGOS + FP_OPTIONS:
        return x
    return Path(x)
def datasail(
        techniques: Union[str, List[str], Callable[..., List[str]], Generator[str, None, None]] = None,
        inter: Optional[
            Union[str, Path, List[Tuple[str, str]], Callable[..., List[str]], Generator[str, None, None]]
        ] = None,
        output: Optional[Union[str, Path]] = None,
        max_sec: int = 100,
        verbose: str = "W",
        splits: List[float] = None,
        names: List[str] = None,
        delta: float = 0.05,
        epsilon: float = 0.05,
        runs: int = 1,
        solver: str = SOLVER_SCIP,
        cache: bool = False,
        cache_dir: Union[str, Path] = None,
        linkage: Literal["average", "single", "complete"] = "average",
        overflow: Literal["assign", "break"] = "assign",
        e_type: str = None,
        e_data: DATA_INPUT = None,
        e_weights: DATA_INPUT = None,
        e_strat: DATA_INPUT = None,
        e_sim: MATRIX_INPUT = None,
        e_dist: MATRIX_INPUT = None,
        e_args: str = "",
        e_clusters: int = 50,
        f_type: str = None,
        f_data: DATA_INPUT = None,
        f_weights: DATA_INPUT = None,
        f_strat: DATA_INPUT = None,
        f_sim: MATRIX_INPUT = None,
        f_dist: MATRIX_INPUT = None,
        f_args: str = "",
        f_clusters: int = 50,
        threads: int = 1,
) -> Tuple[Dict, Dict, Dict]:
    """
    Entry point for the package usage of DataSAIL.

    String arguments that may denote files or directories are converted with ``to_path`` first. All arguments
    are then validated by ``validate_args`` (with ``cli=False``, so invalid input raises a ``ValueError``) and
    handed to ``datasail_main``.

    Args:
        techniques: List of techniques to split based on
        inter: Filepath to a TSV file storing interactions of the e-entities and f-entities.
        output: Output directory to store the results in.
        max_sec: Maximal number of seconds to take for optimizing a found solution.
        verbose: Verbosity level for logging.
        splits: List of splits, have to add up to one, otherwise scaled accordingly.
        names: List of names of the splits.
        delta: Fraction by how much the stratification may be undercut
        epsilon: Fraction by how much the provided split sizes may be undercut
        runs: Number of runs to perform per split. This may introduce some variance in the splits.
        solver: Solving algorithm to use.
        cache: Boolean flag indicating to store or load results from cache.
        cache_dir: Directory to store the cache in if not the default location.
        linkage: Linkage method to use to compute metrics between merged clusters.
        overflow: Either "assign" or "break"; forwarded unchanged to the main routine.
        e_type: Data format of the first batch of data
        e_data: Data file of the first batch of data
        e_weights: Weighting of the datapoints from e_data
        e_strat: Stratification of the datapoints from e_data
        e_sim: Similarity measure to apply for the e-data
        e_dist: Distance measure to apply for the e-data
        e_args: Additional arguments for the tools in e_sim or e_dist
        e_clusters: Number of clusters to find in the e-data
        f_type: Data format of the second batch of data
        f_data: Data file of the second batch of data
        f_weights: Weighting of the datapoints from f-data
        f_strat: Stratification of the datapoints from f-data
        f_sim: Similarity measure to apply for the f-data
        f_dist: Distance measure to apply for the f-data
        f_args: Additional arguments for the tools in f_sim or f-dist
        f_clusters: Number of clusters to find in the f-data
        threads: number of threads to use for one CD-HIT run

    Returns:
        Three dictionaries mapping techniques to another dictionary. The inner dictionary maps input id to their splits.
    """
    # note: the ``verbose`` parameter is forwarded under the keyword ``verbosity``
    kwargs = validate_args(
        output=to_path(output),
        techniques=techniques,
        inter=to_path(inter),
        max_sec=max_sec,
        verbosity=verbose,
        splits=splits,
        names=names,
        delta=delta,
        epsilon=epsilon,
        runs=runs,
        solver=solver,
        cache=cache,
        cache_dir=to_path(cache_dir),
        linkage=linkage,
        e_type=e_type,
        e_data=to_path(e_data),
        e_weights=to_path(e_weights),
        e_strat=to_path(e_strat),
        e_sim=to_path(e_sim),
        e_dist=to_path(e_dist),
        e_args=e_args,
        e_clusters=e_clusters,
        f_type=f_type,
        f_data=to_path(f_data),
        f_weights=to_path(f_weights),
        f_strat=to_path(f_strat),
        f_sim=to_path(f_sim),
        f_dist=to_path(f_dist),
        f_args=f_args,
        f_clusters=f_clusters,
        threads=threads,
        cli=False,
        overflow=overflow,
    )
    return datasail_main(**kwargs)
def sail(args=None, **kwargs) -> None:
    """
    Command-line entry point of DataSAIL.

    If no keyword arguments are supplied, the arguments are parsed from ``args`` or, when ``args`` is empty or
    None, from ``sys.argv[1:]``. The result is reduced to the keys of ``DEFAULT_KWARGS`` (missing keys take
    their default), path-like entries are converted with ``to_path``, everything is validated in CLI mode, and
    the main routine is invoked.

    Args:
        args: Optional list of command-line tokens to parse instead of ``sys.argv[1:]``
        **kwargs: Already parsed arguments; if given, no command-line parsing takes place
    """
    if not kwargs:
        kwargs = parse_datasail_args(args or sys.argv[1:])

    # keep only known keys and fill the gaps with the defaults
    settings = {}
    for name, default in DEFAULT_KWARGS.items():
        settings[name] = kwargs[name] if name in kwargs else default
    settings[KW_CLI] = True

    # arguments that may hold file or directory paths
    path_keys = (
        KW_OUTDIR, KW_INTER, KW_CACHE_DIR,
        KW_E_DATA, KW_E_WEIGHTS, KW_E_STRAT, KW_E_SIM, KW_E_DIST,
        KW_F_DATA, KW_F_WEIGHTS, KW_F_STRAT, KW_F_SIM, KW_F_DIST,
    )
    for name in path_keys:
        if name in settings:
            settings[name] = to_path(settings[name])

    validated = validate_args(**settings)
    datasail_main(**validated)