API

This part of the documentation covers all the interfaces of Mara Pipelines. For parts where the package depends on external libraries, we document the most important ones right here and provide links to the canonical documentation.

Incremental processing

mara_pipelines.incremental_processing.file_dependencies.update(node_path: List[str], dependency_type: str, pipeline_base_path: str, file_dependencies: List[str])

Stores the combined hash of a list of files

Parameters
  • node_path – The path of the node that depends on the files

  • dependency_type – An arbitrary string that distinguishes between multiple dependencies of a node

  • pipeline_base_path – The base directory of the pipeline

  • file_dependencies – A list of file names relative to pipeline_base_path

mara_pipelines.incremental_processing.file_dependencies.delete(node_path: List[str], dependency_type: str)

Deletes the combined hash of a list of files for that node and dependency type

Parameters
  • node_path – The path of the node that depends on the files

  • dependency_type – An arbitrary string that distinguishes between multiple dependencies of a node

mara_pipelines.incremental_processing.file_dependencies.is_modified(node_path: List[str], dependency_type: str, pipeline_base_path: str, file_dependencies: List[str])

Checks whether any file in a list of files has been modified since the last pipeline run

Parameters
  • node_path – The path of the node that depends on the files

  • dependency_type – An arbitrary string that distinguishes between multiple dependencies of a node

  • pipeline_base_path – The base directory of the pipeline

  • file_dependencies – A list of file names relative to pipeline_base_path

Returns: True when at least one of the files was modified

mara_pipelines.incremental_processing.file_dependencies.hash(pipeline_base_path: pathlib.Path, file_dependencies: List[str]) → str

Creates a combined hash of the content of a list of files

Parameters
  • pipeline_base_path – The base directory of the pipeline

  • file_dependencies – A list of file names relative to pipeline_base_path

Returns: a combined content hash
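
To make the idea of a combined content hash concrete, here is a minimal, self-contained sketch. The helper combined_content_hash is hypothetical, and the choice of SHA-256 is an assumption; this is not the implementation used by mara_pipelines, only an illustration of why a change in any one file changes the combined value.

```python
import hashlib
import pathlib
import tempfile
from typing import List


def combined_content_hash(base_path: pathlib.Path, file_names: List[str]) -> str:
    """Hypothetical stand-in: one digest over the contents of all files, in order."""
    digest = hashlib.sha256()
    for file_name in file_names:
        # File names are resolved relative to the base path
        digest.update((base_path / file_name).read_bytes())
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    base = pathlib.Path(tmp)
    (base / 'a.sql').write_text('SELECT 1;')
    (base / 'b.sql').write_text('SELECT 2;')

    before = combined_content_hash(base, ['a.sql', 'b.sql'])
    unchanged = combined_content_hash(base, ['a.sql', 'b.sql'])

    # Modify a single file and hash again
    (base / 'b.sql').write_text('SELECT 3;')
    after = combined_content_hash(base, ['a.sql', 'b.sql'])

print(before == unchanged)  # same contents give the same hash
print(before == after)      # a modified file gives a different hash
```

Comparing a stored hash against a freshly computed one is the kind of check that is_modified describes: the node only needs to rerun when the two values differ.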


mara_pipelines.incremental_processing.incremental_copy_status.update(node_path: List[str], source_db_alias: str, source_table: str, last_comparison_value)

Updates the last_comparison_value for a pipeline node and table

Parameters
  • node_path – The path of the parent pipeline node

  • source_db_alias – The alias of the db from which data is copied

  • source_table – The table from which data is copied

  • last_comparison_value – The last retrieved modification value

mara_pipelines.incremental_processing.incremental_copy_status.delete(node_path: List[str], source_db_alias: str, source_table: str)

Deletes the last_comparison_value for a pipeline node and table

Parameters
  • node_path – The path of the parent pipeline node

  • source_db_alias – The alias of the db from which data is copied

  • source_table – The table from which data is copied

mara_pipelines.incremental_processing.incremental_copy_status.get_last_comparison_value(node_path: List[str], source_db_alias: str, source_table: str)

Returns the last comparison value for a pipeline node and table

Parameters
  • node_path – The path of the parent pipeline node

  • source_db_alias – The alias of the db from which data is copied

  • source_table – The table from which data is copied

Returns

The value or None
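
The three functions above are typically used together: read the last comparison value, copy only rows that are newer, then store the new maximum. The following sketch illustrates that pattern with an in-memory dictionary. The functions update and get_last_comparison_value here are hypothetical stand-ins that only mirror the documented signatures (the real ones store their state in the mara db), and copy_new_rows and the modified_at column are assumptions made for the example.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical in-memory stand-in for the status stored in the mara db
_status: Dict[Tuple[Tuple[str, ...], str, str], Any] = {}


def update(node_path: List[str], source_db_alias: str, source_table: str,
           last_comparison_value: Any) -> None:
    """Stand-in: remembers the last comparison value for a node and table."""
    _status[(tuple(node_path), source_db_alias, source_table)] = last_comparison_value


def get_last_comparison_value(node_path: List[str], source_db_alias: str,
                              source_table: str) -> Any:
    """Stand-in: returns the stored value or None."""
    return _status.get((tuple(node_path), source_db_alias, source_table))


def copy_new_rows(node_path: List[str], source_db_alias: str, source_table: str,
                  rows: List[dict]) -> List[dict]:
    """Hypothetical helper: selects rows modified after the last comparison value."""
    last_value = get_last_comparison_value(node_path, source_db_alias, source_table)
    new_rows = [row for row in rows
                if last_value is None or row['modified_at'] > last_value]
    if new_rows:
        # Store the highest modification value seen so far
        update(node_path, source_db_alias, source_table,
               max(row['modified_at'] for row in new_rows))
    return new_rows


node_path = ['load', 'copy_orders']
rows = [{'id': 1, 'modified_at': 10}, {'id': 2, 'modified_at': 20}]

first_run = copy_new_rows(node_path, 'source', 'orders', rows)   # no stored value yet
rows.append({'id': 3, 'modified_at': 30})
second_run = copy_new_rows(node_path, 'source', 'orders', rows)  # only the new row

print([row['id'] for row in first_run])
print([row['id'] for row in second_run])
```

Deleting the stored value (the role of delete above) would make the next run behave like the first one and copy everything again.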


mara_pipelines.incremental_processing.processed_files.track_processed_file(node_path: str, file_name: str, last_modified_timestamp: datetime.datetime)

Records that a file has been ‘processed’ by a node

Parameters
  • node_path – The path of the node that processed the file

  • file_name – The name of the file that has been processed

  • last_modified_timestamp – The time when the file was modified last

Returns: True

mara_pipelines.incremental_processing.processed_files.already_processed_files(node_path: str) → Dict[str, datetime.datetime]

Returns all files that have already been processed by a node

Parameters

node_path – The path of the node that processed the files

Returns

A mapping of file names to timestamps of last modification
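
A mapping of file names to last-modified timestamps is enough to decide which files still need work. The sketch below shows one way to do that; files_to_process is a hypothetical helper, and the rule "process when unseen or modified later than recorded" is an assumption for illustration, not necessarily what the library's own file-reading commands do.

```python
import datetime
from typing import Dict, List


def files_to_process(candidates: Dict[str, datetime.datetime],
                     processed: Dict[str, datetime.datetime]) -> List[str]:
    """Hypothetical helper: files that are new or changed since they were tracked."""
    return sorted(
        name for name, modified in candidates.items()
        if name not in processed or processed[name] < modified
    )


# Shape of the value documented for already_processed_files (made-up data)
processed = {
    'orders-2023-01.csv': datetime.datetime(2023, 2, 1),
    'orders-2023-02.csv': datetime.datetime(2023, 3, 1),
}

# Files currently on disk with their modification times (made-up data)
candidates = {
    'orders-2023-01.csv': datetime.datetime(2023, 2, 1),   # unchanged
    'orders-2023-02.csv': datetime.datetime(2023, 3, 15),  # modified later
    'orders-2023-03.csv': datetime.datetime(2023, 4, 1),   # never processed
}

pending = files_to_process(candidates, processed)
print(pending)
```

After handling each pending file, a node would record it (the role of track_processed_file) so that the next run skips it.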


mara_pipelines.incremental_processing.reset.reset_incremental_processing(node_path: List[str])

Recursively resets all incremental processing status information that is stored in the mara db

Parameters

node_path – The path of the node to reset

Events

class mara_pipelines.events.Event

Base class for events that are emitted from mara.

class mara_pipelines.events.EventHandler

mara_pipelines.events.notify_configured_event_handlers(event: mara_pipelines.events.Event)

Pipelines

class mara_pipelines.pipelines.Pipeline(id: str, description: str, max_number_of_parallel_tasks: Optional[int] = None, base_path: Optional[pathlib.Path] = None, labels: Optional[Dict[str, str]] = None, ignore_errors: bool = False, force_run_all_children: bool = False)

A directed acyclic graph (DAG) of nodes with dependencies between them.

Parameters
  • id – The id of the pipeline

  • description – A short summary of what the pipeline is doing

  • max_number_of_parallel_tasks – Only that many nodes of the pipeline will run in parallel

  • base_path – The absolute path of the pipeline root, file names are relative to that

  • labels – An arbitrary dictionary of application-specific tags, schemas and so on

  • ignore_errors – When true, then the pipeline execution will not fail when a child node fails

  • force_run_all_children – When true, child nodes will run even when their upstreams failed

class mara_pipelines.pipelines.Task(id: str, description: str, commands: Optional[List[mara_pipelines.pipelines.Command]] = None, max_retries: Optional[int] = None)

class mara_pipelines.pipelines.Command

Base class for operations that can run inside a pipeline task

Parameters

parent – The pipeline node that contains this command (needed for debug output)

mara_pipelines.pipelines.find_node(path: List[str]) → Tuple[mara_pipelines.pipelines.Node, bool]

Retrieves a node by the path from its parents

Parameters

path – The ids of the node and of all its parents, from top to bottom

Returns

A tuple of the node and True if the node was found, or of the closest known parent node and False otherwise
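
The return contract (node plus a found flag, falling back to the closest known parent) can be illustrated with a small tree walk. The Node class and find_node_sketch below are hypothetical and only model the documented behaviour; the real function works on the configured root pipeline rather than on an explicit root argument.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Node:
    """Hypothetical minimal node: an id plus child nodes keyed by id."""
    id: str
    children: Dict[str, 'Node'] = field(default_factory=dict)


def find_node_sketch(root: Node, path: List[str]) -> Tuple[Node, bool]:
    """Walks the ids in `path` from top to bottom, starting below `root`."""
    node = root
    for node_id in path:
        child = node.children.get(node_id)
        if child is None:
            # Stop at the closest known parent and signal "not found"
            return node, False
        node = child
    return node, True


customers = Node('customers')
load = Node('load', {'customers': customers})
root = Node('root', {'load': load})

found_node, found = find_node_sketch(root, ['load', 'customers'])
closest_node, closest_found = find_node_sketch(root, ['load', 'missing'])

print(found_node.id, found)
print(closest_node.id, closest_found)
```

Callers should check the boolean before using the node, since a False result still carries a valid (but different) node.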

Shell

mara_pipelines.shell.run_shell_command(command: str, log_command: bool = True) → Union[List[str], bool]

Runs a command in a bash shell and logs the output of the command in (near) real time.

Parameters
  • command – The command to run

  • log_command – When true, then the command itself is logged before execution

Returns

Either (in order)
  • False when the exit code of the command was not 0

  • True when there was no output to stdout

  • The output to stdout, as an array of lines
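
Because the return value is either a boolean or a list, callers need to distinguish three cases. The sketch below reproduces only that return contract using the standard subprocess module. run_and_classify is a hypothetical helper: it uses the default system shell rather than bash and does not log output in real time, so it is not a substitute for run_shell_command.

```python
import subprocess
from typing import List, Union


def run_and_classify(command: str) -> Union[List[str], bool]:
    """Hypothetical helper mirroring the documented three-way return value."""
    result = subprocess.run(command, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)
    if result.returncode != 0:
        return False                   # non-zero exit code
    lines = result.stdout.splitlines()
    return lines if lines else True    # output lines, or True when stdout is empty


failed = run_and_classify('exit 3')
silent = run_and_classify('true')
output = run_and_classify('printf "a\\nb\\n"')

print(failed, silent, output)
```

Since both True and a non-empty list are truthy, a check such as `result is False` (or `if not result`) is enough to detect failure, while `isinstance(result, list)` tells whether there is output to read.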

mara_pipelines.shell.sed_command(replace: Dict[str, str]) → str

Creates a sed command string from a dictionary of replacements

Examples

>>> print(sed_command({'foo':'a','bar':'b'}))
sed "s/foo/a/g; s/bar/b/g"