Flows library

viadot.flows.adls_container_to_container.ADLSContainerToContainer (Flow)

Copy file(s) between containers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | required |
| from_path | str | The path to the Data Lake file. | required |
| to_path | str | The path of the final file location, e.g. a/a/filename.extension. | required |
| adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | None |
| vault_name | str | The name of the vault from which to retrieve the secrets. | required |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
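
Example (not part of the library docs): a minimal sketch of how this flow could be instantiated and run locally. The paths, secret name, and vault name are hypothetical placeholders, and the class is assumed to be importable from viadot.flows (otherwise use the full module path shown in the heading). viadot flows subclass prefect.Flow, so run() executes the flow locally.

from viadot.flows import ADLSContainerToContainer

flow = ADLSContainerToContainer(
    name="Copy file between ADLS containers",
    from_path="raw/sales/2023/data.csv",          # placeholder source path
    to_path="conformed/sales/2023/data.csv",      # placeholder destination path
    adls_sp_credentials_secret="adls-sp-creds",   # placeholder Key Vault secret name
    vault_name="my-key-vault",                    # placeholder vault name
)
flow.run()  # prefect.Flow.run() executes the flow locally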

viadot.flows.adls_gen1_to_azure_sql_new.ADLSGen1ToAzureSQLNew (Flow)

Move file(s) from Azure Data Lake gen1 to gen2 and load them into Azure SQL Database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | required |
| gen1_path | str | The path to the gen1 Data Lake file/folder. | required |
| gen2_path | str | The path of the final gen2 file/folder. | required |
| local_file_path | str | Where the gen1 file should be downloaded. | required |
| overwrite | str | Whether to overwrite the destination file(s). | required |
| read_sep | str | The delimiter for the gen1 file. | required |
| write_sep | str | The delimiter for the output file. | required |
| read_quoting | str | The quoting option for the input file. | required |
| read_lineterminator | str | The line terminator for the input file. | required |
| read_error_bad_lines | bool | Whether to raise an exception on bad lines. | required |
| gen1_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen1 lake. | required |
| gen2_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen2 lake. | required |
| sqldb_credentials_secret | str | The Key Vault secret holding Azure SQL Database credentials. | required |
| vault_name | str | The name of the vault from which to retrieve sp_credentials_secret. | required |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
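
A hedged usage sketch, again with placeholder values. Only the parameters documented above are shown; the actual constructor may accept further arguments (for example, for the destination table) that this table does not list.

from viadot.flows import ADLSGen1ToAzureSQLNew

flow = ADLSGen1ToAzureSQLNew(
    name="Gen1 to Azure SQL",
    gen1_path="raw/sales/data.csv",              # placeholder gen1 path
    gen2_path="raw/sales/data.csv",              # placeholder gen2 path
    local_file_path="data.csv",                  # placeholder local staging file
    overwrite=True,
    read_sep=",",
    write_sep="\t",
    read_quoting='"',
    read_lineterminator="\n",
    read_error_bad_lines=False,
    gen1_sp_credentials_secret="gen1-sp-creds",  # placeholder secret names
    gen2_sp_credentials_secret="gen2-sp-creds",
    sqldb_credentials_secret="azure-sql-creds",
    vault_name="my-key-vault",
)
flow.run()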

viadot.flows.adls_gen1_to_azure_sql.ADLSGen1ToAzureSQL (Flow)

Bulk insert a file from Azure Data Lake gen1 into Azure SQL Database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | required |
| path | str | The path to the Data Lake file/folder. | required |
| blob_path | str | The path of the generated blob. | required |
| dtypes | dict | Which dtypes to use when creating the table in Azure SQL Database. | required |
| local_file_path | str | Where the gen1 file should be downloaded. | required |
| sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials. | required |
| vault_name | str | The name of the vault from which to retrieve sp_credentials_secret. | required |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
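
A usage sketch with placeholder values; the dtypes mapping shown is an arbitrary illustration.

from viadot.flows import ADLSGen1ToAzureSQL

flow = ADLSGen1ToAzureSQL(
    name="Gen1 bulk insert",
    path="raw/sales/data.csv",                # placeholder gen1 path
    blob_path="staging/sales/data.csv",       # placeholder blob path
    dtypes={"country": "VARCHAR(100)", "sales": "FLOAT(24)"},  # placeholder dtypes
    local_file_path="data.csv",               # placeholder local staging file
    sp_credentials_secret="gen1-sp-creds",    # placeholder secret name
    vault_name="my-key-vault",                # placeholder vault name
)
flow.run()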

viadot.flows.adls_gen1_to_gen2.ADLSGen1ToGen2 (Flow)

Move file(s) from Azure Data Lake gen1 to gen2.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | required |
| gen1_path | str | The path to the gen1 Data Lake file/folder. | required |
| gen2_path | str | The path of the final gen2 file/folder. | required |
| local_file_path | str | Where the gen1 file should be downloaded. | required |
| overwrite | str | Whether to overwrite the destination file(s). | required |
| gen1_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen1 lake. | required |
| gen2_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen2 lake. | required |
| vault_name | str | The name of the vault from which to retrieve the secrets. | required |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
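
A usage sketch with placeholder values, analogous to the examples above.

from viadot.flows import ADLSGen1ToGen2

flow = ADLSGen1ToGen2(
    name="Gen1 to Gen2",
    gen1_path="raw/sales/data.csv",              # placeholder gen1 path
    gen2_path="raw/sales/data.csv",              # placeholder gen2 path
    local_file_path="data.csv",                  # placeholder local staging file
    overwrite=True,
    gen1_sp_credentials_secret="gen1-sp-creds",  # placeholder secret names
    gen2_sp_credentials_secret="gen2-sp-creds",
    vault_name="my-key-vault",
)
flow.run()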

viadot.flows.adls_to_azure_sql.ADLSToAzureSQL (Flow)

__init__(self, name, local_file_path=None, adls_path=None, read_sep='\t', write_sep='\t', remove_tab=False, overwrite_adls=True, if_empty='warn', adls_sp_credentials_secret=None, dtypes=None, check_dtypes_order=True, table=None, schema=None, if_exists='replace', check_col_order=True, sqldb_credentials_secret=None, on_bcp_error='skip', max_download_retries=5, tags=['promotion'], vault_name=None, timeout=3600, *args, **kwargs) special

Flow for inserting data from a file in Azure Data Lake into an Azure SQL Database table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | required |
| local_file_path | str | Local destination path. Defaults to None. | None |
| adls_path | str | The path to an ADLS folder or file. If you pass a path to a directory, the latest file from that directory will be loaded; the files are assumed to be named using timestamps. Defaults to None. | None |
| read_sep | str | The delimiter for the source file. Defaults to "\t". | '\t' |
| write_sep | str | The delimiter for the output CSV file. Defaults to "\t". | '\t' |
| remove_tab | bool | Whether to remove tab delimiters from the data. Defaults to False. | False |
| overwrite_adls | bool | Whether to overwrite the file in ADLS. Defaults to True. | True |
| if_empty | str | What to do if the input data is empty. Defaults to "warn". | 'warn' |
| adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | None |
| dtypes | dict | Which custom data types should be used for the SQL table creation task. To be used only when dtypes need to be manually mapped; by default, the dtypes from the raw schema file are used. Defaults to None. | None |
| check_dtypes_order | bool | Whether to run the dtype-order check task. By default the check runs in all flows; set it to False to skip it. Defaults to True. | True |
| table | str | Destination table. Defaults to None. | None |
| schema | str | Destination schema. Defaults to None. | None |
| if_exists | Literal["fail", "replace", "append", "delete"] | What to do if the table exists. Defaults to "replace". | 'replace' |
| check_col_order | bool | Whether to check column order. Defaults to True. | True |
| sqldb_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with Azure SQL Database credentials. Defaults to None. | None |
| on_bcp_error | Literal["skip", "fail"] | What to do if an error occurs. Defaults to "skip". | 'skip' |
| max_download_retries | int | How many times to retry the download. Defaults to 5. | 5 |
| tags | List[str] | Flow tags to use, e.g. to control flow concurrency. Defaults to ["promotion"]. | ['promotion'] |
| vault_name | str | The name of the vault from which to obtain the secrets. Defaults to None. | None |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/adls_to_azure_sql.py
def __init__(
    self,
    name: str,
    local_file_path: str = None,
    adls_path: str = None,
    read_sep: str = "\t",
    write_sep: str = "\t",
    remove_tab: bool = False,
    overwrite_adls: bool = True,
    if_empty: str = "warn",
    adls_sp_credentials_secret: str = None,
    dtypes: Dict[str, Any] = None,
    check_dtypes_order: bool = True,
    table: str = None,
    schema: str = None,
    if_exists: Literal["fail", "replace", "append", "delete"] = "replace",
    check_col_order: bool = True,
    sqldb_credentials_secret: str = None,
    on_bcp_error: Literal["skip", "fail"] = "skip",
    max_download_retries: int = 5,
    tags: List[str] = ["promotion"],
    vault_name: str = None,
    timeout: int = 3600,
    *args: List[any],
    **kwargs: Dict[str, Any],
):
    """
    Flow for inserting data from a file in Azure Data Lake
    into an Azure SQL Database table.
    Args:
        name (str): The name of the flow.
        local_file_path (str, optional): Local destination path. Defaults to None.
        adls_path (str): The path to an ADLS folder or file. If you pass a path to a directory,
        the latest file from that directory will be loaded. We assume that the files are named using timestamps.
        read_sep (str, optional): The delimiter for the source file. Defaults to "\t".
        write_sep (str, optional): The delimiter for the output CSV file. Defaults to "\t".
        remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False.
        overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
        if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
        adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
        Defaults to None.
        dtypes (dict, optional): Which custom data types should be used for SQL table creation task.
        To be used only in case that dtypes need to be manually mapped - dtypes from raw schema file in use by default. Defaults to None.
        check_dtypes_order (bool, optiona): By default, this task will be used by all the flows. It can
            be set up to False for avoiding its application. Defaults to True.
        table (str, optional): Destination table. Defaults to None.
        schema (str, optional): Destination schema. Defaults to None.
        if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
        check_col_order (bool, optional): Whether to check column order. Defaults to True.
        sqldb_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        Azure SQL Database credentials. Defaults to None.
        on_bcp_error (Literal["skip", "fail"], optional): What to do if error occurs. Defaults to "skip".
        max_download_retries (int, optional): How many times to retry the download. Defaults to 5.
        tags (List[str], optional): Flow tags to use, eg. to control flow concurrency. Defaults to ["promotion"].
        vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
        timeout(int, optional): The amount of time (in seconds) to wait while running this task before
            a timeout occurs. Defaults to 3600.
    """

    adls_path = adls_path.strip("/")
    # Read parquet
    if adls_path.split(".")[-1] in ["csv", "parquet"]:
        self.adls_path = adls_path
    else:
        self.adls_path = get_key_value(key=adls_path)

    # Read schema
    self.dtypes = dtypes
    self.check_dtypes_order = check_dtypes_order
    self.adls_root_dir_path = os.path.split(self.adls_path)[0]
    self.adls_file_name = os.path.split(self.adls_path)[-1]
    extension = os.path.splitext(self.adls_path)[-1]
    json_schema_file_name = self.adls_file_name.replace(extension, ".json")
    self.json_shema_path = os.path.join(
        self.adls_root_dir_path,
        "schema",
        json_schema_file_name,
    )

    # AzureDataLakeUpload
    self.local_file_path = local_file_path or self.slugify(name) + ".csv"
    self.local_json_path = self.slugify(name) + ".json"
    self.read_sep = read_sep
    self.write_sep = write_sep
    self.overwrite_adls = overwrite_adls
    self.if_empty = if_empty
    self.adls_sp_credentials_secret = adls_sp_credentials_secret
    self.adls_path_conformed = self.get_promoted_path(env="conformed")
    self.adls_path_operations = self.get_promoted_path(env="operations")

    # AzureSQLCreateTable
    self.table = table
    self.schema = schema
    self.if_exists = if_exists
    self.check_col_order = check_col_order
    # Generate CSV
    self.remove_tab = remove_tab

    # BCPTask
    self.sqldb_credentials_secret = sqldb_credentials_secret
    self.on_bcp_error = on_bcp_error

    # Global
    self.max_download_retries = max_download_retries
    self.tags = tags
    self.vault_name = vault_name
    self.timeout = timeout

    super().__init__(*args, name=name, **kwargs)

    # self.dtypes.update(METADATA_COLUMNS)
    self.gen_flow()
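
A usage sketch based on the signature above; all paths, the table/schema, and the secret names are placeholders. Note that if adls_path does not end in .csv or .parquet, the constructor treats it as a key and resolves the real path via get_key_value.

from viadot.flows import ADLSToAzureSQL

flow = ADLSToAzureSQL(
    name="ADLS to Azure SQL",
    adls_path="raw/supermetrics/google_ads/data.csv",  # placeholder; .csv/.parquet paths are used as-is
    table="google_ads",                                # placeholder destination table
    schema="staging",                                  # placeholder destination schema
    if_exists="replace",
    dtypes={"campaign": "VARCHAR(255)", "clicks": "INT"},  # optional manual dtypes (placeholder)
    adls_sp_credentials_secret="adls-sp-creds",        # placeholder secret names
    sqldb_credentials_secret="azure-sql-creds",
    vault_name="my-key-vault",
)
flow.run()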

viadot.flows.azure_sql_transform.AzureSQLTransform (Flow)

__init__(self, name, query, sqldb_credentials_secret=None, vault_name=None, tags=['transform'], timeout=3600, *args, **kwargs) special

Flow for running SQL queries on top of Azure SQL Database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | required |
| query | str | The query to execute on the database. | required |
| sqldb_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password). Defaults to None. | None |
| vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
| tags | list | Tags for marking the flow. Defaults to ["transform"]. | ['transform'] |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/azure_sql_transform.py
def __init__(
    self,
    name: str,
    query: str,
    sqldb_credentials_secret: str = None,
    vault_name: str = None,
    tags: List[str] = ["transform"],
    timeout: int = 3600,
    *args: List[any],
    **kwargs: Dict[str, Any]
):
    """
    Flow for running SQL queries on top of Azure SQL Database.

    Args:
        name (str): The name of the flow.
        query (str, required): The query to execute on the database.
        sqldb_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
        with SQL db credentials (server, db_name, user, and password).
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
        tags (list, optional): Tag for marking flow. Defaults to "transform".
        timeout(int, optional): The amount of time (in seconds) to wait while running this task before
            a timeout occurs. Defaults to 3600.
    """
    self.query = query
    self.tags = tags
    self.sqldb_credentials_secret = sqldb_credentials_secret
    self.vault_name = vault_name
    self.timeout = timeout

    super().__init__(*args, name=name, **kwargs)
    self.gen_flow()
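
A usage sketch with a placeholder query and secret name.

from viadot.flows import AzureSQLTransform

flow = AzureSQLTransform(
    name="Refresh sales view",
    query="""
        CREATE OR ALTER VIEW staging.v_sales AS
        SELECT country, SUM(sales) AS sales
        FROM staging.google_ads
        GROUP BY country;
    """,                                         # placeholder SQL
    sqldb_credentials_secret="azure-sql-creds",  # placeholder secret name
    vault_name="my-key-vault",                   # placeholder vault name
)
flow.run()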

viadot.flows.supermetrics_to_adls.SupermetricsToADLS (Flow)

__init__(self, name, ds_id, ds_accounts, fields, ds_user=None, ds_segments=None, date_range_type=None, start_date=None, end_date=None, settings=None, filter=None, max_rows=1000000, max_columns=None, order_columns=None, expectation_suite=None, evaluation_parameters=None, keep_validation_output=False, output_file_extension='.parquet', local_file_path=None, adls_file_name=None, adls_dir_path=None, overwrite_adls=True, if_empty='warn', if_exists='replace', adls_sp_credentials_secret=None, max_download_retries=5, supermetrics_task_timeout=1800, parallel=True, tags=['extract'], vault_name=None, check_missing_data=True, timeout=3600, *args, **kwargs) special

Flow for downloading data from different marketing APIs to a local CSV using Supermetrics API, then uploading it to Azure Data Lake.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | required |
| ds_id | str | A query parameter passed to the SupermetricsToCSV task. | required |
| ds_accounts | List[str] | A query parameter passed to the SupermetricsToCSV task. | required |
| ds_user | str | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
| fields | List[str] | A query parameter passed to the SupermetricsToCSV task. | required |
| ds_segments | List[str] | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
| date_range_type | str | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
| start_date | str | A query parameter to pass the start date to the date range filter. Defaults to None. | None |
| end_date | str | A query parameter to pass the end date to the date range filter. Defaults to None. | None |
| settings | Dict[str, Any] | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
| filter | str | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
| max_rows | int | A query parameter passed to the SupermetricsToCSV task. Defaults to 1000000. | 1000000 |
| max_columns | int | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
| order_columns | str | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
| expectation_suite | dict | The Great Expectations suite used to validate the data. Defaults to None. | None |
| evaluation_parameters | dict | A dictionary containing evaluation parameters for the validation. Defaults to None. | None |
| keep_validation_output | bool | Whether to keep the output files generated by the Great Expectations task. Defaults to False. | False |
| local_file_path | str | Local destination path. Defaults to None. | None |
| adls_file_name | str | Name of the file in ADLS. Defaults to None. | None |
| output_file_extension | str | Output file extension; allows selecting .csv for data which is not easy to handle with parquet. Defaults to ".parquet". | '.parquet' |
| adls_dir_path | str | Azure Data Lake destination folder/catalog path. Defaults to None. | None |
| sep | str | The separator to use in the CSV. Defaults to "\t". | '\t' |
| overwrite_adls | bool | Whether to overwrite the file in ADLS. Defaults to True. | True |
| if_empty | str | What to do if the Supermetrics query returns no data. Defaults to "warn". | 'warn' |
| adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | None |
| max_download_retries | int | How many times to retry the download. Defaults to 5. | 5 |
| supermetrics_task_timeout | int | The timeout for the download task. Defaults to 60*30 (1800). | 1800 |
| parallel | bool | Whether to parallelize the downloads. Defaults to True. | True |
| tags | List[str] | Flow tags to use, e.g. to control flow concurrency. Defaults to ["extract"]. | ['extract'] |
| vault_name | str | The name of the vault from which to obtain the secrets. Defaults to None. | None |
| check_missing_data | bool | Whether to check for missing data. Defaults to True. | True |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/supermetrics_to_adls.py
def __init__(
    self,
    name: str,
    ds_id: str,
    ds_accounts: List[str],
    fields: List[str],
    ds_user: str = None,
    ds_segments: List[str] = None,
    date_range_type: str = None,
    start_date: str = None,
    end_date: str = None,
    settings: Dict[str, Any] = None,
    filter: str = None,
    max_rows: int = 1000000,
    max_columns: int = None,
    order_columns: str = None,
    expectation_suite: dict = None,
    evaluation_parameters: dict = None,
    keep_validation_output: bool = False,
    output_file_extension: str = ".parquet",
    local_file_path: str = None,
    adls_file_name: str = None,
    adls_dir_path: str = None,
    overwrite_adls: bool = True,
    if_empty: str = "warn",
    if_exists: str = "replace",
    adls_sp_credentials_secret: str = None,
    max_download_retries: int = 5,
    supermetrics_task_timeout: int = 60 * 30,
    parallel: bool = True,
    tags: List[str] = ["extract"],
    vault_name: str = None,
    check_missing_data: bool = True,
    timeout: int = 3600,
    *args: List[any],
    **kwargs: Dict[str, Any],
):
    """
    Flow for downloading data from different marketing APIs to a local CSV
    using Supermetrics API, then uploading it to Azure Data Lake.

    Args:
        name (str): The name of the flow.
        ds_id (str): A query parameter passed to the SupermetricsToCSV task
        ds_accounts (List[str]): A query parameter passed to the SupermetricsToCSV task
        ds_user (str): A query parameter passed to the SupermetricsToCSV task
        fields (List[str]): A query parameter passed to the SupermetricsToCSV task
        ds_segments (List[str], optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
        date_range_type (str, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
        start_date (str, optional): A query parameter to pass start date to the date range filter. Defaults to None.
        end_date (str, optional): A query parameter to pass end date to the date range filter. Defaults to None.
        settings (Dict[str, Any], optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
        filter (str, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
        max_rows (int, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to 1000000.
        max_columns (int, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
        order_columns (str, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
        expectation_suite (dict, optional): The Great Expectations suite used to validate the data. Defaults to None.
        evaluation_parameters (str, optional): A dictionary containing evaluation parameters for the validation. Defaults to None.
        keep_validation_output (bool, optional): Whether to keep the output files generated by the Great Expectations task. Defaults to False.
        Currently, only GitHub URLs are supported. Defaults to None.
        local_file_path (str, optional): Local destination path. Defaults to None.
        adls_file_name (str, optional): Name of file in ADLS. Defaults to None.
        output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".parquet"..
        adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None.
        sep (str, optional): The separator to use in the CSV. Defaults to "\t".
        overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
        if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
        adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
        Defaults to None.
        max_download_retries (int, optional): How many times to retry the download. Defaults to 5.
        supermetrics_task_timeout (int, optional): The timeout for the download task. Defaults to 60*30.
        parallel (bool, optional): Whether to parallelize the downloads. Defaults to True.
        tags (List[str], optional): Flow tags to use, eg. to control flow concurrency. Defaults to ["extract"].
        vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
        check_missing_data (bool, optional): Whether to check missing data. Defaults to True.
        timeout(int, optional): The amount of time (in seconds) to wait while running this task before
            a timeout occurs. Defaults to 3600.
    """
    if not ds_user:
        try:
            ds_user = PrefectSecret("SUPERMETRICS_DEFAULT_USER").run()
        except ValueError as e:
            msg = "Neither 'ds_user' parameter nor 'SUPERMETRICS_DEFAULT_USER' secret were not specified"
            raise ValueError(msg) from e

    self.flow_name = name
    self.check_missing_data = check_missing_data
    self.timeout = timeout
    # SupermetricsToDF
    self.ds_id = ds_id
    self.ds_accounts = ds_accounts
    self.ds_segments = ds_segments
    self.ds_user = ds_user
    self.fields = fields
    self.date_range_type = date_range_type
    self.start_date = start_date
    self.end_date = end_date
    self.settings = settings
    self.filter = filter
    self.max_rows = max_rows
    self.max_columns = max_columns
    self.order_columns = order_columns
    self.if_exists = if_exists
    self.output_file_extension = output_file_extension

    # RunGreatExpectationsValidation
    self.expectation_suite = expectation_suite
    self.expectations_path = "/home/viadot/tmp/expectations"
    self.expectation_suite_name = expectation_suite["expectation_suite_name"]
    self.evaluation_parameters = evaluation_parameters
    self.keep_validation_output = keep_validation_output

    # AzureDataLakeUpload
    self.local_file_path = (
        local_file_path or self.slugify(name) + self.output_file_extension
    )
    self.local_json_path = self.slugify(name) + ".json"
    self.now = str(pendulum.now("utc"))
    self.adls_dir_path = adls_dir_path

    if adls_file_name is not None:
        self.adls_file_path = os.path.join(adls_dir_path, adls_file_name)
        self.adls_schema_file_dir_file = os.path.join(
            adls_dir_path, "schema", Path(adls_file_name).stem + ".json"
        )

    else:
        self.adls_file_path = os.path.join(
            adls_dir_path, self.now + self.output_file_extension
        )
        self.adls_schema_file_dir_file = os.path.join(
            adls_dir_path, "schema", self.now + ".json"
        )
    self.overwrite_adls = overwrite_adls
    self.if_empty = if_empty
    self.adls_sp_credentials_secret = adls_sp_credentials_secret

    # Global
    self.max_download_retries = max_download_retries
    self.supermetrics_task_timeout = supermetrics_task_timeout
    self.parallel = parallel
    self.tags = tags
    self.vault_name = vault_name

    super().__init__(*args, name=name, **kwargs)

    self.gen_flow()
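
A usage sketch with placeholder values. Although expectation_suite defaults to None, the constructor above reads expectation_suite["expectation_suite_name"], so in practice a suite dictionary appears to be required; the one below is a minimal placeholder. ds_user is passed explicitly to avoid the SUPERMETRICS_DEFAULT_USER secret lookup.

from viadot.flows import SupermetricsToADLS

flow = SupermetricsToADLS(
    name="Google Ads extract",
    ds_id="GAW",                                  # placeholder Supermetrics data source ID
    ds_accounts=["1234567890"],                   # placeholder account ID
    ds_user="user@example.com",                   # placeholder Supermetrics user
    fields=["Date", "Campaignname", "Clicks"],    # placeholder fields
    date_range_type="last_month",                 # placeholder date range
    expectation_suite={                           # minimal placeholder suite (see note above)
        "expectation_suite_name": "google_ads_suite",
        "expectations": [],
    },
    adls_dir_path="raw/supermetrics/google_ads",  # placeholder ADLS folder
    adls_sp_credentials_secret="adls-sp-creds",   # placeholder secret name
    vault_name="my-key-vault",
)
flow.run()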

viadot.flows.supermetrics_to_azure_sql.SupermetricsToAzureSQL (Flow)

viadot.flows.cloud_for_customers_report_to_adls.CloudForCustomersReportToADLS (Flow)

__init__(self, name=None, report_url=None, url=None, endpoint=None, params={}, fields=None, skip=0, top=1000, channels=None, months=None, years=None, env='QA', c4c_credentials_secret=None, local_file_path=None, output_file_extension='.csv', adls_dir_path=None, adls_file_path=None, overwrite_adls=False, adls_sp_credentials_secret=None, if_empty='warn', if_exists='replace', timeout=3600, *args, **kwargs) special

Flow for downloading a report from the Cloud for Customers API to a local CSV file, then uploading it to Azure Data Lake.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | The name of the flow. | None |
| report_url | str | The url to the API. Defaults to None. | None |
| url | str | ??? | None |
| endpoint | str | ??? | None |
| params | dict | ??? | {} |
| fields | list | ??? | None |
| skip | int | The index of the first row to read. Defaults to 0. | 0 |
| top | int | The maximum number of rows to read. Defaults to 1000. | 1000 |
| channels | List[str] | Filtering parameters passed to the url. Defaults to None. | None |
| months | List[str] | Filtering parameters passed to the url. Defaults to None. | None |
| years | List[str] | Filtering parameters passed to the url. Defaults to None. | None |
| env | str | ??? | 'QA' |
| c4c_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with username and password for the Cloud for Customers instance. Defaults to None. | None |
| local_file_path | str | Local destination path. Defaults to None. | None |
| output_file_extension | str | Output file extension; allows selecting .csv for data which is not easy to handle with parquet. Defaults to ".csv". | '.csv' |
| adls_dir_path | str | Azure Data Lake destination folder/catalog path. Defaults to None. | None |
| adls_file_path | str | Azure Data Lake destination file path. Defaults to None. | None |
| overwrite_adls | bool | Whether to overwrite the file in ADLS. Defaults to False. | False |
| adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | None |
| if_empty | str | What to do if the query returns no data. Defaults to "warn". | 'warn' |
| if_exists | str | What to do if the local file already exists. Defaults to "replace". | 'replace' |
| timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/cloud_for_customers_report_to_adls.py
def __init__(
    self,
    name: str = None,
    report_url: str = None,
    url: str = None,
    endpoint: str = None,
    params: Dict[str, Any] = {},
    fields: List[str] = None,
    skip: int = 0,
    top: int = 1000,
    channels: List[str] = None,
    months: List[str] = None,
    years: List[str] = None,
    env: str = "QA",
    c4c_credentials_secret: str = None,
    local_file_path: str = None,
    output_file_extension: str = ".csv",
    adls_dir_path: str = None,
    adls_file_path: str = None,
    overwrite_adls: bool = False,
    adls_sp_credentials_secret: str = None,
    if_empty: str = "warn",
    if_exists: str = "replace",
    timeout: int = 3600,
    *args: List[any],
    **kwargs: Dict[str, Any],
):
    """
    Flow for downloading a report from the Cloud for Customers API to a local CSV
    file, then uploading it to Azure Data Lake.

    Args:
        name (str): The name of the flow.
        report_url (str, optional): The url to the API. Defaults to None.
        url (str, optional): ???
        endpoint (str, optional): ???
        params (dict, optional): ???
        fields (list, optional): ???
        skip (int, optional): Initial index value of reading row. Defaults to 0.
        top (int, optional): The value of top reading row. Defaults to 1000.
        channels (List[str], optional): Filtering parameters passed to the url. Defaults to None.
        months (List[str], optional): Filtering parameters passed to the url. Defaults to None.
        years (List[str], optional): Filtering parameters passed to the url. Defaults to None.
        env (str, optional): ???
        c4c_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        username and password for the Cloud for Customers instance.
        local_file_path (str, optional): Local destination path. Defaults to None.
        output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy
        to handle with parquet. Defaults to ".csv".
        adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None.
        adls_file_path (str, optional): Azure Data Lake destination file path. Defaults to None.
        overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to False.
        adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
        Defaults to None.
        if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
        if_exists (str, optional): What to do if the local file already exists. Defaults to "replace".
        timeout(int, optional): The amount of time (in seconds) to wait while running this task before
            a timeout occurs. Defaults to 3600.
    """

    self.report_url = report_url
    self.skip = skip
    self.top = top
    self.if_empty = if_empty
    self.env = env
    self.c4c_credentials_secret = c4c_credentials_secret
    self.timeout = timeout

    # AzureDataLakeUpload
    self.adls_sp_credentials_secret = adls_sp_credentials_secret
    self.if_exists = if_exists
    self.overwrite_adls = overwrite_adls
    self.output_file_extension = output_file_extension
    self.local_file_path = (
        local_file_path or slugify(name) + self.output_file_extension
    )
    self.now = str(pendulum.now("utc"))
    self.adls_dir_path = adls_dir_path
    self.adls_file_path = adls_file_path or os.path.join(
        adls_dir_path, self.now + self.output_file_extension
    )

    # in case of non-report invoking
    self.url = url
    self.endpoint = endpoint
    self.params = params
    self.fields = fields

    # filtering report_url for reports
    self.channels = channels
    self.months = months
    self.years = years

    self.report_urls_with_filters = [self.report_url]

    self.report_urls_with_filters = self.create_url_with_fields(
        fields_list=self.channels, filter_code="CCHANNETZTEXT12CE6C2FA0D77995"
    )

    self.report_urls_with_filters = self.create_url_with_fields(
        fields_list=self.months, filter_code="CMONTH_ID"
    )

    self.report_urls_with_filters = self.create_url_with_fields(
        fields_list=self.years, filter_code="CYEAR_ID"
    )

    super().__init__(*args, name=name, **kwargs)

    self.gen_flow()
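
A usage sketch with placeholder values; the report URL, filters, and secret names are made up. adls_dir_path is supplied because adls_file_path is not given, and the destination path is derived from it.

from viadot.flows import CloudForCustomersReportToADLS

flow = CloudForCustomersReportToADLS(
    name="C4C sales report",
    report_url="https://my-tenant.crm.ondemand.com/sap/c4c/odata/ana_businessanalytics_analytics.svc/SomeReportQuery",  # placeholder report URL
    months=["01", "02"],                         # placeholder filters
    years=["2023"],
    env="QA",
    c4c_credentials_secret="c4c-creds",          # placeholder secret names
    adls_sp_credentials_secret="adls-sp-creds",
    adls_dir_path="raw/c4c/sales_report",        # placeholder ADLS folder
)
flow.run()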