Commands

Bash commands

class mara_pipelines.commands.bash.RunBash(command: Union[str, Callable])

Runs a command in a bash shell

Parameters

command – The command to run
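
The Union[str, Callable] annotation suggests that the command can be given either as a literal string or as a function that produces the string when the command runs. The following is a minimal sketch of that pattern in plain Python. It is not the library's code, and resolve_command is a hypothetical helper introduced only for this example.

```python
from typing import Callable, Union


def resolve_command(command: Union[str, Callable[[], str]]) -> str:
    """Return the shell command text, calling `command` first if it is a function.

    Hypothetical helper for illustration only; not part of mara_pipelines.
    """
    return command() if callable(command) else command


# A fixed command, known when the pipeline is defined
static_command = resolve_command('echo "hello"')

# A command that is assembled only when it is needed
settings = {'data_dir': '/tmp/data'}


def list_data_dir() -> str:
    return f"ls {settings['data_dir']}"


settings['data_dir'] = '/srv/data'  # changed after the function was defined
lazy_command = resolve_command(list_data_dir)
```

Passing a function defers building the command string, so values that change between pipeline definition and execution are picked up.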

Files commands

enum mara_pipelines.commands.files.Compression(value)

Different compression formats that are understood by file readers

Valid values are as follows:

NONE = <Compression.NONE: 'none'>
GZIP = <Compression.GZIP: 'gzip'>
TAR_GZIP = <Compression.TAR_GZIP: 'tar.gzip'>
ZIP = <Compression.ZIP: 'zip'>

class mara_pipelines.commands.files.ReadFile(file_name: str, compression: mara_pipelines.commands.files.Compression, target_table: str, mapper_script_file_name: Optional[str] = None, make_unique: bool = False, db_alias: Optional[str] = None, csv_format: Optional[bool] = None, skip_header: Optional[bool] = None, delimiter_char: Optional[str] = None, quote_char: Optional[str] = None, null_value_string: Optional[str] = None, timezone: Optional[str] = None, file_format: Optional[mara_db.formats.Format] = None)

Reads data from a local file
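
How the library decompresses a file is not described here. As a rough illustration of what the compression argument selects, the sketch below defines a stand-in enum with the same members and values as the documented Compression enum, plus a hypothetical read_text helper that handles only NONE and GZIP using the standard library.

```python
import enum
import gzip
import os
import tempfile


class Compression(enum.Enum):
    """Stand-in with the same members and values as the documented enum."""
    NONE = 'none'
    GZIP = 'gzip'
    TAR_GZIP = 'tar.gzip'
    ZIP = 'zip'


def read_text(file_name: str, compression: Compression) -> str:
    """Hypothetical reader: return the decoded content of a local file."""
    if compression is Compression.NONE:
        with open(file_name, 'rt', encoding='utf-8') as f:
            return f.read()
    if compression is Compression.GZIP:
        with gzip.open(file_name, 'rt', encoding='utf-8') as f:
            return f.read()
    raise NotImplementedError(f'{compression.name} is not covered by this sketch')


# Round trip: write a small gzip-compressed CSV file and read it back
with tempfile.TemporaryDirectory() as directory:
    path = os.path.join(directory, 'users.csv.gz')
    with gzip.open(path, 'wt', encoding='utf-8') as f:
        f.write('id,name\n1,Ada\n')
    content = read_text(path, Compression.GZIP)
```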

class mara_pipelines.commands.files.ReadSQLite(sqlite_file_name: str, target_table: str, sql_statement: Optional[str] = None, sql_file_name: Optional[str] = None, replace: Optional[Dict[str, str]] = None, db_alias: Optional[str] = None, timezone: Optional[str] = None)

class mara_pipelines.commands.files.ReadScriptOutput(file_name: str, target_table: str, make_unique: bool = False, db_alias: Optional[str] = None, csv_format: Optional[bool] = None, skip_header: Optional[bool] = None, delimiter_char: Optional[str] = None, quote_char: Optional[str] = None, null_value_string: Optional[str] = None, timezone: Optional[str] = None, pipe_format: Optional[mara_db.formats.Format] = None)

Reads the output of a Python script into a database table

class mara_pipelines.commands.files.WriteFile(dest_file_name: str, sql_statement: typing.Optional[typing.Union[typing.Callable, str]] = None, sql_file_name: typing.Optional[str] = None, replace: typing.Optional[typing.Dict[str, str]] = None, db_alias: typing.Optional[str] = None, compression: mara_pipelines.commands.files.Compression = Compression.NONE, format: mara_db.formats.Format = <CsvFormat: delimiter_char=, >)

Writes data to a local file. The command is executed on the shell.

Python commands

class mara_pipelines.commands.python.RunFunction(function: Optional[Callable] = None, args: Optional[List[str]] = None, file_dependencies: Optional[List[str]] = None)

Runs an arbitrary Python function

Parameters
  • function – The parameterless function to run

  • args – A list of arguments to be passed to the function

  • file_dependencies – A list of files; whether the command runs depends on whether any of them changed since the last pipeline run

Note

If you want to pass arguments, use a lambda function.
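
As a small illustration of the note, the sketch below wraps a function that takes arguments into a callable that takes none. The load_partition function is hypothetical, and functools.partial is shown only as a standard-library alternative to a lambda; the note itself mentions lambdas only.

```python
from functools import partial

loaded = []  # records what the hypothetical task was asked to load


def load_partition(day: str, table: str = 'events') -> bool:
    """Hypothetical task function that needs arguments."""
    loaded.append((table, day))
    return True


# Wrap the call so that the resulting callable takes no parameters
load_with_lambda = lambda: load_partition('2024-01-01')
load_with_partial = partial(load_partition, '2024-01-02', table='orders')

# Either callable takes no parameters, which is the shape described for
# the `function` argument; here both are simply called directly.
results = [load_with_lambda(), load_with_partial()]
```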

class mara_pipelines.commands.python.ExecutePython(file_name: Union[Callable, str], args: Optional[Union[Callable, List[str]]] = None, file_dependencies: Optional[List[str]] = None)

Runs a Python script in a separate interpreter process

Parameters
  • file_name – the path of the file to run, relative to the pipeline directory

  • args – A list of arguments to be passed to the script

  • file_dependencies – A list of files; whether the command runs depends on whether any of them changed since the last pipeline run
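
To make "a separate interpreter process" concrete, the sketch below starts a script in a new Python process and passes it a list of arguments, using only the standard library. It is an assumption about the general technique, not the library's implementation. run_script and the temporary script are hypothetical, and the script path here is absolute rather than relative to a pipeline directory.

```python
import os
import subprocess
import sys
import tempfile

# Content of a tiny script that echoes the arguments it received
SCRIPT = 'import sys\nprint("args:", ",".join(sys.argv[1:]))\n'


def run_script(file_name: str, args: list) -> subprocess.CompletedProcess:
    """Run `file_name` in a new interpreter process and capture its output."""
    return subprocess.run([sys.executable, file_name, *args],
                          capture_output=True, text=True, check=False)


with tempfile.TemporaryDirectory() as directory:
    script_path = os.path.join(directory, 'print_args.py')
    with open(script_path, 'w', encoding='utf-8') as f:
        f.write(SCRIPT)
    completed = run_script(script_path, ['--day', '2024-01-01'])
```

Because the script runs in its own process, it shares no in-memory state with the caller; only the arguments, the exit code, and the captured output cross the boundary.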

SQL commands

class mara_pipelines.commands.sql.ExecuteSQL(sql_statement: Optional[Union[Callable, str]] = None, sql_file_name: Optional[str] = None, replace: Optional[Dict[str, str]] = None, file_dependencies: Optional[List[str]] = None, db_alias: Optional[str] = None, echo_queries: Optional[bool] = None, timezone: Optional[str] = None)

Runs an SQL file or statement in a database

Parameters
  • sql_statement – The statement to run as a string

  • sql_file_name – The name of the file to run (relative to the directory of the parent pipeline)

  • replace – A set of replacements to perform against the SQL query, given as a dictionary: {'replace': 'with', ...}
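
The replace parameter can be pictured as plain text substitution on the statement before it is sent to the database. The sketch below assumes that keys are matched literally; the placeholder syntax the library actually expects is not stated in this section, and apply_replacements is a hypothetical helper.

```python
from typing import Dict


def apply_replacements(sql: str, replace: Dict[str, str]) -> str:
    """Substitute every dictionary key found in `sql` with its value.

    Assumption: keys are matched literally, exactly as written in the dict.
    """
    for placeholder, value in replace.items():
        sql = sql.replace(placeholder, value)
    return sql


template = 'SELECT * FROM SCHEMA_NAME.orders WHERE country = COUNTRY_CODE'
statement = apply_replacements(
    template, {'SCHEMA_NAME': 'shop', 'COUNTRY_CODE': "'DE'"})
```

Since this is text substitution rather than parameter binding, values have to carry their own SQL quoting, as the country code does here.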

class mara_pipelines.commands.sql.Copy(source_db_alias: str, target_table: str, target_db_alias: Optional[str] = None, sql_statement: Optional[Union[Callable, str]] = None, sql_file_name: Optional[str] = None, replace: Optional[Dict[str, str]] = None, timezone: Optional[str] = None, csv_format: Optional[bool] = None, delimiter_char: Optional[str] = None, file_dependencies: Optional[List[str]] = None)

Loads data from an external database

class mara_pipelines.commands.sql.CopyIncrementally(source_db_alias: str, source_table: str, modification_comparison: str, comparison_value_placeholder: str, target_table: str, primary_keys: List[str], sql_file_name: Optional[str] = None, sql_statement: Optional[Union[Callable, str]] = None, target_db_alias: Optional[str] = None, timezone: Optional[str] = None, replace: Optional[Dict[str, str]] = None, use_explicit_upsert: bool = False, csv_format: Optional[bool] = None, delimiter_char: Optional[str] = None, modification_comparison_type: Optional[str] = None)

Incrementally loads data from one database into another.

Requires the source table to have a monotonically increasing column, or combination of columns, that allows “newer” rows to be identified (the modification comparison).

After an initial full load, only those rows with a higher modification comparison than the last comparison value are read.
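
The read side of this scheme can be sketched with the standard-library sqlite3 module. The example is an illustration under assumptions, not the library's implementation: the orders table, the updated_at column, the placeholder text, and read_changed_rows are all hypothetical. It mirrors the comparison_value_placeholder idea by replacing a placeholder in the query with 1=1 for the full load and with a comparison afterwards.

```python
import sqlite3
from typing import Optional

source = sqlite3.connect(':memory:')
source.execute(
    'CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)')
source.executemany('INSERT INTO orders VALUES (?, ?, ?)',
                   [(1, 'new', '2024-01-01'), (2, 'new', '2024-01-02')])

PLACEHOLDER = '@@comparison@@'  # hypothetical placeholder text
QUERY = f'SELECT id, status, updated_at FROM orders WHERE {PLACEHOLDER} ORDER BY id'


def read_changed_rows(last_value: Optional[str]):
    """Return rows newer than `last_value` and the new highest comparison value."""
    # Without a stored comparison value, everything is read (full load)
    condition = '1=1' if last_value is None else 'updated_at > ?'
    parameters = () if last_value is None else (last_value,)
    rows = source.execute(QUERY.replace(PLACEHOLDER, condition), parameters).fetchall()
    new_value = max((row[2] for row in rows), default=last_value)
    return rows, new_value


# Initial full load: both rows are read
first_rows, last_value = read_changed_rows(None)

# The source changes: row 2 is modified, row 3 is added
source.execute("UPDATE orders SET status = 'paid', updated_at = '2024-01-03' WHERE id = 2")
source.execute("INSERT INTO orders VALUES (3, 'new', '2024-01-04')")

# Incremental load: only rows with a higher updated_at are read
second_rows, last_value = read_changed_rows(last_value)
```

Row 1 is not read again in the second load because its updated_at is not higher than the stored comparison value. This is also why the comparison column must never decrease.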

Parameters
  • source_db_alias – The database to load from

  • source_table – The table to read from

  • sql_statement – A query that is run to query the source database

  • sql_file_name – The path of a file name that is run to query the source database

  • replace – A set of replacements to perform against the SQL query

  • modification_comparison – SQL expression that evaluates to a comparable value

  • modification_comparison_type – type of the saved (as string) modification_comparison value

  • comparison_value_placeholder – A placeholder in the SQL code that is replaced with the actual incremental load comparison, or with 1=1.

  • target_db_alias – The database to write to

  • target_table – The table for loading data into

  • primary_keys – A combination of primary key columns that are used for upserting into the target table

  • timezone – How to interpret timestamps in the target db

  • use_explicit_upsert – When True, uses an Update + Insert query combination. Otherwise, uses ON CONFLICT DO UPDATE.
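
The two write strategies named by use_explicit_upsert can be compared with the standard-library sqlite3 module. This is a simplified sketch, not the SQL the library generates: the table and both helper functions are hypothetical, the explicit variant works row by row for brevity, and ON CONFLICT requires SQLite 3.24 or newer.

```python
import sqlite3

ROWS = [(2, 'paid'), (3, 'new')]  # incoming batch: id 2 already exists, id 3 is new


def make_target() -> sqlite3.Connection:
    """Create an in-memory target table that already holds ids 1 and 2."""
    connection = sqlite3.connect(':memory:')
    connection.execute('CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)')
    connection.executemany('INSERT INTO orders VALUES (?, ?)', [(1, 'new'), (2, 'new')])
    return connection


def upsert_on_conflict(connection: sqlite3.Connection, rows) -> None:
    """Single statement; the primary key decides between insert and update."""
    connection.executemany(
        'INSERT INTO orders (id, status) VALUES (?, ?) '
        'ON CONFLICT (id) DO UPDATE SET status = excluded.status', rows)


def upsert_explicit(connection: sqlite3.Connection, rows) -> None:
    """Update existing keys first, then insert the keys that are still missing."""
    for row_id, status in rows:
        cursor = connection.execute(
            'UPDATE orders SET status = ? WHERE id = ?', (status, row_id))
        if cursor.rowcount == 0:
            connection.execute(
                'INSERT INTO orders (id, status) VALUES (?, ?)', (row_id, status))


target_a = make_target()
upsert_on_conflict(target_a, ROWS)
target_b = make_target()
upsert_explicit(target_b, ROWS)

result_a = target_a.execute('SELECT id, status FROM orders ORDER BY id').fetchall()
result_b = target_b.execute('SELECT id, status FROM orders ORDER BY id').fetchall()
```

Both variants leave the target table in the same state. The explicit form is mainly useful where the target database does not support ON CONFLICT.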