API
This part of the documentation covers all the interfaces of Mara Pipelines. For parts where the package depends on external libraries, the most important interfaces are documented here, with links to the canonical documentation.
Incremental processing
- mara_pipelines.incremental_processing.file_dependencies.update(node_path: List[str], dependency_type: str, pipeline_base_path: str, file_dependencies: List[str])
Stores the combined hash of a list of files
- Parameters
node_path – The path of the node that depends on the files
dependency_type – An arbitrary string that distinguishes between multiple dependencies of a node
pipeline_base_path – The base directory of the pipeline
file_dependencies – A list of file names relative to pipeline_base_path
- mara_pipelines.incremental_processing.file_dependencies.delete(node_path: List[str], dependency_type: str)
Deletes the combined hash of a list of files for that node and dependency type
- Parameters
node_path – The path of the node that depends on the files
dependency_type – An arbitrary string that distinguishes between multiple dependencies of a node
- mara_pipelines.incremental_processing.file_dependencies.is_modified(node_path: List[str], dependency_type: str, pipeline_base_path: str, file_dependencies: List[str])
Checks whether any file in a list of files has been modified since the last pipeline run
- Parameters
node_path – The path of the node that depends on the files
dependency_type – An arbitrary string that distinguishes between multiple dependencies of a node
pipeline_base_path – The base directory of the pipeline
file_dependencies – A list of file names relative to pipeline_base_path
Returns: True when at least one of the files was modified
- mara_pipelines.incremental_processing.file_dependencies.hash(pipeline_base_path: pathlib.Path, file_dependencies: List[str]) -> str
Creates a combined hash of the content of a list of files
- Parameters
pipeline_base_path – The base directory of the pipeline
file_dependencies – A list of file names relative to pipeline_base_path
Returns: a combined content hash
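Taken together, update, is_modified and hash amount to comparing a stored content hash with a freshly computed one. The following is a minimal sketch of that idea, not the library's code: it assumes an MD5 digest over the file contents in list order and uses an in-memory dict in place of the mara db. The names `combined_hash`, `store_hash` and `files_modified` are hypothetical.

```python
import hashlib
import pathlib
from typing import Dict, List, Tuple, Union

# Hypothetical in-memory stand-in for the mara db table:
# (node path, dependency type) -> combined hash
_stored_hashes: Dict[Tuple[Tuple[str, ...], str], str] = {}


def combined_hash(pipeline_base_path: Union[str, pathlib.Path], file_dependencies: List[str]) -> str:
    """Feeds the bytes of every file, in list order, into one digest (MD5 is an assumption)."""
    digest = hashlib.md5()
    for file_name in file_dependencies:
        digest.update(pathlib.Path(pipeline_base_path, file_name).read_bytes())
    return digest.hexdigest()


def store_hash(node_path: List[str], dependency_type: str,
               pipeline_base_path: Union[str, pathlib.Path], file_dependencies: List[str]) -> None:
    """Remembers the current combined hash for this node and dependency type."""
    key = (tuple(node_path), dependency_type)
    _stored_hashes[key] = combined_hash(pipeline_base_path, file_dependencies)


def files_modified(node_path: List[str], dependency_type: str,
                   pipeline_base_path: Union[str, pathlib.Path], file_dependencies: List[str]) -> bool:
    """True when no hash is stored yet or the stored hash differs from the current one."""
    key = (tuple(node_path), dependency_type)
    return _stored_hashes.get(key) != combined_hash(pipeline_base_path, file_dependencies)
```

In this pattern a node would check for modifications before doing expensive work and store the new hash only after the work succeeded, so that a failed run is repeated on the next attempt.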
- mara_pipelines.incremental_processing.incremental_copy_status.update(node_path: List[str], source_db_alias: str, source_table: str, last_comparison_value)
Updates the last_comparison_value for a pipeline node and table
- Parameters
node_path – The path of the parent pipeline node
source_db_alias – The alias of the db from which data is copied
source_table – The table from which data is copied
last_comparison_value – The last retrieved modification value
- mara_pipelines.incremental_processing.incremental_copy_status.delete(node_path: List[str], source_db_alias: str, source_table: str)
Deletes the last_comparison_value for a pipeline node and table
- Parameters
node_path – The path of the parent pipeline node
source_db_alias – The alias of the db from which data is copied
source_table – The table from which data is copied
- mara_pipelines.incremental_processing.incremental_copy_status.get_last_comparison_value(node_path: List[str], source_db_alias: str, source_table: str)
Returns the last comparison value for a pipeline node and table
- Parameters
node_path – The path of the parent pipeline node
source_db_alias – The alias of the db from which data is copied
source_table – The table from which data is copied
- Returns
The value, or None when no value has been stored
- mara_pipelines.incremental_processing.processed_files.track_processed_file(node_path: str, file_name: str, last_modified_timestamp: datetime.datetime)
Records that a file has been ‘processed’ by a node
- Parameters
node_path – The path of the node that processed the file
file_name – The name of the file that has been processed
last_modified_timestamp – The time the file was last modified
Returns: True
- mara_pipelines.incremental_processing.processed_files.already_processed_files(node_path: str) -> Dict[str, datetime.datetime]
Returns all files that have already been processed by a node
- Parameters
node_path – The path of the node that processed the files
- Returns
A mapping of file names to timestamps of last modification
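The mapping returned by already_processed_files can be compared with the files currently on disk to decide what still needs processing. The following is a hypothetical helper (`files_to_process` is not a mara_pipelines function). It assumes that a file counts as pending when it is new or when its modification time is later than the recorded one.

```python
import datetime
from typing import Dict, List


def files_to_process(current_files: Dict[str, datetime.datetime],
                     already_processed: Dict[str, datetime.datetime]) -> List[str]:
    """Returns the names of files that are new or were modified after they were last processed.

    current_files: file name -> last modification time as found on disk
    already_processed: file name -> last modification time recorded when the file
        was processed (the shape documented for already_processed_files)
    """
    return sorted(
        file_name
        for file_name, modified in current_files.items()
        if file_name not in already_processed or modified > already_processed[file_name]
    )
```

After a returned file has been handled, recording it with track_processed_file and its modification timestamp keeps it out of the next result.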
- mara_pipelines.incremental_processing.reset.reset_incremental_processing(node_path: List[str])
Recursively resets all incremental processing status information that is stored in the mara db
- Parameters
node_path – The path of the node to reset
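If status entries are keyed by node path, as the functions above suggest, then a recursive reset amounts to deleting every entry whose path starts with the given one. This is a hypothetical in-memory sketch and not the library's implementation, which works on tables in the mara db. `reset_sketch` is an illustrative name.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory stand-in for one status table, keyed by node path
Status = Dict[Tuple[str, ...], object]


def reset_sketch(status: Status, node_path: List[str]) -> int:
    """Deletes the entries of node_path and of every node below it; returns how many were removed."""
    prefix = tuple(node_path)
    # a path is "below" node_path when its leading ids equal node_path
    to_delete = [path for path in status if path[:len(prefix)] == prefix]
    for path in to_delete:
        del status[path]
    return len(to_delete)
```

The sketch compares tuples of ids rather than joined strings. Otherwise, resetting a node `a` would also reset a sibling named `ab`.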
Events
- class mara_pipelines.events.Event
Base class for events that are emitted from mara.
- class mara_pipelines.events.EventHandler
- mara_pipelines.events.notify_configured_event_handlers(event: mara_pipelines.events.Event)
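Event, EventHandler and notify_configured_event_handlers form an observer pattern: an event is passed to every handler in turn. The sketch below re-declares the two classes locally so that it is self-contained. The `handle_event` method name, the `RunFinished` event and the `CollectingHandler` are assumptions for illustration. Also, `notify` receives the handler list explicitly, whereas notify_configured_event_handlers presumably reads it from configuration.

```python
import abc
from typing import List


class Event:
    """Local stand-in for mara_pipelines.events.Event."""


class EventHandler(abc.ABC):
    """Local stand-in for mara_pipelines.events.EventHandler (method name assumed)."""

    @abc.abstractmethod
    def handle_event(self, event: Event) -> None:
        ...


class RunFinished(Event):  # hypothetical event type
    def __init__(self, node_path: List[str], succeeded: bool) -> None:
        self.node_path = node_path
        self.succeeded = succeeded


class CollectingHandler(EventHandler):  # hypothetical handler that just records events
    def __init__(self) -> None:
        self.received: List[Event] = []

    def handle_event(self, event: Event) -> None:
        self.received.append(event)


def notify(handlers: List[EventHandler], event: Event) -> None:
    """Passes one event to every handler, in order."""
    for handler in handlers:
        handler.handle_event(event)
```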
Pipelines
- class mara_pipelines.pipelines.Pipeline(id: str, description: str, max_number_of_parallel_tasks: Optional[int] = None, base_path: Optional[pathlib.Path] = None, labels: Optional[Dict[str, str]] = None, ignore_errors: bool = False, force_run_all_children: bool = False)
A directed acyclic graph (DAG) of nodes with dependencies between them.
- Parameters
id – The id of the pipeline
description – A short summary of what the pipeline is doing
max_number_of_parallel_tasks – At most this many nodes of the pipeline run in parallel
base_path – The absolute path of the pipeline root; file names are relative to it
labels – An arbitrary dictionary of application-specific tags, schemas and so on
ignore_errors – When true, the pipeline execution does not fail when a child node fails
force_run_all_children – When true, child nodes will run even when their upstreams failed
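This simplified, hypothetical helper (`execution_batches` is not part of mara_pipelines) illustrates how node dependencies and max_number_of_parallel_tasks interact. It assumes that every node takes one time step, whereas a real scheduler would start a new node as soon as any running node finishes. Every upstream must also appear as a key in the input.

```python
from typing import Dict, List, Optional, Set


def execution_batches(upstreams: Dict[str, Set[str]],
                      max_number_of_parallel_tasks: Optional[int] = None) -> List[List[str]]:
    """Groups DAG nodes into steps; a node is only scheduled after all its upstreams.

    upstreams: node id -> ids of the nodes it depends on
    Raises ValueError when no node can run, i.e. the graph contains a cycle.
    """
    done: Set[str] = set()
    remaining = set(upstreams)
    batches: List[List[str]] = []
    while remaining:
        # nodes whose upstreams have all finished; sorted for a deterministic order
        ready = sorted(node for node in remaining if upstreams[node] <= done)
        if not ready:
            raise ValueError(f"cycle among nodes: {sorted(remaining)}")
        if max_number_of_parallel_tasks is not None:
            ready = ready[:max_number_of_parallel_tasks]
        batches.append(ready)
        done.update(ready)
        remaining.difference_update(ready)
    return batches
```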
- class mara_pipelines.pipelines.Task(id: str, description: str, commands: Optional[List[mara_pipelines.pipelines.Command]] = None, max_retries: Optional[int] = None)
- class mara_pipelines.pipelines.Command
Base class for operations that can run inside a pipeline task
- Parameters
parent – The pipeline node that contains this command (needed for debug output)
- mara_pipelines.pipelines.find_node(path: List[str]) -> Tuple[mara_pipelines.pipelines.Node, bool]
Retrieves a node by the path leading to it through its parents
- Parameters
path – The ids of the node and of all its parents, from top to bottom
- Returns
A tuple of the node and True if the node was found, or of the closest known parent node and False otherwise
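The "closest known parent" behaviour of find_node can be pictured as a walk down the tree that stops at the first missing id. The following is a minimal sketch under stated assumptions. `Node` here is a hypothetical class with an `id` and a dict of child `nodes`, and the root is passed in explicitly, so it is not part of the path.

```python
from typing import Dict, List, Optional, Tuple


class Node:
    """Hypothetical minimal node: an id plus child nodes keyed by id."""

    def __init__(self, node_id: str, nodes: Optional[Dict[str, "Node"]] = None) -> None:
        self.id = node_id
        self.nodes = nodes or {}


def find_node_sketch(root: Node, path: List[str]) -> Tuple[Node, bool]:
    """Walks down from root one id at a time.

    Returns (node, True) when the whole path exists, otherwise
    (closest existing ancestor, False).
    """
    node = root
    for node_id in path:
        child = node.nodes.get(node_id)
        if child is None:
            return node, False
        node = child
    return node, True
```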
Shell
- mara_pipelines.shell.run_shell_command(command: str, log_command: bool = True) -> Union[List[str], bool]
Runs a command in a bash shell and logs the output of the command in near real time.
- Parameters
command – The command to run
log_command – When true, the command itself is logged before execution
- Returns
Either (checked in this order): False when the exit code of the command was not 0; True when there was no output to stdout; otherwise the output to stdout, as a list of lines
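The three-way return value is easy to misread, so here is a sketch built on the standard library's `subprocess.Popen` that follows the same convention. It is not mara's implementation. `run_shell_command_sketch` is a hypothetical name, it assumes `bash` is on the PATH, and it logs to stderr instead of using mara's logging.

```python
import subprocess
import sys
from typing import List, Union


def run_shell_command_sketch(command: str, log_command: bool = True) -> Union[List[str], bool]:
    """Runs `command` with bash and mirrors the documented return convention."""
    if log_command:
        print(command, file=sys.stderr)
    output_lines: List[str] = []
    # stdout is piped so it can be logged line by line while the command runs;
    # stderr is inherited and goes straight to this process's stderr
    with subprocess.Popen(["bash", "-c", command], stdout=subprocess.PIPE, text=True) as process:
        for line in process.stdout:
            line = line.rstrip("\n")
            print(line, file=sys.stderr)  # near real-time log of each output line
            output_lines.append(line)
    # leaving the with-block waits for the process, so returncode is set here
    if process.returncode != 0:
        return False
    return output_lines or True  # an empty list means there was no output
```

A caller therefore has to check `is False` before treating the result as output. For example, `"echo partial; exit 3"` returns False even though it printed a line.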
- mara_pipelines.shell.sed_command(replace: Dict[str, str]) -> str
Creates a sed command string from a dictionary of replacements
Examples
>>> print(sed_command({'foo':'a','bar':'b'}))
sed "s/foo/a/g; s/bar/b/g"
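The example output can be reproduced by joining one global `s/…/…/g` substitution per dictionary entry. The following is a hypothetical re-implementation (`sed_command_sketch`). It assumes keys and values are inserted verbatim, so characters such as `/`, `&`, `"` or regex metacharacters would need escaping in real use.

```python
from typing import Dict


def sed_command_sketch(replace: Dict[str, str]) -> str:
    """Builds one sed invocation with a global substitution per dictionary entry.

    Keys and values are inserted verbatim (no escaping).
    """
    expressions = "; ".join(f"s/{search}/{replacement}/g" for search, replacement in replace.items())
    return f'sed "{expressions}"'
```

Because dicts preserve insertion order, the substitutions run in the order they were given, which matters when one replacement produces text that a later pattern matches.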