Flows library
viadot.flows.adls_container_to_container.ADLSContainerToContainer (Flow)
Copy file(s) between containers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | required |
from_path | str | The path to the Data Lake file. | required |
to_path | str | The path of the final file location, e.g. `a/a/filename.extension`. | required |
adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | required |
vault_name | str | The name of the vault from which to retrieve the secrets. | required |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
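A minimal usage sketch, assuming the flow is exported from `viadot.flows` and run locally with Prefect's `Flow.run()`; the paths, secret name, and vault name below are hypothetical placeholders:

```python
from viadot.flows import ADLSContainerToContainer

# Hypothetical paths and secret names - replace with real values.
flow = ADLSContainerToContainer(
    name="Copy sales extract between containers",
    from_path="raw/sales/2023/sales.csv",
    to_path="conformed/sales/2023/sales.csv",
    adls_sp_credentials_secret="adls-sp-credentials",
    vault_name="my-key-vault",
    timeout=3600,
)
flow.run()  # builds and runs the flow locally (requires valid Azure credentials)
```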
viadot.flows.adls_gen1_to_azure_sql_new.ADLSGen1ToAzureSQLNew (Flow)
Move file(s) from Azure Data Lake gen1 to gen2 and bulk insert them into Azure SQL Database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | required |
gen1_path | str | The path to the gen1 Data Lake file/folder. | required |
gen2_path | str | The path of the final gen2 file/folder. | required |
local_file_path | str | Where the gen1 file should be downloaded. | required |
overwrite | str | Whether to overwrite the destination file(s). | required |
read_sep | str | The delimiter for the gen1 file. | required |
write_sep | str | The delimiter for the output file. | required |
read_quoting | str | The quoting option for the input file. | required |
read_lineterminator | str | The line terminator for the input file. | required |
read_error_bad_lines | bool | Whether to raise an exception on bad lines. | required |
gen1_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen1 lake. | required |
gen2_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen2 lake. | required |
sqldb_credentials_secret | str | The Key Vault secret holding Azure SQL Database credentials. | required |
vault_name | str | The name of the vault from which to retrieve the secrets. | required |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
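A hedged instantiation sketch using only the parameters documented above; the flow may also accept destination table/schema arguments not listed here, and all values below are hypothetical placeholders:

```python
from viadot.flows import ADLSGen1ToAzureSQLNew

flow = ADLSGen1ToAzureSQLNew(
    name="Gen1 sales to Azure SQL",
    gen1_path="sales/sales.csv",             # gen1 source file
    gen2_path="raw/sales/sales.csv",         # gen2 destination file
    local_file_path="sales.csv",             # intermediate local copy
    overwrite=True,
    read_sep="\t",
    write_sep="\t",
    gen1_sp_credentials_secret="gen1-sp-credentials",
    gen2_sp_credentials_secret="gen2-sp-credentials",
    sqldb_credentials_secret="azure-sql-credentials",
    vault_name="my-key-vault",
)
flow.run()  # requires valid Azure credentials in the referenced secrets
```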
viadot.flows.adls_gen1_to_azure_sql.ADLSGen1ToAzureSQL (Flow)
Bulk insert a file from an Azure Data Lake gen1 to Azure SQL Database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | required |
path | str | The path to the Data Lake file/folder. | required |
blob_path | str | The path of the generated blob. | required |
dtypes | dict | Which dtypes to use when creating the table in Azure SQL Database. | required |
local_file_path | str | Where the gen1 file should be downloaded. | required |
sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials. | required |
vault_name | str | The name of the vault from which to retrieve the secrets. | required |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
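A hedged sketch along the same lines, again restricted to the documented parameters; the paths, dtypes, and secret names are hypothetical placeholders:

```python
from viadot.flows import ADLSGen1ToAzureSQL

flow = ADLSGen1ToAzureSQL(
    name="Gen1 bulk insert",
    path="sales/sales.csv",                   # gen1 source file
    blob_path="tests/sales.csv",              # staging blob used for the bulk insert
    dtypes={"Date": "DATE", "Revenue": "FLOAT(24)"},
    local_file_path="sales.csv",
    sp_credentials_secret="gen1-sp-credentials",
    vault_name="my-key-vault",
)
flow.run()  # requires valid Azure credentials in the referenced secrets
```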
viadot.flows.adls_gen1_to_gen2.ADLSGen1ToGen2 (Flow)
Move file(s) from Azure Data Lake gen1 to gen2.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | required |
gen1_path | str | The path to the gen1 Data Lake file/folder. | required |
gen2_path | str | The path of the final gen2 file/folder. | required |
local_file_path | str | Where the gen1 file should be downloaded. | required |
overwrite | str | Whether to overwrite the destination file(s). | required |
gen1_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen1 lake. | required |
gen2_sp_credentials_secret | str | The Key Vault secret holding Service Principal credentials for the gen2 lake. | required |
vault_name | str | The name of the vault from which to retrieve the secrets. | required |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
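A minimal usage sketch with hypothetical paths and secret names:

```python
from viadot.flows import ADLSGen1ToGen2

flow = ADLSGen1ToGen2(
    name="Gen1 to Gen2 migration",
    gen1_path="sales/sales.csv",
    gen2_path="raw/sales/sales.csv",
    local_file_path="sales.csv",
    overwrite=True,
    gen1_sp_credentials_secret="gen1-sp-credentials",
    gen2_sp_credentials_secret="gen2-sp-credentials",
    vault_name="my-key-vault",
)
flow.run()  # requires valid Azure credentials in the referenced secrets
```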
viadot.flows.adls_to_azure_sql.ADLSToAzureSQL (Flow)
__init__(self, name, local_file_path=None, adls_path=None, read_sep='\t', write_sep='\t', remove_tab=False, overwrite_adls=True, if_empty='warn', adls_sp_credentials_secret=None, dtypes=None, check_dtypes_order=True, table=None, schema=None, if_exists='replace', check_col_order=True, sqldb_credentials_secret=None, on_bcp_error='skip', max_download_retries=5, tags=['promotion'], vault_name=None, timeout=3600, *args, **kwargs)
special
Flow for loading a file from Azure Data Lake into Azure SQL Database, promoting the file along the way and bulk inserting it into the destination table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | required |
local_file_path | str | Local destination path. Defaults to None. | None |
adls_path | str | The path to an ADLS folder or file. If you pass a path to a directory, the latest file from that directory will be loaded. We assume that the files are named using timestamps. | None |
read_sep | str | The delimiter for the source file. Defaults to "\t". | '\t' |
write_sep | str | The delimiter for the output CSV file. Defaults to "\t". | '\t' |
remove_tab | bool | Whether to remove tab delimiters from the data. Defaults to False. | False |
overwrite_adls | bool | Whether to overwrite the file in ADLS. Defaults to True. | True |
if_empty | str | What to do if the Supermetrics query returns no data. Defaults to "warn". | 'warn' |
adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | None |
dtypes | dict | Which custom data types should be used for the SQL table creation task. To be used only when dtypes need to be mapped manually; dtypes from the raw schema file are used by default. Defaults to None. | None |
check_dtypes_order | bool | Whether to apply the dtypes order check. By default, this check is applied in all flows; set to False to skip it. Defaults to True. | True |
table | str | Destination table. Defaults to None. | None |
schema | str | Destination schema. Defaults to None. | None |
if_exists | Literal | What to do if the table exists. Defaults to "replace". | 'replace' |
check_col_order | bool | Whether to check column order. Defaults to True. | True |
sqldb_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with Azure SQL Database credentials. Defaults to None. | None |
on_bcp_error | Literal["skip", "fail"] | What to do if an error occurs. Defaults to "skip". | 'skip' |
max_download_retries | int | How many times to retry the download. Defaults to 5. | 5 |
tags | List[str] | Flow tags to use, e.g. to control flow concurrency. Defaults to ["promotion"]. | ['promotion'] |
vault_name | str | The name of the vault from which to obtain the secrets. Defaults to None. | None |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/adls_to_azure_sql.py
def __init__(
self,
name: str,
local_file_path: str = None,
adls_path: str = None,
read_sep: str = "\t",
write_sep: str = "\t",
remove_tab: bool = False,
overwrite_adls: bool = True,
if_empty: str = "warn",
adls_sp_credentials_secret: str = None,
dtypes: Dict[str, Any] = None,
check_dtypes_order: bool = True,
table: str = None,
schema: str = None,
if_exists: Literal["fail", "replace", "append", "delete"] = "replace",
check_col_order: bool = True,
sqldb_credentials_secret: str = None,
on_bcp_error: Literal["skip", "fail"] = "skip",
max_download_retries: int = 5,
tags: List[str] = ["promotion"],
vault_name: str = None,
timeout: int = 3600,
*args: List[any],
**kwargs: Dict[str, Any],
):
"""
Flow for downloading data from different marketing APIs to a local CSV
using Supermetrics API, then uploading it to Azure Data Lake,
and finally inserting into Azure SQL Database.
Args:
name (str): The name of the flow.
local_file_path (str, optional): Local destination path. Defaults to None.
adls_path (str): The path to an ADLS folder or file. If you pass a path to a directory,
the latest file from that directory will be loaded. We assume that the files are named using timestamps.
read_sep (str, optional): The delimiter for the source file. Defaults to "\t".
write_sep (str, optional): The delimiter for the output CSV file. Defaults to "\t".
remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False.
overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
dtypes (dict, optional): Which custom data types should be used for SQL table creation task.
To be used only in case that dtypes need to be manually mapped - dtypes from raw schema file in use by default. Defaults to None.
check_dtypes_order (bool, optiona): By default, this task will be used by all the flows. It can
be set up to False for avoiding its application. Defaults to True.
table (str, optional): Destination table. Defaults to None.
schema (str, optional): Destination schema. Defaults to None.
if_exists (Literal, optional): What to do if the table exists. Defaults to "replace".
check_col_order (bool, optional): Whether to check column order. Defaults to True.
sqldb_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
Azure SQL Database credentials. Defaults to None.
on_bcp_error (Literal["skip", "fail"], optional): What to do if error occurs. Defaults to "skip".
max_download_retries (int, optional): How many times to retry the download. Defaults to 5.
tags (List[str], optional): Flow tags to use, eg. to control flow concurrency. Defaults to ["promotion"].
vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
"""
adls_path = adls_path.strip("/")
# Read parquet
if adls_path.split(".")[-1] in ["csv", "parquet"]:
self.adls_path = adls_path
else:
self.adls_path = get_key_value(key=adls_path)
# Read schema
self.dtypes = dtypes
self.check_dtypes_order = check_dtypes_order
self.adls_root_dir_path = os.path.split(self.adls_path)[0]
self.adls_file_name = os.path.split(self.adls_path)[-1]
extension = os.path.splitext(self.adls_path)[-1]
json_schema_file_name = self.adls_file_name.replace(extension, ".json")
self.json_shema_path = os.path.join(
self.adls_root_dir_path,
"schema",
json_schema_file_name,
)
# AzureDataLakeUpload
self.local_file_path = local_file_path or self.slugify(name) + ".csv"
self.local_json_path = self.slugify(name) + ".json"
self.read_sep = read_sep
self.write_sep = write_sep
self.overwrite_adls = overwrite_adls
self.if_empty = if_empty
self.adls_sp_credentials_secret = adls_sp_credentials_secret
self.adls_path_conformed = self.get_promoted_path(env="conformed")
self.adls_path_operations = self.get_promoted_path(env="operations")
# AzureSQLCreateTable
self.table = table
self.schema = schema
self.if_exists = if_exists
self.check_col_order = check_col_order
# Generate CSV
self.remove_tab = remove_tab
# BCPTask
self.sqldb_credentials_secret = sqldb_credentials_secret
self.on_bcp_error = on_bcp_error
# Global
self.max_download_retries = max_download_retries
self.tags = tags
self.vault_name = vault_name
self.timeout = timeout
super().__init__(*args, name=name, **kwargs)
# self.dtypes.update(METADATA_COLUMNS)
self.gen_flow()
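A minimal usage sketch based on the signature above; pointing `adls_path` at a `.csv`/`.parquet` file avoids the Prefect KV-store lookup performed in `__init__`, and all paths, table names, and secrets below are hypothetical placeholders:

```python
from viadot.flows import ADLSToAzureSQL

# Hypothetical paths, table, and secret names - replace with real values.
flow = ADLSToAzureSQL(
    name="Google Ads to Azure SQL",
    # A direct .csv/.parquet path is used verbatim; a directory path would be
    # resolved through the Prefect KV store, as shown in __init__ above.
    adls_path="raw/supermetrics/google_ads/2023-01-01.csv",
    local_file_path="google_ads.csv",
    dtypes={"Date": "DATE", "Campaignname": "VARCHAR(255)", "Clicks": "INT"},
    schema="dbo",
    table="google_ads",
    if_exists="replace",
    adls_sp_credentials_secret="adls-sp-credentials",
    sqldb_credentials_secret="azure-sql-credentials",
    vault_name="my-key-vault",
)
flow.run()  # requires valid Azure credentials in the referenced secrets
```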
viadot.flows.azure_sql_transform.AzureSQLTransform (Flow)
__init__(self, name, query, sqldb_credentials_secret=None, vault_name=None, tags=['transform'], timeout=3600, *args, **kwargs)
special
Flow for running SQL queries on top of Azure SQL Database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | required |
query | str | The query to execute on the database. | required |
sqldb_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password). Defaults to None. | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
tags | list | Tags for marking the flow. Defaults to ["transform"]. | ['transform'] |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/azure_sql_transform.py
def __init__(
self,
name: str,
query: str,
sqldb_credentials_secret: str = None,
vault_name: str = None,
tags: List[str] = ["transform"],
timeout: int = 3600,
*args: List[any],
**kwargs: Dict[str, Any]
):
"""
Flow for running SQL queries on top of Azure SQL Database.
Args:
name (str): The name of the flow.
query (str, required): The query to execute on the database.
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
with SQL db credentials (server, db_name, user, and password).
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
tags (list, optional): Tag for marking flow. Defaults to "transform".
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
"""
self.query = query
self.tags = tags
self.sqldb_credentials_secret = sqldb_credentials_secret
self.vault_name = vault_name
self.timeout = timeout
super().__init__(*args, name=name, **kwargs)
self.gen_flow()
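A minimal usage sketch based on the signature above; the query, secret name, and vault name are hypothetical placeholders:

```python
from viadot.flows import AzureSQLTransform

flow = AzureSQLTransform(
    name="Refresh sales summary",
    query="""
        TRUNCATE TABLE conformed.sales_summary;
        INSERT INTO conformed.sales_summary (Region, Revenue)
        SELECT Region, SUM(Revenue)
        FROM staging.sales
        GROUP BY Region;
    """,
    sqldb_credentials_secret="azure-sql-credentials",
    vault_name="my-key-vault",
)
flow.run()  # executes the query against the configured Azure SQL Database
```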
viadot.flows.supermetrics_to_adls.SupermetricsToADLS (Flow)
__init__(self, name, ds_id, ds_accounts, fields, ds_user=None, ds_segments=None, date_range_type=None, start_date=None, end_date=None, settings=None, filter=None, max_rows=1000000, max_columns=None, order_columns=None, expectation_suite=None, evaluation_parameters=None, keep_validation_output=False, output_file_extension='.parquet', local_file_path=None, adls_file_name=None, adls_dir_path=None, overwrite_adls=True, if_empty='warn', if_exists='replace', adls_sp_credentials_secret=None, max_download_retries=5, supermetrics_task_timeout=1800, parallel=True, tags=['extract'], vault_name=None, check_missing_data=True, timeout=3600, *args, **kwargs)
special
Flow for downloading data from different marketing APIs to a local CSV using Supermetrics API, then uploading it to Azure Data Lake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | required |
ds_id | str | A query parameter passed to the SupermetricsToCSV task. | required |
ds_accounts | List[str] | A query parameter passed to the SupermetricsToCSV task. | required |
ds_user | str | A query parameter passed to the SupermetricsToCSV task. | None |
fields | List[str] | A query parameter passed to the SupermetricsToCSV task. | required |
ds_segments | List[str] | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
date_range_type | str | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
start_date | str | A query parameter to pass the start date to the date range filter. Defaults to None. | None |
end_date | str | A query parameter to pass the end date to the date range filter. Defaults to None. | None |
settings | Dict[str, Any] | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
filter | str | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
max_rows | int | A query parameter passed to the SupermetricsToCSV task. Defaults to 1000000. | 1000000 |
max_columns | int | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
order_columns | str | A query parameter passed to the SupermetricsToCSV task. Defaults to None. | None |
expectation_suite | dict | The Great Expectations suite used to validate the data. Defaults to None. | None |
evaluation_parameters | dict | A dictionary containing evaluation parameters for the validation. Defaults to None. | None |
keep_validation_output | bool | Whether to keep the output files generated by the Great Expectations task. Defaults to False. | False |
local_file_path | str | Local destination path. Defaults to None. | None |
adls_file_name | str | Name of the file in ADLS. Defaults to None. | None |
output_file_extension | str | Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".parquet". | '.parquet' |
adls_dir_path | str | Azure Data Lake destination folder/catalog path. Defaults to None. | None |
sep | str | The separator to use in the CSV. Defaults to "\t". | required |
overwrite_adls | bool | Whether to overwrite the file in ADLS. Defaults to True. | True |
if_empty | str | What to do if the Supermetrics query returns no data. Defaults to "warn". | 'warn' |
adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | None |
max_download_retries | int | How many times to retry the download. Defaults to 5. | 5 |
supermetrics_task_timeout | int | The timeout for the download task. Defaults to 60*30. | 1800 |
parallel | bool | Whether to parallelize the downloads. Defaults to True. | True |
tags | List[str] | Flow tags to use, e.g. to control flow concurrency. Defaults to ["extract"]. | ['extract'] |
vault_name | str | The name of the vault from which to obtain the secrets. Defaults to None. | None |
check_missing_data | bool | Whether to check for missing data. Defaults to True. | True |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/supermetrics_to_adls.py
def __init__(
self,
name: str,
ds_id: str,
ds_accounts: List[str],
fields: List[str],
ds_user: str = None,
ds_segments: List[str] = None,
date_range_type: str = None,
start_date: str = None,
end_date: str = None,
settings: Dict[str, Any] = None,
filter: str = None,
max_rows: int = 1000000,
max_columns: int = None,
order_columns: str = None,
expectation_suite: dict = None,
evaluation_parameters: dict = None,
keep_validation_output: bool = False,
output_file_extension: str = ".parquet",
local_file_path: str = None,
adls_file_name: str = None,
adls_dir_path: str = None,
overwrite_adls: bool = True,
if_empty: str = "warn",
if_exists: str = "replace",
adls_sp_credentials_secret: str = None,
max_download_retries: int = 5,
supermetrics_task_timeout: int = 60 * 30,
parallel: bool = True,
tags: List[str] = ["extract"],
vault_name: str = None,
check_missing_data: bool = True,
timeout: int = 3600,
*args: List[any],
**kwargs: Dict[str, Any],
):
"""
Flow for downloading data from different marketing APIs to a local CSV
using Supermetrics API, then uploading it to Azure Data Lake.
Args:
name (str): The name of the flow.
ds_id (str): A query parameter passed to the SupermetricsToCSV task
ds_accounts (List[str]): A query parameter passed to the SupermetricsToCSV task
ds_user (str): A query parameter passed to the SupermetricsToCSV task
fields (List[str]): A query parameter passed to the SupermetricsToCSV task
ds_segments (List[str], optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
date_range_type (str, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
start_date (str, optional): A query parameter to pass start date to the date range filter. Defaults to None.
end_date (str, optional): A query parameter to pass end date to the date range filter. Defaults to None.
settings (Dict[str, Any], optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
filter (str, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
max_rows (int, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to 1000000.
max_columns (int, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
order_columns (str, optional): A query parameter passed to the SupermetricsToCSV task. Defaults to None.
expectation_suite (dict, optional): The Great Expectations suite used to valiaate the data. Defaults to None.
evaluation_parameters (str, optional): A dictionary containing evaluation parameters for the validation. Defaults to None.
keep_validation_output (bool, optional): Whether to keep the output files generated by the Great Expectations task. Defaults to False.
Currently, only GitHub URLs are supported. Defaults to None.
local_file_path (str, optional): Local destination path. Defaults to None.
adls_file_name (str, optional): Name of file in ADLS. Defaults to None.
output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".parquet"..
adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None.
sep (str, optional): The separator to use in the CSV. Defaults to "\t".
overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
max_download_retries (int, optional): How many times to retry the download. Defaults to 5.
supermetrics_task_timeout (int, optional): The timeout for the download task. Defaults to 60*30.
parallel (bool, optional): Whether to parallelize the downloads. Defaults to True.
tags (List[str], optional): Flow tags to use, eg. to control flow concurrency. Defaults to ["extract"].
vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
check_missing_data (bool, optional): Whether to check missing data. Defaults to True.
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
"""
if not ds_user:
try:
ds_user = PrefectSecret("SUPERMETRICS_DEFAULT_USER").run()
except ValueError as e:
msg = "Neither 'ds_user' parameter nor 'SUPERMETRICS_DEFAULT_USER' secret were not specified"
raise ValueError(msg) from e
self.flow_name = name
self.check_missing_data = check_missing_data
self.timeout = timeout
# SupermetricsToDF
self.ds_id = ds_id
self.ds_accounts = ds_accounts
self.ds_segments = ds_segments
self.ds_user = ds_user
self.fields = fields
self.date_range_type = date_range_type
self.start_date = start_date
self.end_date = end_date
self.settings = settings
self.filter = filter
self.max_rows = max_rows
self.max_columns = max_columns
self.order_columns = order_columns
self.if_exists = if_exists
self.output_file_extension = output_file_extension
# RunGreatExpectationsValidation
self.expectation_suite = expectation_suite
self.expectations_path = "/home/viadot/tmp/expectations"
self.expectation_suite_name = expectation_suite["expectation_suite_name"]
self.evaluation_parameters = evaluation_parameters
self.keep_validation_output = keep_validation_output
# AzureDataLakeUpload
self.local_file_path = (
local_file_path or self.slugify(name) + self.output_file_extension
)
self.local_json_path = self.slugify(name) + ".json"
self.now = str(pendulum.now("utc"))
self.adls_dir_path = adls_dir_path
if adls_file_name is not None:
self.adls_file_path = os.path.join(adls_dir_path, adls_file_name)
self.adls_schema_file_dir_file = os.path.join(
adls_dir_path, "schema", Path(adls_file_name).stem + ".json"
)
else:
self.adls_file_path = os.path.join(
adls_dir_path, self.now + self.output_file_extension
)
self.adls_schema_file_dir_file = os.path.join(
adls_dir_path, "schema", self.now + ".json"
)
self.overwrite_adls = overwrite_adls
self.if_empty = if_empty
self.adls_sp_credentials_secret = adls_sp_credentials_secret
# Global
self.max_download_retries = max_download_retries
self.supermetrics_task_timeout = supermetrics_task_timeout
self.parallel = parallel
self.tags = tags
self.vault_name = vault_name
super().__init__(*args, name=name, **kwargs)
self.gen_flow()
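A minimal usage sketch based on the signature above. Note that `__init__` reads `expectation_suite["expectation_suite_name"]`, so a (possibly empty) Great Expectations suite dict is needed in practice; the data source id, account, fields, paths, and secret names are hypothetical placeholders:

```python
from viadot.flows import SupermetricsToADLS

# __init__ above reads expectation_suite["expectation_suite_name"], so a suite
# dict must be provided; this minimal one contains no expectations.
expectation_suite = {"expectation_suite_name": "sales_suite", "expectations": []}

flow = SupermetricsToADLS(
    name="Google Ads extract",
    ds_id="AW",                               # placeholder Supermetrics data source id
    ds_accounts=["1234567890"],
    fields=["Date", "Campaignname", "Clicks"],
    date_range_type="last_month",
    expectation_suite=expectation_suite,
    adls_dir_path="raw/supermetrics/google_ads",
    adls_sp_credentials_secret="adls-sp-credentials",
    vault_name="my-key-vault",
)
flow.run()  # requires valid Supermetrics and Azure credentials in the referenced secrets
```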
viadot.flows.supermetrics_to_azure_sql.SupermetricsToAzureSQL (Flow)
viadot.flows.cloud_for_customers_report_to_adls.CloudForCustomersReportToADLS (Flow)
__init__(self, name=None, report_url=None, url=None, endpoint=None, params={}, fields=None, skip=0, top=1000, channels=None, months=None, years=None, env='QA', c4c_credentials_secret=None, local_file_path=None, output_file_extension='.csv', adls_dir_path=None, adls_file_path=None, overwrite_adls=False, adls_sp_credentials_secret=None, if_empty='warn', if_exists='replace', timeout=3600, *args, **kwargs)
special
Flow for downloading a report from the Cloud for Customers API to a local file, then uploading it to Azure Data Lake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the flow. | None |
report_url | str | The URL to the API. Defaults to None. | None |
url | str | ??? | None |
endpoint | str | ??? | None |
params | dict | ??? | {} |
fields | list | ??? | None |
skip | int | Initial index value of the reading row. Defaults to 0. | 0 |
top | int | The value of the top reading row. Defaults to 1000. | 1000 |
channels | List[str] | Filtering parameters passed to the URL. Defaults to None. | None |
months | List[str] | Filtering parameters passed to the URL. Defaults to None. | None |
years | List[str] | Filtering parameters passed to the URL. Defaults to None. | None |
env | str | ??? | 'QA' |
c4c_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with username and password for the Cloud for Customers instance. | None |
local_file_path | str | Local destination path. Defaults to None. | None |
output_file_extension | str | Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".csv". | '.csv' |
adls_dir_path | str | Azure Data Lake destination folder/catalog path. Defaults to None. | None |
adls_file_path | str | Azure Data Lake destination file path. Defaults to None. | None |
overwrite_adls | bool | Whether to overwrite the file in ADLS. Defaults to False. | False |
adls_sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. | None |
if_empty | str | What to do if the query returns no data. Defaults to "warn". | 'warn' |
if_exists | str | What to do if the local file already exists. Defaults to "replace". | 'replace' |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | 3600 |
Source code in viadot/flows/cloud_for_customers_report_to_adls.py
def __init__(
self,
name: str = None,
report_url: str = None,
url: str = None,
endpoint: str = None,
params: Dict[str, Any] = {},
fields: List[str] = None,
skip: int = 0,
top: int = 1000,
channels: List[str] = None,
months: List[str] = None,
years: List[str] = None,
env: str = "QA",
c4c_credentials_secret: str = None,
local_file_path: str = None,
output_file_extension: str = ".csv",
adls_dir_path: str = None,
adls_file_path: str = None,
overwrite_adls: bool = False,
adls_sp_credentials_secret: str = None,
if_empty: str = "warn",
if_exists: str = "replace",
timeout: int = 3600,
*args: List[any],
**kwargs: Dict[str, Any],
):
"""
Flow for downloading data from different marketing APIs to a local CSV
using Cloud for Customers API, then uploading it to Azure Data Lake.
Args:
name (str): The name of the flow.
report_url (str, optional): The url to the API. Defaults to None.
url (str, optional): ???
endpoint (str, optional): ???
params (dict, optional): ???
fields (list, optional): ???
skip (int, optional): Initial index value of reading row. Defaults to 0.
top (int, optional): The value of top reading row. Defaults to 1000.
channels (List[str], optional): Filtering parameters passed to the url. Defaults to None.
months (List[str], optional): Filtering parameters passed to the url. Defaults to None.
years (List[str], optional): Filtering parameters passed to the url. Defaults to None.
env (str, optional): ???
c4c_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
username and password for the Cloud for Customers instance.
local_file_path (str, optional): Local destination path. Defaults to None.
output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy
to handle with parquet. Defaults to ".csv".
adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None.
adls_file_path (str, optional): Azure Data Lake destination file path. Defaults to None.
overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to False.
adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake.
Defaults to None.
if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
if_exists (str, optional): What to do if the local file already exists. Defaults to "replace".
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
"""
self.report_url = report_url
self.skip = skip
self.top = top
self.if_empty = if_empty
self.env = env
self.c4c_credentials_secret = c4c_credentials_secret
self.timeout = timeout
# AzureDataLakeUpload
self.adls_sp_credentials_secret = adls_sp_credentials_secret
self.if_exists = if_exists
self.overwrite_adls = overwrite_adls
self.output_file_extension = output_file_extension
self.local_file_path = (
local_file_path or slugify(name) + self.output_file_extension
)
self.now = str(pendulum.now("utc"))
self.adls_dir_path = adls_dir_path
self.adls_file_path = adls_file_path or os.path.join(
adls_dir_path, self.now + self.output_file_extension
)
# in case of non-report invoking
self.url = url
self.endpoint = endpoint
self.params = params
self.fields = fields
# filtering report_url for reports
self.channels = channels
self.months = months
self.years = years
self.report_urls_with_filters = [self.report_url]
self.report_urls_with_filters = self.create_url_with_fields(
fields_list=self.channels, filter_code="CCHANNETZTEXT12CE6C2FA0D77995"
)
self.report_urls_with_filters = self.create_url_with_fields(
fields_list=self.months, filter_code="CMONTH_ID"
)
self.report_urls_with_filters = self.create_url_with_fields(
fields_list=self.years, filter_code="CYEAR_ID"
)
super().__init__(*args, name=name, **kwargs)
self.gen_flow()
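A minimal usage sketch based on the signature above; the report URL, filter values, paths, and secret names are hypothetical placeholders:

```python
from viadot.flows import CloudForCustomersReportToADLS

# Hypothetical report URL, filters, and secret names - replace with real values.
flow = CloudForCustomersReportToADLS(
    name="C4C sales report to ADLS",
    report_url="https://example.crm.ondemand.com/sap/c4c/odata/ana_businessanalytics_analytics.svc/MyReportQueryResults",
    env="QA",
    months=["01", "02"],
    years=["2023"],
    c4c_credentials_secret="c4c-credentials",
    adls_dir_path="raw/c4c/sales_report",
    adls_sp_credentials_secret="adls-sp-credentials",
)
flow.run()  # requires valid C4C and Azure credentials in the referenced secrets
```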