Task library
viadot.tasks.open_apis.uk_carbon_intensity.StatsToCSV (Task)
A Prefect task for downloading UK Carbon Intensity statistics (stats) to a CSV file.
__call__(self)
special
Run the task.
Parameters
path : str
    Path of the csv file created or edited by this task
days_back : int, optional
    How many days of stats to download in the csv. UK Carbon Intensity statistics are available for up to 30 days, by default 10
Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __call__(self):
"""
Run the task.
Parameters
----------
path : str
Path of the csv file created or edited by this task
days_back : int, optional
How many days of stats to download in the csv.
UK Carbon Intensity statistics are available for up to 30 days,
by default 10
"""
__init__(self, *args, **kwargs)
special
Generate the task.
Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __init__(self, *args, **kwargs):
"""Generate the task."""
super().__init__(name="uk_carbon_intensity_stats_to_csv", *args, **kwargs)
run(self, path, days_back=10)
Run the task.
Parameters
path : str
    Path of the csv file created or edited by this task
days_back : int, optional
    How many days of stats to download in the csv. UK Carbon Intensity statistics are available for up to 30 days, by default 10
Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def run(self, path: str, days_back: int = 10):
"""
Run the task.
Parameters
----------
path : str
Path of the csv file created or edited by this task
days_back : int, optional
How many days of stats to download in the csv.
UK Carbon Intensity statistics are available for up to 30 days,
by default 10
"""
logger = prefect.context.get("logger")
carbon = UKCarbonIntensity()
now = datetime.datetime.now()
logger.info(f"Downloading data to {path}...")
for i in range(days_back):
from_delta = datetime.timedelta(days=i + 1)
to_delta = datetime.timedelta(days=i)
to = now - to_delta
from_ = now - from_delta
carbon.query(f"/intensity/stats/{from_.isoformat()}/{to.isoformat()}")
carbon.to_csv(path, if_exists="append")
# Download data to a local CSV file
logger.info(f"Successfully downloaded data to {path}.")
viadot.tasks.open_apis.uk_carbon_intensity.StatsToExcel (Task)
A Prefect task for downloading UK Carbon Intensity statistics (stats) to an Excel file.
__call__(self)
special
Run the task.
Parameters
path : str
    Path of the Excel file created or edited by this task
days_back : int, optional
    How many days of stats to download in the Excel file. UK Carbon Intensity statistics are available for up to 30 days, by default 10
Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __call__(self):
"""
Run the task.
Parameters
----------
path : str
Path of the csv file created or edited by this task
days_back : int, optional
How many days of stats to download in the excel.
UK Carbon Intensity statistics are available for up to 30 days,
by default 10
"""
__init__(self, *args, **kwargs)
special
Generate the task.
Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __init__(self, *args, **kwargs):
"""Generate the task."""
super().__init__(name="uk_carbon_intensity_stats_to_excel", *args, **kwargs)
run(self, path, days_back=10)
Run the task.
Parameters
path : str
    Path of the excel file created or edited by this task
days_back : int, optional
    How many days of stats to download in the excel. UK Carbon Intensity statistics are available for up to 30 days, by default 10
Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def run(self, path: str, days_back: int = 10):
"""
Run the task.
Parameters
----------
path : str
Path of the excel file created or edited by this task
days_back : int, optional
How many days of stats to download in the excel.
UK Carbon Intensity statistics are available for up to 30 days,
by default 10
"""
logger = prefect.context.get("logger")
carbon = UKCarbonIntensity()
now = datetime.datetime.now()
logger.info(f"Downloading data to {path}...")
for i in range(days_back):
from_delta = datetime.timedelta(days=i + 1)
to_delta = datetime.timedelta(days=i)
to = now - to_delta
from_ = now - from_delta
carbon.query(f"/intensity/stats/{from_.isoformat()}/{to.isoformat()}")
carbon.to_excel(path, if_exists="append")
# Download data to a local excel file
logger.info(f"Successfully downloaded data to {path}.")
viadot.tasks.azure_data_lake.AzureDataLakeDownload (Task)
Task for downloading data from Azure Data Lake (gen1 and gen2).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The path from which to download the file(s). Defaults to None. | required |
to_path | str | The destination path. Defaults to None. | required |
recursive | bool | Set this to true if downloading entire directories. | required |
gen | int | The generation of the Azure Data Lake. Defaults to 2. | required |
vault_name | str | The name of the vault from which to fetch the secret. Defaults to None. | required |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
max_retries | int | [description]. Defaults to 3. | required |
retry_delay | timedelta | [description]. Defaults to timedelta(seconds=10). | required |
__call__(self, *args, **kwargs)
special
Download file(s) from the Azure Data Lake
Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
"""Download file(s) from the Azure Data Lake"""
return super().__call__(*args, **kwargs)
run(self, from_path=None, to_path=None, recursive=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The path from which to download the file(s). | None |
to_path | str | The destination path. | None |
recursive | bool | Set this to true if downloading entire directories. | None |
gen | int | The generation of the Azure Data Lake. | None |
sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
"from_path",
"to_path",
"recursive",
"gen",
"vault_name",
"max_retries",
"retry_delay",
)
def run(
self,
from_path: str = None,
to_path: str = None,
recursive: bool = None,
gen: int = None,
sp_credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> None:
"""Task run method.
Args:
from_path (str): The path from which to download the file(s).
to_path (str): The destination path.
recursive (bool): Set this to true if downloading entire directories.
gen (int): The generation of the Azure Data Lake.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""
file_name = from_path.split("/")[-1]
to_path = to_path or file_name
if not sp_credentials_secret:
# attempt to read a default for the service principal secret name
try:
sp_credentials_secret = PrefectSecret(
"AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()
except ValueError:
pass
if sp_credentials_secret:
azure_secret_task = AzureKeyVaultSecret()
credentials_str = azure_secret_task.run(
secret=sp_credentials_secret, vault_name=vault_name
)
credentials = json.loads(credentials_str)
else:
credentials = {
"ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
"AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
"AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
}
lake = AzureDataLake(gen=gen, credentials=credentials)
full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], from_path)
self.logger.info(f"Downloading data from {full_dl_path} to {to_path}...")
lake.download(from_path=from_path, to_path=to_path, recursive=recursive)
self.logger.info(f"Successfully downloaded data to {to_path}.")
viadot.tasks.azure_data_lake.AzureDataLakeUpload (Task)
Upload file(s) to Azure Data Lake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The local path from which to upload the file(s). Defaults to None. | required |
to_path | str | The destination path. Defaults to None. | required |
recursive | bool | Set this to true if uploading entire directories. Defaults to False. | required |
overwrite | bool | Whether to overwrite files in the lake. Defaults to False. | required |
gen | int | The generation of the Azure Data Lake. Defaults to 2. | required |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | required |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
__call__(self, *args, **kwargs)
special
Upload file(s) to the Azure Data Lake
Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
"""Upload file(s) to the Azure Data Lake"""
return super().__call__(*args, **kwargs)
run(self, from_path=None, to_path=None, recursive=None, overwrite=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The path from which to upload the file(s). | None |
to_path | str | The destination path. | None |
recursive | bool | Set to true if uploading entire directories. | None |
overwrite | bool | Whether to overwrite the file(s) if they exist. | None |
gen | int | The generation of the Azure Data Lake. | None |
sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
"from_path",
"to_path",
"recursive",
"overwrite",
"gen",
"vault_name",
"max_retries",
"retry_delay",
)
def run(
self,
from_path: str = None,
to_path: str = None,
recursive: bool = None,
overwrite: bool = None,
gen: int = None,
sp_credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> None:
"""Task run method.
Args:
from_path (str): The path from which to upload the file(s).
to_path (str): The destination path.
recursive (bool): Set to true if uploading entire directories.
overwrite (bool): Whether to overwrite the file(s) if they exist.
gen (int): The generation of the Azure Data Lake.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""
if not sp_credentials_secret:
# attempt to read a default for the service principal secret name
try:
sp_credentials_secret = PrefectSecret(
"AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()
except ValueError:
pass
if sp_credentials_secret:
azure_secret_task = AzureKeyVaultSecret()
credentials_str = azure_secret_task.run(
secret=sp_credentials_secret, vault_name=vault_name
)
credentials = json.loads(credentials_str)
else:
credentials = {
"ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
"AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
"AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
}
lake = AzureDataLake(gen=gen, credentials=credentials)
full_to_path = os.path.join(credentials["ACCOUNT_NAME"], to_path)
self.logger.info(f"Uploading data from {from_path} to {full_to_path}...")
lake.upload(
from_path=from_path,
to_path=to_path,
recursive=recursive,
overwrite=overwrite,
)
self.logger.info(f"Successfully uploaded data to {full_to_path}.")
viadot.tasks.azure_data_lake.AzureDataLakeToDF (Task)
__call__(self, *args, **kwargs)
special
Load file(s) from the Azure Data Lake to a pandas DataFrame.
Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
"""Load file(s) from the Azure Data Lake to a pandas DataFrame."""
return super().__call__(*args, **kwargs)
__init__(self, path=None, sep='\t', quoting=0, lineterminator=None, error_bad_lines=None, gen=2, vault_name=None, timeout=3600, max_retries=3, retry_delay=datetime.timedelta(seconds=10), *args, **kwargs)
special
Load file(s) from the Azure Data Lake to a pandas DataFrame. Currently supports CSV and parquet files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path from which to load the DataFrame. Defaults to None. | None |
sep | str | The separator to use when reading a CSV file. Defaults to "\t". | '\t' |
quoting | int | The quoting mode to use when reading a CSV file. Defaults to 0. | 0 |
lineterminator | str | The newline separator to use when reading a CSV file. Defaults to None. | None |
error_bad_lines | bool | Whether to raise an exception on bad lines. Defaults to None. | None |
gen | int | The generation of the Azure Data Lake. Defaults to 2. | 2 |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
Source code in viadot/tasks/azure_data_lake.py
def __init__(
self,
path: str = None,
sep: str = "\t",
quoting: int = 0,
lineterminator: str = None,
error_bad_lines: bool = None,
gen: int = 2,
vault_name: str = None,
timeout: int = 3600,
max_retries: int = 3,
retry_delay: timedelta = timedelta(seconds=10),
*args,
**kwargs,
):
"""Load file(s) from the Azure Data Lake to a pandas DataFrame.
Currently supports CSV and parquet files.
Args:
path (str, optional): The path from which to load the DataFrame. Defaults to None.
sep (str, optional): The separator to use when reading a CSV file. Defaults to "\t".
quoting (int, optional): The quoting mode to use when reading a CSV file. Defaults to 0.
lineterminator (str, optional): The newline separator to use when reading a CSV file. Defaults to None.
error_bad_lines (bool, optional): Whether to raise an exception on bad lines. Defaults to None.
gen (int, optional): The generation of the Azure Data Lake. Defaults to 2.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
timeout(int, optional): The amount of time (in seconds) to wait while running this task before
a timeout occurs. Defaults to 3600.
"""
self.path = path
self.sep = sep
self.quoting = quoting
self.lineterminator = lineterminator
self.error_bad_lines = error_bad_lines
self.gen = gen
self.vault_name = vault_name
super().__init__(
name="adls_to_df",
max_retries=max_retries,
retry_delay=retry_delay,
timeout=timeout,
*args,
**kwargs,
)
run(self, path=None, sep=None, quoting=None, lineterminator=None, error_bad_lines=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to file(s) which should be loaded into a DataFrame. | None |
sep | str | The field separator to use when loading the file to the DataFrame. | None |
quoting | int | The quoting mode to use when reading a CSV file. Defaults to 0. | None |
lineterminator | str | The newline separator to use when reading a CSV file. Defaults to None. | None |
error_bad_lines | bool | Whether to raise an exception on bad lines. Defaults to None. | None |
gen | int | The generation of the Azure Data Lake. | None |
sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
"path",
"sep",
"quoting",
"lineterminator",
"error_bad_lines",
"gen",
"vault_name",
"max_retries",
"retry_delay",
)
def run(
self,
path: str = None,
sep: str = None,
quoting: int = None,
lineterminator: str = None,
error_bad_lines: bool = None,
gen: int = None,
sp_credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> pd.DataFrame:
"""Task run method.
Args:
path (str): The path to file(s) which should be loaded into a DataFrame.
sep (str): The field separator to use when loading the file to the DataFrame.
quoting (int, optional): The quoting mode to use when reading a CSV file. Defaults to 0.
lineterminator (str, optional): The newline separator to use when reading a CSV file. Defaults to None.
error_bad_lines (bool, optional): Whether to raise an exception on bad lines. Defaults to None.
gen (int): The generation of the Azure Data Lake.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""
if quoting is None:
quoting = 0
if path is None:
raise ValueError("Please provide the path to the file to be downloaded.")
if not sp_credentials_secret:
# attempt to read a default for the service principal secret name
try:
sp_credentials_secret = PrefectSecret(
"AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()
except ValueError:
pass
if sp_credentials_secret:
azure_secret_task = AzureKeyVaultSecret()
credentials_str = azure_secret_task.run(
secret=sp_credentials_secret, vault_name=vault_name
)
credentials = json.loads(credentials_str)
else:
credentials = {
"ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
"AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
"AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
}
lake = AzureDataLake(gen=gen, credentials=credentials, path=path)
full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], path)
self.logger.info(f"Downloading data from {full_dl_path} to a DataFrame...")
df = lake.to_df(
sep=sep,
quoting=quoting,
lineterminator=lineterminator,
error_bad_lines=error_bad_lines,
)
self.logger.info(f"Successfully loaded data.")
return df
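A minimal usage sketch (not from the library's docs): the lake path and secret name are placeholders; the returned value is a pandas DataFrame task result that downstream tasks can consume.
```python
from prefect import Flow

from viadot.tasks.azure_data_lake import AzureDataLakeToDF

adls_to_df = AzureDataLakeToDF()

with Flow("adls_to_df_example") as flow:
    # Load a tab-separated CSV file from the lake into a pandas DataFrame.
    df = adls_to_df(
        path="raw/supermetrics/test_file.csv",
        sep="\t",
        sp_credentials_secret="MY-ADLS-SP-SECRET",
    )

flow.run()
```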
viadot.tasks.azure_data_lake.AzureDataLakeCopy (Task)
Task for copying files within the Azure Data Lake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The path from which to copy the file(s). Defaults to None. | required |
to_path | str | The destination path. Defaults to None. | required |
recursive | bool | Set this to true if copying entire directories. | required |
gen | int | The generation of the Azure Data Lake. Defaults to 2. | required |
vault_name | str | The name of the vault from which to fetch the secret. Defaults to None. | required |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
max_retries | int | [description]. Defaults to 3. | required |
retry_delay | timedelta | [description]. Defaults to timedelta(seconds=10). | required |
__call__(self, *args, **kwargs)
special
Copy file(s) from the Azure Data Lake
Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
"""Copy file(s) from the Azure Data Lake"""
return super().__call__(*args, **kwargs)
run(self, from_path=None, to_path=None, recursive=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path | str | The path from which to copy the file(s). | None |
to_path | str | The destination path. | None |
recursive | bool | Set this to true if copying entire directories. | None |
gen | int | The generation of the Azure Data Lake. | None |
sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
"from_path",
"to_path",
"recursive",
"gen",
"vault_name",
"max_retries",
"retry_delay",
)
def run(
self,
from_path: str = None,
to_path: str = None,
recursive: bool = None,
gen: int = None,
sp_credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> None:
"""Task run method.
Args:
from_path (str): The path from which to copy the file(s).
to_path (str): The destination path.
recursive (bool): Set this to true if copying entire directories.
gen (int): The generation of the Azure Data Lake.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""
file_name = from_path.split("/")[-1]
to_path = to_path or file_name
if not sp_credentials_secret:
# attempt to read a default for the service principal secret name
try:
sp_credentials_secret = PrefectSecret(
"AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()
except ValueError:
pass
if sp_credentials_secret:
azure_secret_task = AzureKeyVaultSecret()
credentials_str = azure_secret_task.run(
secret=sp_credentials_secret, vault_name=vault_name
)
credentials = json.loads(credentials_str)
else:
credentials = {
"ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
"AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
"AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
}
lake = AzureDataLake(gen=gen, credentials=credentials)
full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], from_path)
self.logger.info(f"Copying data from {full_dl_path} to {to_path}...")
lake.cp(from_path=from_path, to_path=to_path, recursive=recursive)
self.logger.info(f"Successfully copied data to {to_path}.")
viadot.tasks.azure_data_lake.AzureDataLakeList (Task)
Task for listing files in Azure Data Lake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the directory whose contents you want to list. Defaults to None. | required |
gen | int | The generation of the Azure Data Lake. Defaults to 2. | required |
vault_name | str | The name of the vault from which to fetch the secret. Defaults to None. | required |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
max_retries | int | [description]. Defaults to 3. | required |
retry_delay | timedelta | [description]. Defaults to timedelta(seconds=10). | required |
Returns:
Type | Description |
---|---|
List[str] | The list of paths to the contents of `path`. |
run(self, path=None, recursive=False, file_to_match=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the directory whose contents you want to list. Defaults to None. | None |
recursive | bool | If True, recursively list all subdirectories and files. Defaults to False. | False |
file_to_match | str | If provided, only returns files with that name. Defaults to None. | None |
gen | int | The generation of the Azure Data Lake. Defaults to None. | None |
sp_credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Returns:
Type | Description |
---|---|
List[str] | The list of paths to the contents of `path`. |
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
"path",
"gen",
"vault_name",
"max_retries",
"retry_delay",
)
def run(
self,
path: str = None,
recursive: bool = False,
file_to_match: str = None,
gen: int = None,
sp_credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> List[str]:
"""Task run method.
Args:
path (str, optional): The path to the directory which contents you want to list. Defaults to None.
recursive (bool, optional): If True, recursively list all subdirectories and files. Defaults to False.
file_to_match (str, optional): If exist it only returns files with that name. Defaults to None.
gen (int): The generation of the Azure Data Lake. Defaults to None.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
Returns:
List[str]: The list of paths to the contents of `path`. These paths
do not include the container, eg. the path to the file located at
"https://my_storage_acc.blob.core.windows.net/raw/supermetrics/test_file.txt"
will be shown as "raw/supermetrics/test_file.txt".
"""
if not sp_credentials_secret:
# attempt to read a default for the service principal secret name
try:
sp_credentials_secret = PrefectSecret(
"AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()
except ValueError:
pass
if sp_credentials_secret:
azure_secret_task = AzureKeyVaultSecret()
credentials_str = azure_secret_task.run(
secret=sp_credentials_secret, vault_name=vault_name
)
credentials = json.loads(credentials_str)
else:
credentials = {
"ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
"AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
"AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
}
lake = AzureDataLake(gen=gen, credentials=credentials)
full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], path)
self.logger.info(f"Listing files in {full_dl_path}.")
if recursive:
self.logger.info("Loading ADLS directories recursively.")
files = lake.find(path)
if file_to_match:
conditions = [file_to_match in item for item in files]
valid_files = np.array([])
if any(conditions):
index = np.where(conditions)[0]
files = list(np.append(valid_files, [files[i] for i in index]))
else:
raise FileExistsError(
f"There are not any available file named {file_to_match}."
)
else:
files = lake.ls(path)
self.logger.info(f"Successfully listed files in {full_dl_path}.")
return files
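A minimal usage sketch (not from the library's docs): the directory path, file name, and secret name are placeholders.
```python
from prefect import Flow

from viadot.tasks.azure_data_lake import AzureDataLakeList

list_files = AzureDataLakeList()

with Flow("adls_list_example") as flow:
    # List the contents of a directory, optionally filtering by file name.
    files = list_files(
        path="raw/supermetrics",
        recursive=True,
        file_to_match="test_file.csv",
        sp_credentials_secret="MY-ADLS-SP-SECRET",
    )

flow.run()
```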
viadot.tasks.azure_key_vault.AzureKeyVaultSecret (SecretBase)
Task for retrieving secrets from an Azure Key Vault and returning it as a dictionary. Note that all initialization arguments can optionally be provided or overwritten at runtime.
For authentication, there are two options: you can set the `AZURE_CREDENTIALS` Prefect Secret containing your Azure Key Vault credentials, which will be passed directly to `SecretClient`, or you can configure your flow's runtime environment for `EnvironmentCredential`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | str | the name of the secret to retrieve | required |
vault_name | str | the name of the vault from which to fetch the secret | required |
secret_client_kwargs | dict | additional keyword arguments to forward to the SecretClient. | required |
**kwargs | dict | additional keyword arguments to pass to the Task constructor | required |
run(self, secret=None, vault_name=None, credentials=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | str | the name of the secret to retrieve | required |
vault_name | str | the name of the vault from which to fetch the secret | required |
credentials | dict | your Azure Key Vault credentials passed from an upstream Secret task. By default, credentials are read from the `AZURE_CREDENTIALS` Prefect Secret. | required |
Examples:
from prefect import Flow
from viadot.tasks import AzureKeyVaultSecret
azure_secret_task = AzureKeyVaultSecret()
with Flow(name="example") as f:
secret = azure_secret_task(secret="test", vault_name="my_vault_name")
out = f.run()
Returns:
Type | Description |
---|---|
str | the contents of this secret, as a string |
Source code in viadot/tasks/azure_key_vault.py
@defaults_from_attrs("secret", "vault_name")
def run(
self,
secret: str = None,
vault_name: str = None,
credentials: dict = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> str:
"""
Task run method.
Args:
- secret (str): the name of the secret to retrieve
- vault_name (str): the name of the vault from which to fetch the secret
- credentials (dict, optional): your Azure Key Vault credentials passed from an upstream
Secret task. By default, credentials are read from the `AZURE_CREDENTIALS`
Prefect Secret; this Secret must be a JSON string
with the subkey `KEY_VAULT` and then vault_name containing three keys:
`AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`, which will be
passed directly to `SecretClient`. If not provided here or in context, the task
will fall back to Azure credentials discovery using `EnvironmentCredential()`.
Example `AZURE_CREDENTIALS` environment variable:
`export AZURE_CREDENTIALS = '{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'`
Example:
```python
from prefect import Flow
from viadot.tasks import AzureKeyVaultSecret
azure_secret_task = AzureKeyVaultSecret()
with Flow(name="example") as f:
secret = azure_secret_task(secret="test", vault_name="my_vault_name")
out = f.run()
```
Returns:
- str: the contents of this secret, as a string
"""
if secret is None:
raise ValueError("A secret name must be provided.")
key_vault = get_key_vault(
vault_name=vault_name,
credentials=credentials,
secret_client_kwargs=self.secret_client_kwargs,
)
secret_string = key_vault.get_secret(secret).value
return secret_string
viadot.tasks.azure_key_vault.CreateAzureKeyVaultSecret (SecretBase)
Task for creating secrets in an Azure Key Vault. Note that all initialization arguments can optionally be provided or overwritten at runtime.
For authentication, there are two options: you can set the `AZURE_CREDENTIALS` Prefect Secret containing your Azure Key Vault credentials, which will be passed directly to `SecretClient`, or you can configure your flow's runtime environment for `EnvironmentCredential`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | str | the name of the secret to retrieve | required |
vault_name | str | the name of the vault from which to fetch the secret | required |
secret_client_kwargs | dict | additional keyword arguments to forward to the SecretClient. | required |
**kwargs | dict | additional keyword arguments to pass to the Task constructor | required |
run(self, secret=None, value=None, lifetime=None, vault_name=None, credentials=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | str | the name of the secret to set | required |
value | str | the value which the secret will hold | required |
lifetime | int | The number of days after which the secret should expire. | required |
vault_name | str | the name of the vault from which to fetch the secret | required |
credentials | dict | your Azure Key Vault credentials passed from an upstream Secret task; this Secret must be a JSON string with the subkey `KEY_VAULT`. | required |
Examples:
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from viadot.tasks import CreateAzureKeyVaultSecret
create_secret_task = CreateAzureKeyVaultSecret()
with Flow(name="example") as f:
azure_credentials = PrefectSecret("AZURE_CREDENTIALS")
secret = create_secret_task(secret="test2", value=42, vault_name="my_vault_name", credentials=azure_credentials)
out = f.run()
Returns:
Type | Description |
---|---|
bool | Whether the secret was created successfully. |
Source code in viadot/tasks/azure_key_vault.py
@defaults_from_attrs(
"secret",
"value",
"lifetime",
"vault_name",
)
def run(
self,
secret: str = None,
value: str = None,
lifetime: int = None,
vault_name: str = None,
credentials: dict = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> bool:
"""
Task run method.
Args:
- secret (str): the name of the secret to set
- value (str): the value which the secret will hold
- lifetime (int): The number of days after which the secret should expire.
- vault_name (str): the name of the vault from which to fetch the secret
- credentials (dict, optional): your Azure Key Vault credentials passed from an upstream
Secret task; this Secret must be a JSON string
with the subkey `KEY_VAULT` and then vault_name containing three keys:
`AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`, which will be
passed directly to `SecretClient`. If not provided here or in context, the task
will fall back to Azure credentials discovery using `EnvironmentCredential()`.
Example `AZURE_CREDENTIALS` environment variable:
`export AZURE_CREDENTIALS = '{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'`
Example:
```python
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from viadot.tasks import CreateAzureKeyVaultSecret
create_secret_task = CreateAzureKeyVaultSecret()
with Flow(name="example") as f:
azure_credentials = PrefectSecret("AZURE_CREDENTIALS")
secret = create_secret_task(secret="test2", value=42, vault_name="my_vault_name", credentials=azure_credentials)
out = f.run()
```
Returns:
- bool: Whether the secret was created successfully.
"""
if secret is None:
raise ValueError("A secret name must be provided.")
key_vault = get_key_vault(
vault_name=vault_name,
credentials=credentials,
secret_client_kwargs=self.secret_client_kwargs,
)
expires_on = pendulum.now("UTC").add(days=lifetime)
secret_obj = key_vault.set_secret(secret, value, expires_on=expires_on)
was_successful = secret_obj.name == secret
return was_successful
viadot.tasks.azure_key_vault.DeleteAzureKeyVaultSecret (SecretBase)
Task for removing ("soft delete") a secret from an Azure Key Vault. Note that all initialization arguments can optionally be provided or overwritten at runtime.
For authentication, there are two options: you can set the `AZURE_CREDENTIALS` Prefect Secret containing your Azure Key Vault credentials, which will be passed directly to `SecretClient`, or you can configure your flow's runtime environment for `EnvironmentCredential`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | str | the name of the secret to retrieve | required |
vault_name | str | the name of the vault from which to fetch the secret | required |
secret_client_kwargs | dict | additional keyword arguments to forward to the SecretClient. | required |
**kwargs | dict | additional keyword arguments to pass to the Task constructor | required |
run(self, secret=None, vault_name=None, credentials=None, max_retries=None, retry_delay=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | str | the name of the secret to delete | required |
vault_name | str | the name of the vault where the secret is located | required |
credentials | dict | your Azure Key Vault credentials passed from an upstream Secret task. By default, credentials are read from the `AZURE_CREDENTIALS` Prefect Secret. | required |
Examples:
from prefect import Flow
from viadot.tasks import DeleteAzureKeyVaultSecret
azure_secret_task = DeleteAzureKeyVaultSecret()
with Flow(name="example") as f:
secret = azure_secret_task(secret="test", vault_name="my_vault_name")
out = f.run()
Returns:
Type | Description |
---|---|
bool | Whether the secret was deleted successfully. |
Source code in viadot/tasks/azure_key_vault.py
@defaults_from_attrs("secret", "vault_name")
def run(
self,
secret: str = None,
vault_name: str = None,
credentials: dict = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> bool:
"""
Task run method.
Args:
- secret (str): the name of the secret to delete
- vault_name (str): the name of the vault whethe the secret is located
- credentials (dict, optional): your Azure Key Vault credentials passed from an upstream
Secret task. By default, credentials are read from the `AZURE_CREDENTIALS`
Prefect Secret; this Secret must be a JSON string
with the subkey `KEY_VAULT` and then vault_name containing three keys:
`AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`, which will be
passed directly to `SecretClient`. If not provided here or in context, the task
will fall back to Azure credentials discovery using `EnvironmentCredential()`.
Example `AZURE_CREDENTIALS` environment variable:
`export AZURE_CREDENTIALS = '{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'`
Example:
```python
from prefect import Flow
from viadot.tasks import DeleteAzureKeyVaultSecret
azure_secret_task = DeleteAzureKeyVaultSecret()
with Flow(name="example") as f:
secret = azure_secret_task(secret="test", vault_name="my_vault_name")
out = f.run()
```
Returns:
- bool: Whether the secret was deleted successfully.
"""
if secret is None:
raise ValueError("A secret name must be provided.")
key_vault = get_key_vault(
vault_name=vault_name,
credentials=credentials,
secret_client_kwargs=self.secret_client_kwargs,
)
poller = key_vault.begin_delete_secret(secret)
poller.wait(timeout=60 * 5)
was_successful = poller.status() == "finished"
return was_successful
viadot.tasks.azure_sql.AzureSQLBulkInsert (Task)
run(self, from_path=None, schema=None, table=None, dtypes=None, sep=None, if_exists=None, credentials_secret=None, vault_name=None)
Bulk insert data from Azure Data Lake into an Azure SQL Database table. This task also creates the table if it doesn't exist. Currently, only CSV files are supported.
from_path (str): Path to the file to be inserted.
schema (str): Destination schema.
table (str): Destination table.
dtypes (Dict[str, Any]): Data types to force.
sep (str): The separator to use to read the CSV file.
if_exists (Literal, optional): What to do if the table already exists.
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password).
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
Source code in viadot/tasks/azure_sql.py
@defaults_from_attrs("sep", "if_exists", "credentials_secret")
def run(
self,
from_path: str = None,
schema: str = None,
table: str = None,
dtypes: Dict[str, Any] = None,
sep: str = None,
if_exists: Literal["fail", "replace", "append", "delete"] = None,
credentials_secret: str = None,
vault_name: str = None,
):
"""
Bulk insert data from Azure Data Lake into an Azure SQL Database table.
This task also creates the table if it doesn't exist.
Currently, only CSV files are supported.
Args:
from_path (str): Path to the file to be inserted.
schema (str): Destination schema.
table (str): Destination table.
dtypes (Dict[str, Any]): Data types to force.
sep (str): The separator to use to read the CSV file.
if_exists (Literal, optional): What to do if the table already exists.
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
with SQL db credentials (server, db_name, user, and password).
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""
fqn = f"{schema}.{table}" if schema else table
credentials = get_credentials(credentials_secret, vault_name=vault_name)
azure_sql = AzureSQL(credentials=credentials)
if if_exists == "replace":
azure_sql.create_table(
schema=schema, table=table, dtypes=dtypes, if_exists=if_exists
)
self.logger.info(f"Successfully created table {fqn}.")
azure_sql.bulk_insert(
schema=schema,
table=table,
source_path=from_path,
sep=sep,
if_exists=if_exists,
)
self.logger.info(f"Successfully inserted data into {fqn}.")
viadot.tasks.azure_sql.AzureSQLCreateTable (Task)
run(self, schema=None, table=None, dtypes=None, if_exists=None, credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)
Create a table in Azure SQL Database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schema | str | Destination schema. | None |
table | str | Destination table. | None |
dtypes | Dict[str, Any] | Data types to force. | None |
if_exists | Literal | What to do if the table already exists. | None |
credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password). | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Source code in viadot/tasks/azure_sql.py
@defaults_from_attrs("if_exists")
def run(
self,
schema: str = None,
table: str = None,
dtypes: Dict[str, Any] = None,
if_exists: Literal["fail", "replace", "skip", "delete"] = None,
credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
):
"""
Create a table in Azure SQL Database.
Args:
schema (str, optional): Destination schema.
table (str, optional): Destination table.
dtypes (Dict[str, Any], optional): Data types to force.
if_exists (Literal, optional): What to do if the table already exists.
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
with SQL db credentials (server, db_name, user, and password).
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""
credentials = get_credentials(credentials_secret, vault_name=vault_name)
azure_sql = AzureSQL(credentials=credentials)
fqn = f"{schema}.{table}" if schema is not None else table
created = azure_sql.create_table(
schema=schema, table=table, dtypes=dtypes, if_exists=if_exists
)
if created:
self.logger.info(f"Successfully created table {fqn}.")
else:
self.logger.info(
f"Table {fqn} has not been created as if_exists is set to {if_exists}."
)
viadot.tasks.azure_sql.AzureSQLDBQuery (Task)
Task for running an Azure SQL Database query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | str, required | The query to execute on the database. | required |
credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password). | required |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | required |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
run(self, query, credentials_secret=None, vault_name=None)
Run an Azure SQL Database query
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | str, required | The query to execute on the database. | required |
credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password). | None |
vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Source code in viadot/tasks/azure_sql.py
def run(
self,
query: str,
credentials_secret: str = None,
vault_name: str = None,
):
"""Run an Azure SQL Database query
Args:
query (str, required): The query to execute on the database.
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
with SQL db credentials (server, db_name, user, and password).
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""
credentials = get_credentials(credentials_secret, vault_name=vault_name)
azure_sql = AzureSQL(credentials=credentials)
# run the query and fetch the results if it's a select
result = azure_sql.run(query)
self.logger.info(f"Successfully ran the query.")
return result
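A minimal usage sketch (not from the library's docs): the query and secret name are placeholders; for a SELECT, the task returns the fetched rows.
```python
from prefect import Flow

from viadot.tasks.azure_sql import AzureSQLDBQuery

sql_query = AzureSQLDBQuery()

with Flow("azure_sql_query_example") as flow:
    # Run a query against the database configured in the Key Vault secret.
    result = sql_query(
        query="SELECT TOP 10 * FROM sandbox.test_table",
        credentials_secret="MY-AZURE-SQL-SECRET",
    )

flow.run()
```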
viadot.tasks.bcp.BCPTask (ShellTask)
Task for bulk inserting data into SQL Server-compatible databases.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The path to the local CSV file to be inserted. | required |
schema | str | The destination schema. | required |
table | str | The destination table. | required |
chunksize | int | The chunk size to use. | required |
error_log_file_path | string | Full path of an error file. Defaults to "log_file.log". | required |
on_error | Literal["skip", "fail"] | What to do if an error occurs. Defaults to "skip". | required |
credentials | dict | The credentials to use for connecting with the database. | required |
vault_name | str | The name of the vault from which to fetch the secret. | required |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
**kwargs | dict | Additional keyword arguments to pass to the Task constructor. | required |
run(self, path=None, schema=None, table=None, chunksize=None, error_log_file_path=None, on_error=None, credentials=None, credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None, **kwargs)
Task run method.
- path (str, optional): The path to the local CSV file to be inserted.
- schema (str, optional): The destination schema.
- table (str, optional): The destination table.
- chunksize (int, optional): The chunk size to use. By default 5000.
- error_log_file_path (string, optional): Full path of an error file. Defaults to "log_file.log".
- on_error (Literal, optional): What to do if an error occurs. Defaults to None.
- credentials (dict, optional): The credentials to use for connecting with SQL Server.
- credentials_secret (str, optional): The name of the Key Vault secret containing database credentials. (server, db_name, user, password)
- vault_name (str): The name of the vault from which to fetch the secret.
Returns:
Type | Description |
---|---|
str | The output of the bcp CLI command. |
Source code in viadot/tasks/bcp.py
@defaults_from_attrs(
"path",
"schema",
"table",
"chunksize",
"error_log_file_path",
"on_error",
"credentials",
"vault_name",
"max_retries",
"retry_delay",
)
def run(
self,
path: str = None,
schema: str = None,
table: str = None,
chunksize: int = None,
error_log_file_path: str = None,
on_error: Literal = None,
credentials: dict = None,
credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
**kwargs,
) -> str:
"""
Task run method.
Args:
- path (str, optional): The path to the local CSV file to be inserted.
- schema (str, optional): The destination schema.
- table (str, optional): The destination table.
- chunksize (int, optional): The chunk size to use. By default 5000.
- error_log_file_path (string, optional): Full path of an error file. Defaults to "log_file.log".
- on_error (Literal, optional): What to do if error occur. Defaults to None.
- credentials (dict, optional): The credentials to use for connecting with SQL Server.
- credentials_secret (str, optional): The name of the Key Vault secret containing database credentials.
(server, db_name, user, password)
- vault_name (str): The name of the vault from which to fetch the secret.
Returns:
str: The output of the bcp CLI command.
"""
if not credentials:
if not credentials_secret:
# attempt to read a default for the service principal secret name
try:
credentials_secret = PrefectSecret(
"AZURE_DEFAULT_SQLDB_SERVICE_PRINCIPAL_SECRET"
).run()
except ValueError:
pass
if credentials_secret:
credentials_str = AzureKeyVaultSecret(
credentials_secret, vault_name=vault_name
).run()
credentials = json.loads(credentials_str)
fqn = f"{schema}.{table}" if schema else table
server = credentials["server"]
db_name = credentials["db_name"]
uid = credentials["user"]
pwd = credentials["password"]
if "," in server:
# A space after the comma is allowed in the ODBC connection string
# but not in BCP's 'server' argument.
server = server.replace(" ", "")
if on_error == "skip":
max_error = 0
elif on_error == "fail":
max_error = 1
else:
raise ValueError(
"Please provide correct 'on_error' parameter value - 'skip' or 'fail'. "
)
command = f"/opt/mssql-tools/bin/bcp {fqn} in '{path}' -S {server} -d {db_name} -U {uid} -P '{pwd}' -c -F 2 -b {chunksize} -h 'TABLOCK' -e '{error_log_file_path}' -m {max_error}"
run_command = super().run(command=command, **kwargs)
try:
parse_logs(error_log_file_path)
except:
logger.warning("BCP logs couldn't be parsed.")
return run_command
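A minimal usage sketch (not from the library's docs): the file path, schema, table, and secret name are placeholders.
```python
from prefect import Flow

from viadot.tasks.bcp import BCPTask

bcp = BCPTask()

with Flow("bcp_example") as flow:
    # Bulk insert a local CSV into SQL Server using the bcp CLI,
    # skipping rows that fail to load.
    bcp(
        path="local_data.csv",
        schema="sandbox",
        table="test_table",
        chunksize=5000,
        error_log_file_path="bcp_errors.log",
        on_error="skip",
        credentials_secret="MY-AZURE-SQL-SECRET",
    )

flow.run()
```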
viadot.tasks.great_expectations.RunGreatExpectationsValidation (RunGreatExpectationsValidation)
Task for running data validation with Great Expectations on a pandas DataFrame. See https://docs.prefect.io/api/latest/tasks/great_expectations.html#rungreatexpectationsvalidation for full documentation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
expectations_path | str | The path to the directory containing the expectation suites. | required |
df | pd.DataFrame | The DataFrame to validate. | required |
viadot.tasks.sqlite.SQLiteInsert (Task)
Task for inserting data from a pandas DataFrame into SQLite.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
db_path | str | The path to the database to be used. Defaults to None. | required |
sql_path | str | The path to the text file containing the query. Defaults to None. | required |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
viadot.tasks.sqlite.SQLiteSQLtoDF (Task)
Task for downloading data from SQLite to a pandas DataFrame.
SQLite will create a new database in the directory specified by the 'db_path' parameter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
db_path | str | The path to the database to be used. Defaults to None. | required |
sql_path | str | The path to the text file containing the query. Defaults to None. | required |
timeout | int, optional | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
__call__(self)
special
Generate a DataFrame from a SQLite SQL query
Source code in viadot/tasks/sqlite.py
def __call__(self):
"""Generate a DataFrame from a SQLite SQL query"""
viadot.tasks.supermetrics.SupermetricsToCSV (Task)
Task for downloading data from the Supermetrics API to a CSV file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The destination path. Defaults to "supermetrics_extract.csv". | required |
max_retries | int | The maximum number of retries. Defaults to 5. | required |
retry_delay | timedelta | The delay between task retries. Defaults to 10 seconds. | required |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
max_rows | int | Maximum number of rows the query results should contain. Defaults to 1 000 000. | required |
max_cols | int | Maximum number of columns the query results should contain. Defaults to None. | required |
if_exists | str | What to do if the file already exists. Defaults to "replace". | required |
if_empty | str | What to do if the query returns no data. Defaults to "warn". | required |
sep | str | The separator in the target CSV file. Defaults to "\t". | required |
__call__(self)
special
Download Supermetrics data to a CSV
Source code in viadot/tasks/supermetrics.py
def __call__(self):
"""Download Supermetrics data to a CSV"""
super().__call__(self)
run(self, path=None, ds_id=None, ds_accounts=None, ds_segments=None, ds_user=None, fields=None, date_range_type=None, start_date=None, end_date=None, settings=None, filter=None, max_rows=None, max_columns=None, order_columns=None, if_exists=None, if_empty=None, max_retries=None, retry_delay=None, timeout=None, sep=None)
Task run method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | The destination path. Defaults to None. | None |
ds_id | str | A Supermetrics query parameter. | None |
ds_accounts | Union[str, List[str]] | A Supermetrics query parameter. Defaults to None. | None |
ds_segments | List[str] | A Supermetrics query parameter. Defaults to None. | None |
ds_user | str | A Supermetrics query parameter. Defaults to None. | None |
fields | List[str] | A Supermetrics query parameter. Defaults to None. | None |
date_range_type | str | A Supermetrics query parameter. Defaults to None. | None |
start_date | str | A Supermetrics query parameter. Defaults to None. | None |
settings | Dict[str, Any] | A Supermetrics query parameter. Defaults to None. | None |
filter | str | A Supermetrics query parameter. Defaults to None. | None |
max_rows | int | A Supermetrics query parameter. Defaults to None. | None |
max_columns | int | A Supermetrics query parameter. Defaults to None. | None |
order_columns | str | A Supermetrics query parameter. Defaults to None. | None |
if_exists | str | What to do if the file already exists. Defaults to "replace". | None |
if_empty | str | What to do if the query returns no data. Defaults to "warn". | None |
max_retries | int | The maximum number of retries. Defaults to 5. | None |
retry_delay | timedelta | The delay between task retries. Defaults to 10 seconds. | None |
timeout | int | Task timeout. Defaults to 30 minutes. | None |
Source code in viadot/tasks/supermetrics.py
@defaults_from_attrs(
"path",
"max_rows",
"if_exists",
"if_empty",
"max_retries",
"retry_delay",
"timeout",
"sep",
)
def run(
self,
path: str = None,
ds_id: str = None,
ds_accounts: Union[str, List[str]] = None,
ds_segments: List[str] = None,
ds_user: str = None,
fields: List[str] = None,
date_range_type: str = None,
start_date: str = None,
end_date: str = None,
settings: Dict[str, Any] = None,
filter: str = None,
max_rows: int = None,
max_columns: int = None,
order_columns: str = None,
if_exists: str = None,
if_empty: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
timeout: int = None,
sep: str = None,
):
"""
Task run method.
Args:
path (str, optional): The destination path. Defaulrs to None
ds_id (str, optional): A Supermetrics query parameter.
ds_accounts (Union[str, List[str]], optional): A Supermetrics query parameter. Defaults to None.
ds_segments (List[str], optional): A Supermetrics query parameter. Defaults to None.
ds_user (str, optional): A Supermetrics query parameter. Defaults to None.
fields (List[str], optional): A Supermetrics query parameter. Defaults to None.
date_range_type (str, optional): A Supermetrics query parameter. Defaults to None.
start_date (str, optional): A Supermetrics query parameter. Defaults to None.
end_date (str, optional) A Supermetrics query parameter. Defaults to None.
settings (Dict[str, Any], optional): A Supermetrics query parameter. Defaults to None.
filter (str, optional): A Supermetrics query parameter. Defaults to None.
max_rows (int, optional): A Supermetrics query parameter. Defaults to None.
max_columns (int, optional): A Supermetrics query parameter. Defaults to None.
order_columns (str, optional): A Supermetrics query parameter. Defaults to None.
if_exists (str, optional): What to do if file already exists. Defaults to "replace".
if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
max_retries (int, optional): The maximum number of retries. Defaults to 5.
retry_delay (timedelta, optional): The delay between task retries. Defaults to 10 seconds.
timeout (int, optional): Task timeout. Defaults to 30 minuntes.
sep (str, optional)
"""
if max_retries:
self.max_retries = max_retries
if retry_delay:
self.retry_delay = retry_delay
if isinstance(ds_accounts, str):
ds_accounts = [ds_accounts]
# Build the URL
# Note the task accepts only one account per query
query = dict(
ds_id=ds_id,
ds_accounts=ds_accounts,
ds_segments=ds_segments,
ds_user=ds_user,
fields=fields,
date_range_type=date_range_type,
start_date=start_date,
end_date=end_date,
settings=settings,
filter=filter,
max_rows=max_rows,
max_columns=max_columns,
order_columns=order_columns,
)
query = {param: val for param, val in query.items() if val is not None}
supermetrics = Supermetrics()
supermetrics.query(query)
# Download data to a local CSV file
self.logger.info(f"Downloading data to {path}...")
supermetrics.to_csv(path, if_exists=if_exists, if_empty=if_empty, sep=sep)
self.logger.info(f"Successfully downloaded data to {path}.")
viadot.tasks.supermetrics.SupermetricsToDF (Task)
Task for downloading data from the Supermetrics API to a pandas DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ds_id | str | A Supermetrics query parameter. | required |
ds_accounts | Union[str, List[str]] | A Supermetrics query parameter. Defaults to None. | required |
ds_segments | List[str] | A Supermetrics query parameter. Defaults to None. | required |
ds_user | str | A Supermetrics query parameter. Defaults to None. | required |
fields | List[str] | A Supermetrics query parameter. Defaults to None. | required |
date_range_type | str | A Supermetrics query parameter. Defaults to None. | required |
settings | Dict[str, Any] | A Supermetrics query parameter. Defaults to None. | required |
filter | str | A Supermetrics query parameter. Defaults to None. | required |
max_rows | int | A Supermetrics query parameter. Defaults to None. | required |
max_columns | int | A Supermetrics query parameter. Defaults to None. | required |
order_columns | str | A Supermetrics query parameter. Defaults to None. | required |
if_empty | str | What to do if the query returns no data. Defaults to "warn". | required |
max_retries | int | The maximum number of retries. Defaults to 5. | required |
retry_delay | timedelta | The delay between task retries. Defaults to 10 seconds. | required |
timeout | int | The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600. | required |
run(self, ds_id=None, ds_accounts=None, ds_segments=None, ds_user=None, fields=None, date_range_type=None, start_date=None, end_date=None, settings=None, filter=None, max_rows=None, max_columns=None, order_columns=None, if_empty=None, max_retries=None, retry_delay=None, timeout=None)
Task run method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ds_id | str | A Supermetrics query parameter. | None |
| ds_accounts | Union[str, List[str]] | A Supermetrics query parameter. Defaults to None. | None |
| ds_segments | List[str] | A Supermetrics query parameter. Defaults to None. | None |
| ds_user | str | A Supermetrics query parameter. Defaults to None. | None |
| fields | List[str] | A Supermetrics query parameter. Defaults to None. | None |
| date_range_type | str | A Supermetrics query parameter. Defaults to None. | None |
| start_date | str | A query parameter to pass the start date to the date range filter. Defaults to None. | None |
| end_date | str | A query parameter to pass the end date to the date range filter. Defaults to None. | None |
| settings | Dict[str, Any] | A Supermetrics query parameter. Defaults to None. | None |
| filter | str | A Supermetrics query parameter. Defaults to None. | None |
| max_rows | int | A Supermetrics query parameter. Defaults to None. | None |
| max_columns | int | A Supermetrics query parameter. Defaults to None. | None |
| order_columns | str | A Supermetrics query parameter. Defaults to None. | None |
| if_empty | str | What to do if query returns no data. Defaults to "warn". | None |
| max_retries | int | The maximum number of retries. Defaults to 5. | None |
| retry_delay | timedelta | The delay between task retries. Defaults to 10 seconds. | None |
| timeout | int | Task timeout. Defaults to 30 minutes. | None |
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | The query result as a pandas DataFrame. |
Source code in viadot/tasks/supermetrics.py
@defaults_from_attrs(
"if_empty",
"max_rows",
"max_retries",
"retry_delay",
"timeout",
)
def run(
self,
ds_id: str = None,
ds_accounts: Union[str, List[str]] = None,
ds_segments: List[str] = None,
ds_user: str = None,
fields: List[str] = None,
date_range_type: str = None,
start_date: str = None,
end_date: str = None,
settings: Dict[str, Any] = None,
filter: str = None,
max_rows: int = None,
max_columns: int = None,
order_columns: str = None,
if_empty: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
timeout: int = None,
) -> pd.DataFrame:
"""
Task run method.
Args:
ds_id (str, optional): A Supermetrics query parameter.
ds_accounts (Union[str, List[str]], optional): A Supermetrics query parameter. Defaults to None.
ds_segments (List[str], optional): A Supermetrics query parameter. Defaults to None.
ds_user (str, optional): A Supermetrics query parameter. Defaults to None.
fields (List[str], optional): A Supermetrics query parameter. Defaults to None.
date_range_type (str, optional): A Supermetrics query parameter. Defaults to None.
start_date (str, optional): A query parameter to pass the start date to the date range filter. Defaults to None.
end_date (str, optional): A query parameter to pass the end date to the date range filter. Defaults to None.
settings (Dict[str, Any], optional): A Supermetrics query parameter. Defaults to None.
filter (str, optional): A Supermetrics query parameter. Defaults to None.
max_rows (int, optional): A Supermetrics query parameter. Defaults to None.
max_columns (int, optional): A Supermetrics query parameter. Defaults to None.
order_columns (str, optional): A Supermetrics query parameter. Defaults to None.
if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
max_retries (int, optional): The maximum number of retries. Defaults to 5.
retry_delay (timedelta, optional): The delay between task retries. Defaults to 10 seconds.
timeout (int, optional): Task timeout. Defaults to 30 minutes.
Returns:
pd.DataFrame: The query result as a pandas DataFrame.
"""
if max_retries:
self.max_retries = max_retries
if retry_delay:
self.retry_delay = retry_delay
if isinstance(ds_accounts, str):
ds_accounts = [ds_accounts]
# Build the URL
# Note the task accepts only one account per query
query = dict(
ds_id=ds_id,
ds_accounts=ds_accounts,
ds_segments=ds_segments,
ds_user=ds_user,
fields=fields,
date_range_type=date_range_type,
start_date=start_date,
end_date=end_date,
settings=settings,
filter=filter,
max_rows=max_rows,
max_columns=max_columns,
order_columns=order_columns,
)
query = {param: val for param, val in query.items() if val is not None}
supermetrics = Supermetrics()
supermetrics.query(query)
# Download data to a pandas DataFrame
self.logger.info(f"Downloading data to a DataFrame...")
df = supermetrics.to_df(if_empty=if_empty)
self.logger.info(f"Successfully downloaded data to a DataFrame.")
return df
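A minimal usage sketch follows. The class name `SupermetricsToDF` comes from the documentation above; the import path `viadot.tasks` and all query values are illustrative assumptions, and credentials are expected to be configured elsewhere.

```python
from prefect import Flow
from viadot.tasks import SupermetricsToDF  # assumed re-export path

supermetrics_to_df = SupermetricsToDF()

with Flow("supermetrics_to_df_example") as flow:
    df = supermetrics_to_df(
        ds_id="GA",                       # hypothetical data source id
        ds_accounts=["example_account"],  # hypothetical account
        fields=["Date", "Sessions"],      # hypothetical report fields
        date_range_type="last_month",     # hypothetical date range
        if_empty="warn",
    )
```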
viadot.task_utils.add_ingestion_metadata_task(df)
Add ingestion metadata columns, e.g. the data download date.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | pd.DataFrame | input DataFrame. | required |
Source code in viadot/task_utils.py
@task(timeout=3600)
def add_ingestion_metadata_task(
df: pd.DataFrame,
):
"""Add ingestion metadata columns, eg. data download date
Args:
df (pd.DataFrame): input DataFrame.
"""
# Don't skip when df has columns but has no data
if len(df.columns) == 0:
return df
else:
df2 = df.copy(deep=True)
df2["_viadot_downloaded_at_utc"] = datetime.now(timezone.utc).replace(
microsecond=0
)
return df2
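As a quick illustration (with a made-up DataFrame), the task can be exercised directly via its `.run()` method outside of a flow:

```python
import pandas as pd
from viadot.task_utils import add_ingestion_metadata_task

df = pd.DataFrame({"country": ["UK", "PL"], "value": [1, 2]})  # made-up data
df_with_meta = add_ingestion_metadata_task.run(df)

# The result carries an extra UTC timestamp column.
print(df_with_meta.columns)  # [..., "_viadot_downloaded_at_utc"]
```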
viadot.task_utils.get_latest_timestamp_file_path(files)
Return the name of the latest file in a given data lake directory, given a list of paths in that directory. Such a list can be obtained using the AzureDataLakeList task. This task is useful for working with immutable data lakes, as the data is often written in the format /path/table_name/TIMESTAMP.parquet.
Source code in viadot/task_utils.py
@task(timeout=3600)
def get_latest_timestamp_file_path(files: List[str]) -> str:
"""
Return the name of the latest file in a given data lake directory,
given a list of paths in that directory. Such a list can be obtained using the
`AzureDataLakeList` task. This task is useful for working with immutable data lakes,
as the data is often written in the format /path/table_name/TIMESTAMP.parquet.
"""
logger = prefect.context.get("logger")
extract_fname = (
lambda f: os.path.basename(f).replace(".csv", "").replace(".parquet", "")
)
file_names = [extract_fname(file) for file in files]
latest_file_name = max(file_names, key=lambda d: datetime.fromisoformat(d))
latest_file = files[file_names.index(latest_file_name)]
logger.debug(f"Latest file: {latest_file}")
return latest_file
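A small sketch of how the task resolves the latest path. The file paths below are hypothetical but follow the /path/table_name/TIMESTAMP.parquet convention; the timestamps must be ISO-format strings for the comparison to work.

```python
from prefect import Flow
from viadot.task_utils import get_latest_timestamp_file_path

# Hypothetical listing, e.g. as returned by the AzureDataLakeList task.
files = [
    "raw/sales/2021-11-01T00:00:00.parquet",
    "raw/sales/2021-12-01T00:00:00.parquet",
]

with Flow("latest_file_example") as flow:
    latest = get_latest_timestamp_file_path(files)

# When the flow runs, `latest` resolves to the second path (the newest timestamp).
```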
viadot.tasks.cloud_for_customers.C4CToDF (Task)
run(self, url=None, env='QA', endpoint=None, fields=None, params=None, chunksize=None, if_empty='warn', credentials_secret=None, vault_name=None)
Task for downloading data from the Cloud for Customers to a pandas DataFrame using a normal URL (with query parameters). This task retrieves a table from scratch, with the table name passed in the url or endpoint. It is recommended to add some filter parameters in this case.
Examples:
url = "https://mysource.com/sap/c4c/odata/v1/c4codataapi" endpoint = "ServiceRequestCollection" params = {"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | The URL to the API in the case of a prepared report. Defaults to None. | None |
| env | str | The environment to use. Defaults to 'QA'. | 'QA' |
| endpoint | str | The endpoint of the API. Defaults to None. | None |
| fields | List[str] | The C4C Table fields. Defaults to None. | None |
| params | Dict[str, str] | Query parameters. Defaults to $format=json. | None |
| chunksize | int | How many rows to retrieve from C4C at a time. Uses a server-side cursor. | None |
| if_empty | str | What to do if query returns no data. Defaults to "warn". | 'warn' |
| credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with C4C credentials. Defaults to None. | None |
| vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | The query result as a pandas DataFrame. |
Source code in viadot/tasks/cloud_for_customers.py
@defaults_from_attrs(
"url", "endpoint", "fields", "params", "chunksize", "env", "if_empty"
)
def run(
self,
url: str = None,
env: str = "QA",
endpoint: str = None,
fields: List[str] = None,
params: Dict[str, str] = None,
chunksize: int = None,
if_empty: str = "warn",
credentials_secret: str = None,
vault_name: str = None,
):
"""
Task for downloading data from the Cloud for Customers to a pandas DataFrame using a normal URL (with query parameters).
This task retrieves a table from scratch, with the table name passed in the url or endpoint. It is recommended to add
some filter parameters in this case.
Example:
url = "https://mysource.com/sap/c4c/odata/v1/c4codataapi"
endpoint = "ServiceRequestCollection"
params = {"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"}
Args:
url (str, optional): The url to the API in case of prepared report. Defaults to None.
env (str, optional): The environment to use. Defaults to 'QA'.
endpoint (str, optional): The endpoint of the API. Defaults to None.
fields (List[str], optional): The C4C Table fields. Defaults to None.
params (Dict[str, str]): Query parameters. Defaults to $format=json.
chunksize (int, optional): How many rows to retrieve from C4C at a time. Uses a server-side cursor.
if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
with C4C credentials. Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
Returns:
pd.DataFrame: The query result as a pandas DataFrame.
"""
if not credentials_secret:
try:
credentials_secret = PrefectSecret("C4C_KV").run()
except ValueError:
pass
if credentials_secret:
credentials_str = AzureKeyVaultSecret(
credentials_secret, vault_name=vault_name
).run()
credentials = json.loads(credentials_str)
else:
credentials = local_config.get("CLOUD_FOR_CUSTOMERS")[env]
self.logger.info(f"Downloading data from {url+endpoint}...")
# If we get any of these in params, we don't perform any chunking
if any(["$skip" in params, "$top" in params]):
return CloudForCustomers(
url=url,
endpoint=endpoint,
params=params,
env=env,
credentials=credentials,
).to_df(if_empty=if_empty, fields=fields)
def _generate_chunks() -> Generator[pd.DataFrame, None, None]:
"""
Util returning chunks as a generator to save memory.
"""
offset = 0
total_record_count = 0
while True:
boundaries = {"$skip": offset, "$top": chunksize}
params.update(boundaries)
chunk = CloudForCustomers(
url=url,
endpoint=endpoint,
params=params,
env=env,
credentials=credentials,
).to_df(if_empty=if_empty, fields=fields)
chunk_record_count = chunk.shape[0]
total_record_count += chunk_record_count
self.logger.info(
f"Successfully downloaded {total_record_count} records."
)
yield chunk
if chunk.shape[0] < chunksize:
break
offset += chunksize
self.logger.info(f"Data from {url+endpoint} has been downloaded successfully.")
chunks = _generate_chunks()
df = pd.concat(chunks)
return df
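A minimal usage sketch for this task is shown below. The URL, endpoint, and filter are the illustrative values from the example above; the import path `viadot.tasks` and the chunk size are assumptions, and credentials are expected to come from the "C4C_KV" secret or local config as in the source.

```python
from prefect import Flow
from viadot.tasks import C4CToDF  # assumed re-export path

c4c_to_df = C4CToDF()

with Flow("c4c_to_df_example") as flow:
    df = c4c_to_df(
        url="https://mysource.com/sap/c4c/odata/v1/c4codataapi",
        endpoint="ServiceRequestCollection",
        params={"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"},
        chunksize=10000,  # hypothetical chunk size; enables the chunked download path
    )
```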
viadot.tasks.cloud_for_customers.C4CReportToDF (Task)
__call__(self, *args, **kwargs)
special
Download report to DF
Source code in viadot/tasks/cloud_for_customers.py
def __call__(self, *args, **kwargs):
"""Download report to DF"""
return super().__call__(*args, **kwargs)
run(self, report_url=None, env='QA', skip=0, top=1000, credentials_secret=None, vault_name=None, max_retries=3, retry_delay=datetime.timedelta(seconds=10))
Task for downloading data from the Cloud for Customers to a pandas DataFrame using a report URL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| report_url | str | The URL to the report. Defaults to None. | None |
| env | str | The environment to use. Defaults to 'QA'. | 'QA' |
| skip | int | The initial number of rows to skip (OData $skip). Defaults to 0. | 0 |
| top | int | The maximum number of rows to fetch per request (OData $top). Defaults to 1000. | 1000 |
| credentials_secret | str | The name of the Azure Key Vault secret containing a dictionary with C4C credentials (username & password). Defaults to None. | None |
| vault_name | str | The name of the vault from which to obtain the secret. Defaults to None. | None |
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | The query result as a pandas DataFrame. |
Source code in viadot/tasks/cloud_for_customers.py
@defaults_from_attrs(
"report_url",
"env",
"skip",
"top",
)
def run(
self,
report_url: str = None,
env: str = "QA",
skip: int = 0,
top: int = 1000,
credentials_secret: str = None,
vault_name: str = None,
max_retries: int = 3,
retry_delay: timedelta = timedelta(seconds=10),
):
"""
Task for downloading data from the Cloud for Customers to a pandas DataFrame using a report URL.
Args:
report_url (str, optional): The URL to the report. Defaults to None.
env (str, optional): The environment to use. Defaults to 'QA'.
skip (int, optional): The initial number of rows to skip (OData $skip). Defaults to 0.
top (int, optional): The maximum number of rows to fetch per request (OData $top). Defaults to 1000.
credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
with C4C credentials (username & password). Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
Returns:
pd.DataFrame: The query result as a pandas DataFrame.
"""
if not credentials_secret:
try:
credentials_secret = PrefectSecret("C4C_KV").run()
except ValueError:
pass
if credentials_secret:
credentials_str = AzureKeyVaultSecret(
credentials_secret, vault_name=vault_name
).run()
credentials = json.loads(credentials_str)
else:
credentials = local_config.get("CLOUD_FOR_CUSTOMERS")[env]
final_df = pd.DataFrame()
next_batch = True
while next_batch:
new_url = f"{report_url}&$top={top}&$skip={skip}"
chunk_from_url = CloudForCustomers(
report_url=new_url, env=env, credentials=credentials
)
df = chunk_from_url.to_df()
final_df = final_df.append(df)
if not final_df.empty:
df_count = df.count()[1]
if df_count != top:
next_batch = False
skip += top
else:
break
return final_df
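Finally, a minimal usage sketch: the report URL below is a placeholder, the import path `viadot.tasks` is an assumption, and credentials are assumed to be resolvable via the "C4C_KV" secret or local config, as in the source above.

```python
from prefect import Flow
from viadot.tasks import C4CReportToDF  # assumed re-export path

c4c_report_to_df = C4CReportToDF()

with Flow("c4c_report_to_df_example") as flow:
    df = c4c_report_to_df(
        report_url="https://mysource.com/sap/c4c/odata/reports/ExampleReport?$format=json",  # placeholder URL
        env="QA",
        top=1000,  # rows fetched per request; the task pages through the report with $skip/$top
    )
```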