Commands
Bash commands
- class mara_pipelines.commands.bash.RunBash(command: Union[str, Callable])
Runs a command in a bash shell
- Parameters
command – The command to run
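The signature accepts either a string or a callable for the command. The following is a minimal sketch of how such a parameter could be resolved and executed, not the library's own implementation: `resolve_command` and `run_in_bash` are hypothetical helpers, and it is an assumption that a callable is invoked without arguments and returns the command string.

```python
import shutil
import subprocess
from typing import Callable, Union


def resolve_command(command: Union[str, Callable[[], str]]) -> str:
    """Return the command string, calling `command` first if it is a callable."""
    return command() if callable(command) else command


def run_in_bash(command: Union[str, Callable[[], str]]) -> subprocess.CompletedProcess:
    """Run the resolved command with `bash -c` and capture its output."""
    return subprocess.run(
        ["bash", "-c", resolve_command(command)],
        capture_output=True,
        text=True,
    )


if __name__ == "__main__" and shutil.which("bash"):
    # The lambda is only evaluated when the command is actually run.
    result = run_in_bash(lambda: "echo hello")
    print(result.returncode, result.stdout.strip())
```

Passing a callable defers building the command string until run time, which can help when the command depends on configuration that is not yet available when the pipeline is defined.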
Files commands
- enum mara_pipelines.commands.files.Compression(value)
Different compression formats that are understood by file readers
Valid values are as follows:
- NONE = <Compression.NONE: 'none'>
- GZIP = <Compression.GZIP: 'gzip'>
- TAR_GZIP = <Compression.TAR_GZIP: 'tar.gzip'>
- ZIP = <Compression.ZIP: 'zip'>
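As an illustration of how a file reader might act on these values, the sketch below maps each format to a shell program that writes the unpacked content to standard output. The enum is a local stand-in that only mirrors the values listed above, and `UNPACK_COMMANDS` and `unpack_command` are hypothetical; the programs the library actually uses are not stated in this reference.

```python
import enum
import shlex


class Compression(enum.Enum):
    """Local stand-in mirroring the documented values (not imported from the library)."""
    NONE = 'none'
    GZIP = 'gzip'
    TAR_GZIP = 'tar.gzip'
    ZIP = 'zip'


# Assumed mapping from compression format to a program that unpacks to stdout.
UNPACK_COMMANDS = {
    Compression.NONE: 'cat',
    Compression.GZIP: 'gunzip -c',
    Compression.TAR_GZIP: 'tar -xOzf',
    Compression.ZIP: 'unzip -p',
}


def unpack_command(compression: Compression, file_name: str) -> str:
    """Build a shell command that streams the unpacked file to stdout."""
    return f"{UNPACK_COMMANDS[compression]} {shlex.quote(file_name)}"


if __name__ == "__main__":
    print(unpack_command(Compression.GZIP, "data.csv.gz"))
```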
- class mara_pipelines.commands.files.ReadFile(file_name: str, compression: mara_pipelines.commands.files.Compression, target_table: str, mapper_script_file_name: Optional[str] = None, make_unique: bool = False, db_alias: Optional[str] = None, csv_format: Optional[bool] = None, skip_header: Optional[bool] = None, delimiter_char: Optional[str] = None, quote_char: Optional[str] = None, null_value_string: Optional[str] = None, timezone: Optional[str] = None, file_format: Optional[mara_db.formats.Format] = None)
Reads data from a local file
- class mara_pipelines.commands.files.ReadSQLite(sqlite_file_name: str, target_table: str, sql_statement: Optional[str] = None, sql_file_name: Optional[str] = None, replace: Optional[Dict[str, str]] = None, db_alias: Optional[str] = None, timezone: Optional[str] = None)
- class mara_pipelines.commands.files.ReadScriptOutput(file_name: str, target_table: str, make_unique: bool = False, db_alias: Optional[str] = None, csv_format: Optional[bool] = None, skip_header: Optional[bool] = None, delimiter_char: Optional[str] = None, quote_char: Optional[str] = None, null_value_string: Optional[str] = None, timezone: Optional[str] = None, pipe_format: Optional[mara_db.formats.Format] = None)
Reads the output from a python script into a database table
- class mara_pipelines.commands.files.WriteFile(dest_file_name: str, sql_statement: typing.Optional[typing.Union[typing.Callable, str]] = None, sql_file_name: typing.Optional[str] = None, replace: typing.Optional[typing.Dict[str, str]] = None, db_alias: typing.Optional[str] = None, compression: mara_pipelines.commands.files.Compression = Compression.NONE, format: mara_db.formats.Format = <CsvFormat: delimiter_char=, >)
Writes data to a local file. The command is executed on the shell.
Python commands
- class mara_pipelines.commands.python.RunFunction(function: Optional[Callable] = None, args: Optional[List[str]] = None, file_dependencies: Optional[List[str]] = None)
Runs an arbitrary python function
- Parameters
function – The parameterless function to run
args – A list of arguments to be passed to the script
file_dependencies – Run triggered based on whether a list of files changed since the last pipeline run
Note
If you want to pass arguments, use a lambda function.
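The note above can be illustrated in plain Python. This sketch shows how a function that takes arguments can be wrapped into a parameterless callable; `load_partition` and its arguments are hypothetical, and the resulting callables are what one would presumably hand to the `function` parameter.

```python
from functools import partial


def load_partition(table: str, year: int) -> str:
    """Hypothetical task that needs arguments; returns a label for demonstration."""
    return f"{table}_{year}"


# Wrap the call in a lambda so the result takes no parameters.
with_lambda = lambda: load_partition("orders", 2024)

# functools.partial achieves the same without a lambda.
with_partial = partial(load_partition, "orders", year=2024)

# When creating callables in a loop, bind the loop variable as a default
# argument; otherwise every lambda would see the last value of `year`.
per_year = [lambda year=year: load_partition("orders", year) for year in (2023, 2024)]

if __name__ == "__main__":
    print(with_lambda(), with_partial(), [f() for f in per_year])
```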
- class mara_pipelines.commands.python.ExecutePython(file_name: Union[Callable, str], args: Optional[Union[Callable, List[str]]] = None, file_dependencies: Optional[List[str]] = None)
Runs a python script in a separate interpreter process
- Parameters
file_name – the path of the file to run, relative to the pipeline directory
args – A list of arguments to be passed to the script
file_dependencies – Run triggered based on whether a list of files changed since the last pipeline run
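To make "a separate interpreter process" concrete, here is a minimal sketch using only the standard library. It is not the library's implementation: `run_script` and `demo` are hypothetical names, and using `sys.executable` as the interpreter is an assumption.

```python
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import List, Optional


def run_script(file_name: str, args: Optional[List[str]] = None) -> subprocess.CompletedProcess:
    """Run a Python file in a new interpreter process and capture its output."""
    return subprocess.run(
        [sys.executable, file_name, *(args or [])],
        capture_output=True,
        text=True,
    )


def demo() -> str:
    """Write a tiny script that echoes its arguments, run it, return its stdout."""
    with tempfile.TemporaryDirectory() as directory:
        script = Path(directory) / "print_args.py"
        script.write_text("import sys\nprint(' '.join(sys.argv[1:]))\n")
        return run_script(str(script), ["--year", "2024"]).stdout.strip()


if __name__ == "__main__":
    print(demo())
```

Running in a separate process keeps the script's imports and global state isolated from the process that runs the pipeline, at the cost of passing all inputs as string arguments.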
SQL commands
- class mara_pipelines.commands.sql.ExecuteSQL(sql_statement: Optional[Union[Callable, str]] = None, sql_file_name: Optional[str] = None, replace: Optional[Dict[str, str]] = None, file_dependencies: Optional[List[str]] = None, db_alias: Optional[str] = None, echo_queries: Optional[bool] = None, timezone: Optional[str] = None)
Runs an sql file or statement in a database
- Parameters
sql_statement – The statement to run as a string
sql_file_name – The name of the file to run (relative to the directory of the parent pipeline)
replace – A set of replacements to perform against the sql query, given as a dictionary: {'replace': 'with', ...}
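The `replace` dictionary can be pictured as plain text substitution on the query before it is run. The sketch below assumes exactly that; `apply_replacements` is a hypothetical helper, and the `@@...@@` placeholder style is only an illustrative convention, not one prescribed by this reference.

```python
from typing import Dict


def apply_replacements(sql: str, replace: Dict[str, str]) -> str:
    """Substitute every key of `replace` in the query text with its value."""
    for placeholder, value in replace.items():
        sql = sql.replace(placeholder, value)
    return sql


query = "SELECT * FROM @@schema@@.orders WHERE created_at >= '@@first_date@@'"
result = apply_replacements(
    query,
    {"@@schema@@": "dwh", "@@first_date@@": "2024-01-01"},
)

if __name__ == "__main__":
    print(result)
```

Because the substitution is textual, placeholders should be distinctive enough not to collide with column names or literals in the query.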
- class mara_pipelines.commands.sql.Copy(source_db_alias: str, target_table: str, target_db_alias: Optional[str] = None, sql_statement: Optional[Union[Callable, str]] = None, sql_file_name: Optional[str] = None, replace: Optional[Dict[str, str]] = None, timezone: Optional[str] = None, csv_format: Optional[bool] = None, delimiter_char: Optional[str] = None, file_dependencies: Optional[List[str]] = None)
Loads data from an external database
- class mara_pipelines.commands.sql.CopyIncrementally(source_db_alias: str, source_table: str, modification_comparison: str, comparison_value_placeholder: str, target_table: str, primary_keys: List[str], sql_file_name: Optional[str] = None, sql_statement: Optional[Union[Callable, str]] = None, target_db_alias: Optional[str] = None, timezone: Optional[str] = None, replace: Optional[Dict[str, str]] = None, use_explicit_upsert: bool = False, csv_format: Optional[bool] = None, delimiter_char: Optional[str] = None, modification_comparison_type: Optional[str] = None)
Incrementally loads data from one database into another.
Requires the source table to have a monotonically increasing column, or combination of columns, that identifies “newer” rows (the modification comparison).
After an initial full load, only rows with a higher modification comparison than the last comparison value are read.
- Parameters
source_db_alias – The database to load from
source_table – The table to read from
sql_statement – The query to run against the source database
sql_file_name – The path of a file containing the query to run against the source database
replace – A set of replacements to perform against the sql query
modification_comparison – SQL expression that evaluates to a comparable value
modification_comparison_type – type of the saved (as string) modification_comparison value
comparison_value_placeholder – A placeholder in the sql code that gets replaced with the actual incremental load comparison or 1=1.
target_db_alias – The database to write to
target_table – The table for loading data into
primary_keys – A combination of primary key columns that are used for upserting into the target table
timezone – How to interpret timestamps in the target db
use_explicit_upsert – When True, uses an Update + Insert query combination. Otherwise uses ON CONFLICT DO UPDATE.
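The incremental loading scheme described above can be sketched with the standard library's `sqlite3` module. This is an illustration under stated assumptions, not the library's implementation: source and target tables live in one in-memory database for brevity (the real command copies between two databases), the table names and the `@@comparison@@` placeholder are hypothetical, and the explicit Update + Insert variant is used for the upsert.

```python
import sqlite3
from typing import List, Optional, Tuple

# Hypothetical source query; the placeholder is replaced before each run.
QUERY = "SELECT id, name, updated_at FROM source_orders WHERE @@comparison@@"


def incremental_copy(db: sqlite3.Connection, last_value: Optional[str]) -> Optional[str]:
    """Copy new or changed rows and return the new comparison value to save."""
    # Determine the new comparison value before reading any rows.
    new_value = db.execute("SELECT max(updated_at) FROM source_orders").fetchone()[0]
    # Initial run: nothing saved yet, so the placeholder becomes 1=1 (full load).
    # The value is inserted textually for illustration; it comes from our own data.
    condition = "1=1" if last_value is None else f"updated_at > '{last_value}'"
    rows = db.execute(QUERY.replace("@@comparison@@", condition)).fetchall()
    for id_, name, updated_at in rows:
        # Explicit upsert: try an UPDATE by primary key, INSERT if nothing matched.
        cursor = db.execute(
            "UPDATE target_orders SET name = ?, updated_at = ? WHERE id = ?",
            (name, updated_at, id_),
        )
        if cursor.rowcount == 0:
            db.execute(
                "INSERT INTO target_orders (id, name, updated_at) VALUES (?, ?, ?)",
                (id_, name, updated_at),
            )
    return new_value


def demo() -> Tuple[Optional[str], List[tuple]]:
    """Run a full load, change the source, then run an incremental load."""
    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE source_orders (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT);
        CREATE TABLE target_orders (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT);
        INSERT INTO source_orders VALUES (1, 'a', '2024-01-01');
        INSERT INTO source_orders VALUES (2, 'b', '2024-01-02');
    """)
    last = incremental_copy(db, None)  # full load
    db.execute("UPDATE source_orders SET name = 'b2', updated_at = '2024-01-03' WHERE id = 2")
    db.execute("INSERT INTO source_orders VALUES (3, 'c', '2024-01-04')")
    last = incremental_copy(db, last)  # reads only rows 2 and 3
    return last, db.execute("SELECT id, name FROM target_orders ORDER BY id").fetchall()


if __name__ == "__main__":
    print(demo())
```

The sketch only works because `updated_at` never decreases: a row modified with an older timestamp than the saved comparison value would be skipped by the next incremental run.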