
Task library

viadot.tasks.open_apis.uk_carbon_intensity.StatsToCSV (Task)

A Prefect task for downloading UK Carbon Intensity statistics (stats) to a CSV file.

__call__(self) special

Run the task.

Parameters

path : str
    Path of the csv file created or edited by this task
days_back : int, optional
    How many days of stats to download in the csv. UK Carbon Intensity statistics are available for up to 30 days, by default 10

Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __call__(self):
    """
    Run the task.

    Parameters
    ----------
    path : str
        Path of the csv file created or edited by this task
    days_back : int, optional
        How many days of stats to download in the csv.
        UK Carbon Intensity statistics are available for up to 30 days,
        by default 10
    """

__init__(self, *args, **kwargs) special

Generate the task.

Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __init__(self, *args, **kwargs):
    """Generate the task."""
    super().__init__(name="uk_carbon_intensity_stats_to_csv", *args, **kwargs)

run(self, path, days_back=10)

Run the task.

Parameters

path : str
    Path of the csv file created or edited by this task
days_back : int, optional
    How many days of stats to download in the csv. UK Carbon Intensity statistics are available for up to 30 days, by default 10

Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def run(self, path: str, days_back: int = 10):
    """
    Run the task.

    Parameters
    ----------
    path : str
        Path of the csv file created or edited by this task
    days_back : int, optional
        How many days of stats to download in the csv.
        UK Carbon Intensity statistics are available for up to 30 days,
        by default 10
    """

    logger = prefect.context.get("logger")
    carbon = UKCarbonIntensity()
    now = datetime.datetime.now()
    logger.info(f"Downloading data to {path}...")
    for i in range(days_back):
        from_delta = datetime.timedelta(days=i + 1)
        to_delta = datetime.timedelta(days=i)
        to = now - to_delta
        from_ = now - from_delta
        carbon.query(f"/intensity/stats/{from_.isoformat()}/{to.isoformat()}")
        carbon.to_csv(path, if_exists="append")

    # Download data to a local CSV file
    logger.info(f"Successfully downloaded data to {path}.")
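
Example: a minimal sketch of running this task inside a Prefect flow (the flow name and output path are illustrative):

from prefect import Flow
from viadot.tasks.open_apis.uk_carbon_intensity import StatsToCSV

stats_to_csv = StatsToCSV()

with Flow("uk_carbon_intensity_stats") as flow:
    # Append the last 7 days of stats to a local CSV file.
    stats_to_csv(path="uk_carbon_intensity.csv", days_back=7)

flow.run()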

viadot.tasks.open_apis.uk_carbon_intensity.StatsToExcel (Task)

A Prefect task for downloading UK Carbon Intensity statistics (stats) to an Excel file.

__call__(self) special

Run the task.

Parameters

path : str
    Path of the Excel file created or edited by this task
days_back : int, optional
    How many days of stats to download in the Excel file. UK Carbon Intensity statistics are available for up to 30 days, by default 10

Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __call__(self):
    """
    Run the task.

    Parameters
    ----------
    path : str
        Path of the csv file created or edited by this task
    days_back : int, optional
        How many days of stats to download in the excel.
        UK Carbon Intensity statistics are available for up to 30 days,
        by default 10
    """

__init__(self, *args, **kwargs) special

Generate the task.

Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def __init__(self, *args, **kwargs):
    """Generate the task."""
    super().__init__(name="uk_carbon_intensity_stats_to_excel", *args, **kwargs)

run(self, path, days_back=10)

Run the task.

Parameters

path : str
    Path of the excel file created or edited by this task
days_back : int, optional
    How many days of stats to download in the excel. UK Carbon Intensity statistics are available for up to 30 days, by default 10

Source code in viadot/tasks/open_apis/uk_carbon_intensity.py
def run(self, path: str, days_back: int = 10):
    """
    Run the task.

    Parameters
    ----------
    path : str
        Path of the excel file created or edited by this task
    days_back : int, optional
        How many days of stats to download in the excel.
        UK Carbon Intensity statistics are available for up to 30 days,
        by default 10
    """

    logger = prefect.context.get("logger")
    carbon = UKCarbonIntensity()
    now = datetime.datetime.now()
    logger.info(f"Downloading data to {path}...")
    for i in range(days_back):
        from_delta = datetime.timedelta(days=i + 1)
        to_delta = datetime.timedelta(days=i)
        to = now - to_delta
        from_ = now - from_delta
        carbon.query(f"/intensity/stats/{from_.isoformat()}/{to.isoformat()}")
        carbon.to_excel(path, if_exists="append")

    # Download data to a local excel file
    logger.info(f"Successfully downloaded data to {path}.")

viadot.tasks.azure_data_lake.AzureDataLakeDownload (Task)

Task for downloading data from Azure Data Lake (gen1 and gen2).

Parameters:

    from_path (str): The path from which to download the file(s). Defaults to None.
    to_path (str): The destination path. Defaults to None.
    recursive (bool): Set this to true if downloading entire directories.
    gen (int): The generation of the Azure Data Lake. Defaults to 2.
    vault_name (str): The name of the vault from which to fetch the secret. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.
    max_retries (int): The maximum number of task retries. Defaults to 3.
    retry_delay (timedelta): The delay between task retries. Defaults to timedelta(seconds=10).

__call__(self, *args, **kwargs) special

Download file(s) from the Azure Data Lake

Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
    """Download file(s) from the Azure Data Lake"""
    return super().__call__(*args, **kwargs)

run(self, from_path=None, to_path=None, recursive=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    from_path (str): The path from which to download the file(s). Defaults to None.
    to_path (str): The destination path. Defaults to None.
    recursive (bool): Set this to true if downloading entire directories. Defaults to None.
    gen (int): The generation of the Azure Data Lake. Defaults to None.
    sp_credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
    "from_path",
    "to_path",
    "recursive",
    "gen",
    "vault_name",
    "max_retries",
    "retry_delay",
)
def run(
    self,
    from_path: str = None,
    to_path: str = None,
    recursive: bool = None,
    gen: int = None,
    sp_credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> None:
    """Task run method.

    Args:
        from_path (str): The path from which to download the file(s).
        to_path (str): The destination path.
        recursive (bool): Set this to true if downloading entire directories.
        gen (int): The generation of the Azure Data Lake.
        sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
    """
    file_name = from_path.split("/")[-1]
    to_path = to_path or file_name

    if not sp_credentials_secret:
        # attempt to read a default for the service principal secret name
        try:
            sp_credentials_secret = PrefectSecret(
                "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
            ).run()
        except ValueError:
            pass

    if sp_credentials_secret:
        azure_secret_task = AzureKeyVaultSecret()
        credentials_str = azure_secret_task.run(
            secret=sp_credentials_secret, vault_name=vault_name
        )
        credentials = json.loads(credentials_str)
    else:
        credentials = {
            "ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
            "AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
            "AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
            "AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
        }
    lake = AzureDataLake(gen=gen, credentials=credentials)

    full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], from_path)
    self.logger.info(f"Downloading data from {full_dl_path} to {to_path}...")
    lake.download(from_path=from_path, to_path=to_path, recursive=recursive)
    self.logger.info(f"Successfully downloaded data to {to_path}.")
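
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks, as the Key Vault tasks below are; the lake path and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureDataLakeDownload

download = AzureDataLakeDownload()

with Flow("adls_download_example") as f:
    # Download a single file from the lake to the local working directory.
    download(
        from_path="raw/supermetrics/test_file.csv",
        to_path="test_file.csv",
        sp_credentials_secret="ADLS-SP-CREDENTIALS",
    )
f.run()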

viadot.tasks.azure_data_lake.AzureDataLakeUpload (Task)

Upload file(s) to Azure Data Lake.

Parameters:

    from_path (str): The local path from which to upload the file(s). Defaults to None.
    to_path (str): The destination path. Defaults to None.
    recursive (bool): Set this to true if uploading entire directories. Defaults to False.
    overwrite (bool): Whether to overwrite files in the lake. Defaults to False.
    gen (int): The generation of the Azure Data Lake. Defaults to 2.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.

__call__(self, *args, **kwargs) special

Upload file(s) to the Azure Data Lake

Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
    """Upload file(s) to the Azure Data Lake"""
    return super().__call__(*args, **kwargs)

run(self, from_path=None, to_path=None, recursive=None, overwrite=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    from_path (str): The path from which to upload the file(s). Defaults to None.
    to_path (str): The destination path. Defaults to None.
    recursive (bool): Set to true if uploading entire directories. Defaults to None.
    overwrite (bool): Whether to overwrite the file(s) if they exist. Defaults to None.
    gen (int): The generation of the Azure Data Lake. Defaults to None.
    sp_credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
    "from_path",
    "to_path",
    "recursive",
    "overwrite",
    "gen",
    "vault_name",
    "max_retries",
    "retry_delay",
)
def run(
    self,
    from_path: str = None,
    to_path: str = None,
    recursive: bool = None,
    overwrite: bool = None,
    gen: int = None,
    sp_credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> None:
    """Task run method.

    Args:
        from_path (str): The path from which to upload the file(s).
        to_path (str): The destination path.
        recursive (bool): Set to true if uploading entire directories.
        overwrite (bool): Whether to overwrite the file(s) if they exist.
        gen (int): The generation of the Azure Data Lake.
        sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
    """

    if not sp_credentials_secret:
        # attempt to read a default for the service principal secret name
        try:
            sp_credentials_secret = PrefectSecret(
                "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
            ).run()
        except ValueError:
            pass

    if sp_credentials_secret:
        azure_secret_task = AzureKeyVaultSecret()
        credentials_str = azure_secret_task.run(
            secret=sp_credentials_secret, vault_name=vault_name
        )
        credentials = json.loads(credentials_str)
    else:
        credentials = {
            "ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
            "AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
            "AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
            "AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
        }
    lake = AzureDataLake(gen=gen, credentials=credentials)

    full_to_path = os.path.join(credentials["ACCOUNT_NAME"], to_path)
    self.logger.info(f"Uploading data from {from_path} to {full_to_path}...")
    lake.upload(
        from_path=from_path,
        to_path=to_path,
        recursive=recursive,
        overwrite=overwrite,
    )
    self.logger.info(f"Successfully uploaded data to {full_to_path}.")
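
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the paths and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureDataLakeUpload

upload = AzureDataLakeUpload()

with Flow("adls_upload_example") as f:
    # Upload a local file, overwriting it if it already exists in the lake.
    upload(
        from_path="test_file.csv",
        to_path="raw/supermetrics/test_file.csv",
        overwrite=True,
        sp_credentials_secret="ADLS-SP-CREDENTIALS",
    )
f.run()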

viadot.tasks.azure_data_lake.AzureDataLakeToDF (Task)

__call__(self, *args, **kwargs) special

Load file(s) from the Azure Data Lake to a pandas DataFrame.

Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
    """Load file(s) from the Azure Data Lake to a pandas DataFrame."""
    return super().__call__(*args, **kwargs)

__init__(self, path=None, sep='\t', quoting=0, lineterminator=None, error_bad_lines=None, gen=2, vault_name=None, timeout=3600, max_retries=3, retry_delay=datetime.timedelta(seconds=10), *args, **kwargs) special

Load file(s) from the Azure Data Lake to a pandas DataFrame. Currently supports CSV and parquet files.

Parameters:

    path (str): The path from which to load the DataFrame. Defaults to None.
    sep (str): The separator to use when reading a CSV file. Defaults to "\t".
    quoting (int): The quoting mode to use when reading a CSV file. Defaults to 0.
    lineterminator (str): The newline separator to use when reading a CSV file. Defaults to None.
    error_bad_lines (bool): Whether to raise an exception on bad lines. Defaults to None.
    gen (int): The generation of the Azure Data Lake. Defaults to 2.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.
Source code in viadot/tasks/azure_data_lake.py
def __init__(
    self,
    path: str = None,
    sep: str = "\t",
    quoting: int = 0,
    lineterminator: str = None,
    error_bad_lines: bool = None,
    gen: int = 2,
    vault_name: str = None,
    timeout: int = 3600,
    max_retries: int = 3,
    retry_delay: timedelta = timedelta(seconds=10),
    *args,
    **kwargs,
):
    """Load file(s) from the Azure Data Lake to a pandas DataFrame.
    Currently supports CSV and parquet files.

    Args:
        path (str, optional): The path from which to load the DataFrame. Defaults to None.
        sep (str, optional): The separator to use when reading a CSV file. Defaults to "\t".
        quoting (int, optional): The quoting mode to use when reading a CSV file. Defaults to 0.
        lineterminator (str, optional): The newline separator to use when reading a CSV file. Defaults to None.
        error_bad_lines (bool, optional): Whether to raise an exception on bad lines. Defaults to None.
        gen (int, optional): The generation of the Azure Data Lake. Defaults to 2.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
        timeout(int, optional): The amount of time (in seconds) to wait while running this task before
            a timeout occurs. Defaults to 3600.
    """
    self.path = path
    self.sep = sep
    self.quoting = quoting
    self.lineterminator = lineterminator
    self.error_bad_lines = error_bad_lines

    self.gen = gen
    self.vault_name = vault_name
    super().__init__(
        name="adls_to_df",
        max_retries=max_retries,
        retry_delay=retry_delay,
        timeout=timeout,
        *args,
        **kwargs,
    )

run(self, path=None, sep=None, quoting=None, lineterminator=None, error_bad_lines=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    path (str): The path to file(s) which should be loaded into a DataFrame. Defaults to None.
    sep (str): The field separator to use when loading the file to the DataFrame. Defaults to None.
    quoting (int): The quoting mode to use when reading a CSV file. Defaults to 0.
    lineterminator (str): The newline separator to use when reading a CSV file. Defaults to None.
    error_bad_lines (bool): Whether to raise an exception on bad lines. Defaults to None.
    gen (int): The generation of the Azure Data Lake. Defaults to None.
    sp_credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
    "path",
    "sep",
    "quoting",
    "lineterminator",
    "error_bad_lines",
    "gen",
    "vault_name",
    "max_retries",
    "retry_delay",
)
def run(
    self,
    path: str = None,
    sep: str = None,
    quoting: int = None,
    lineterminator: str = None,
    error_bad_lines: bool = None,
    gen: int = None,
    sp_credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> pd.DataFrame:
    """Task run method.

    Args:
        path (str): The path to file(s) which should be loaded into a DataFrame.
        sep (str): The field separator to use when loading the file to the DataFrame.
        quoting (int, optional): The quoting mode to use when reading a CSV file. Defaults to 0.
        lineterminator (str, optional): The newline separator to use when reading a CSV file. Defaults to None.
        error_bad_lines (bool, optional): Whether to raise an exception on bad lines. Defaults to None.
        gen (int): The generation of the Azure Data Lake.
        sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
    """

    if quoting is None:
        quoting = 0

    if path is None:
        raise ValueError("Please provide the path to the file to be downloaded.")

    if not sp_credentials_secret:
        # attempt to read a default for the service principal secret name
        try:
            sp_credentials_secret = PrefectSecret(
                "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
            ).run()
        except ValueError:
            pass

    if sp_credentials_secret:
        azure_secret_task = AzureKeyVaultSecret()
        credentials_str = azure_secret_task.run(
            secret=sp_credentials_secret, vault_name=vault_name
        )
        credentials = json.loads(credentials_str)
    else:
        credentials = {
            "ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
            "AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
            "AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
            "AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
        }
    lake = AzureDataLake(gen=gen, credentials=credentials, path=path)

    full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], path)
    self.logger.info(f"Downloading data from {full_dl_path} to a DataFrame...")
    df = lake.to_df(
        sep=sep,
        quoting=quoting,
        lineterminator=lineterminator,
        error_bad_lines=error_bad_lines,
    )
    self.logger.info(f"Successfully loaded data.")
    return df
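
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the lake path and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureDataLakeToDF

adls_to_df = AzureDataLakeToDF()

with Flow("adls_to_df_example") as f:
    # Load a tab-separated CSV from the lake into a pandas DataFrame.
    df = adls_to_df(
        path="raw/supermetrics/test_file.csv",
        sep="\t",
        sp_credentials_secret="ADLS-SP-CREDENTIALS",
    )
f.run()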

viadot.tasks.azure_data_lake.AzureDataLakeCopy (Task)

Task for copying files within Azure Data Lake.

Parameters:

    from_path (str): The path from which to copy the file(s). Defaults to None.
    to_path (str): The destination path. Defaults to None.
    recursive (bool): Set this to true if copying entire directories.
    gen (int): The generation of the Azure Data Lake. Defaults to 2.
    vault_name (str): The name of the vault from which to fetch the secret. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.
    max_retries (int): The maximum number of task retries. Defaults to 3.
    retry_delay (timedelta): The delay between task retries. Defaults to timedelta(seconds=10).

__call__(self, *args, **kwargs) special

Copy file(s) from the Azure Data Lake

Source code in viadot/tasks/azure_data_lake.py
def __call__(self, *args, **kwargs):
    """Copy file(s) from the Azure Data Lake"""
    return super().__call__(*args, **kwargs)

run(self, from_path=None, to_path=None, recursive=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    from_path (str): The path from which to copy the file(s). Defaults to None.
    to_path (str): The destination path. Defaults to None.
    recursive (bool): Set this to true if copying entire directories. Defaults to None.
    gen (int): The generation of the Azure Data Lake. Defaults to None.
    sp_credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
    "from_path",
    "to_path",
    "recursive",
    "gen",
    "vault_name",
    "max_retries",
    "retry_delay",
)
def run(
    self,
    from_path: str = None,
    to_path: str = None,
    recursive: bool = None,
    gen: int = None,
    sp_credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> None:
    """Task run method.

    Args:
        from_path (str): The path from which to copy the file(s).
        to_path (str): The destination path.
        recursive (bool): Set this to true if copying entire directories.
        gen (int): The generation of the Azure Data Lake.
        sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
    """
    file_name = from_path.split("/")[-1]
    to_path = to_path or file_name

    if not sp_credentials_secret:
        # attempt to read a default for the service principal secret name
        try:
            sp_credentials_secret = PrefectSecret(
                "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
            ).run()
        except ValueError:
            pass

    if sp_credentials_secret:
        azure_secret_task = AzureKeyVaultSecret()
        credentials_str = azure_secret_task.run(
            secret=sp_credentials_secret, vault_name=vault_name
        )
        credentials = json.loads(credentials_str)
    else:
        credentials = {
            "ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
            "AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
            "AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
            "AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
        }
    lake = AzureDataLake(gen=gen, credentials=credentials)

    full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], from_path)
    self.logger.info(f"Copying data from {full_dl_path} to {to_path}...")
    lake.cp(from_path=from_path, to_path=to_path, recursive=recursive)
    self.logger.info(f"Successfully copied data to {to_path}.")
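
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the lake paths and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureDataLakeCopy

copy = AzureDataLakeCopy()

with Flow("adls_copy_example") as f:
    # Copy a file between two locations within the same lake.
    copy(
        from_path="raw/supermetrics/test_file.csv",
        to_path="operations/supermetrics/test_file.csv",
        sp_credentials_secret="ADLS-SP-CREDENTIALS",
    )
f.run()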

viadot.tasks.azure_data_lake.AzureDataLakeList (Task)

Task for listing files in Azure Data Lake.

Parameters:

    path (str): The path to the directory whose contents you want to list. Defaults to None.
    gen (int): The generation of the Azure Data Lake. Defaults to 2.
    vault_name (str): The name of the vault from which to fetch the secret. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.
    max_retries (int): The maximum number of task retries. Defaults to 3.
    retry_delay (timedelta): The delay between task retries. Defaults to timedelta(seconds=10).

Returns:

    List[str]: The list of paths to the contents of path. These paths do not include the container, e.g. the path to the file located at "https://my_storage_acc.blob.core.windows.net/raw/supermetrics/test_file.txt" will be shown as "raw/supermetrics/test_file.txt".

run(self, path=None, recursive=False, file_to_match=None, gen=None, sp_credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    path (str): The path to the directory whose contents you want to list. Defaults to None.
    recursive (bool): If True, recursively list all subdirectories and files. Defaults to False.
    file_to_match (str): If provided, only return files with that name. Defaults to None.
    gen (int): The generation of the Azure Data Lake. Defaults to None.
    sp_credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.

Returns:

    List[str]: The list of paths to the contents of path. These paths do not include the container, e.g. the path to the file located at "https://my_storage_acc.blob.core.windows.net/raw/supermetrics/test_file.txt" will be shown as "raw/supermetrics/test_file.txt".

Source code in viadot/tasks/azure_data_lake.py
@defaults_from_attrs(
    "path",
    "gen",
    "vault_name",
    "max_retries",
    "retry_delay",
)
def run(
    self,
    path: str = None,
    recursive: bool = False,
    file_to_match: str = None,
    gen: int = None,
    sp_credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> List[str]:
    """Task run method.

    Args:
        path (str, optional): The path to the directory which contents you want to list. Defaults to None.
        recursive (bool, optional): If True, recursively list all subdirectories and files. Defaults to False.
        file_to_match (str, optional): If exist it only returns files with that name. Defaults to None.
        gen (int): The generation of the Azure Data Lake. Defaults to None.
        sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
        ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.

    Returns:
        List[str]: The list of paths to the contents of `path`. These paths
        do not include the container, eg. the path to the file located at
        "https://my_storage_acc.blob.core.windows.net/raw/supermetrics/test_file.txt"
        will be shown as "raw/supermetrics/test_file.txt".
    """

    if not sp_credentials_secret:
        # attempt to read a default for the service principal secret name
        try:
            sp_credentials_secret = PrefectSecret(
                "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
            ).run()
        except ValueError:
            pass

    if sp_credentials_secret:
        azure_secret_task = AzureKeyVaultSecret()
        credentials_str = azure_secret_task.run(
            secret=sp_credentials_secret, vault_name=vault_name
        )
        credentials = json.loads(credentials_str)
    else:
        credentials = {
            "ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
            "AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
            "AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
            "AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
        }
    lake = AzureDataLake(gen=gen, credentials=credentials)

    full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], path)

    self.logger.info(f"Listing files in {full_dl_path}.")
    if recursive:
        self.logger.info("Loading ADLS directories recursively.")
        files = lake.find(path)
        if file_to_match:
            conditions = [file_to_match in item for item in files]
            valid_files = np.array([])
            if any(conditions):
                index = np.where(conditions)[0]
                files = list(np.append(valid_files, [files[i] for i in index]))
            else:
                raise FileExistsError(
                    f"There are not any available file named {file_to_match}."
                )
    else:
        files = lake.ls(path)

    self.logger.info(f"Successfully listed files in {full_dl_path}.")
    return files
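
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the directory path and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureDataLakeList

list_files = AzureDataLakeList()

with Flow("adls_list_example") as f:
    # List the contents of a lake directory; paths are returned without the container.
    files = list_files(
        path="raw/supermetrics",
        recursive=True,
        sp_credentials_secret="ADLS-SP-CREDENTIALS",
    )
f.run()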

viadot.tasks.azure_key_vault.AzureKeyVaultSecret (SecretBase)

Task for retrieving a secret from an Azure Key Vault and returning it as a string. Note that all initialization arguments can optionally be provided or overwritten at runtime.

For authentication, there are two options: you can set the AZURE_CREDENTIALS Prefect Secret containing your Azure Key Vault credentials which will be passed directly to SecretClient, or you can configure your flow's runtime environment for EnvironmentCredential.

Parameters:

    secret (str): The name of the secret to retrieve.
    vault_name (str): The name of the vault from which to fetch the secret.
    secret_client_kwargs (dict): Additional keyword arguments to forward to the SecretClient.
    **kwargs (dict): Additional keyword arguments to pass to the Task constructor.

run(self, secret=None, vault_name=None, credentials=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    secret (str): The name of the secret to retrieve.
    vault_name (str): The name of the vault from which to fetch the secret.
    credentials (dict): Your Azure Key Vault credentials passed from an upstream Secret task. By default, credentials are read from the AZURE_CREDENTIALS Prefect Secret; this Secret must be a JSON string with the subkey KEY_VAULT and then vault_name containing three keys: AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_CLIENT_SECRET, which will be passed directly to SecretClient. If not provided here or in context, the task will fall back to Azure credentials discovery using EnvironmentCredential(). Example AZURE_CREDENTIALS environment variable: export AZURE_CREDENTIALS='{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'

Examples:

from prefect import Flow
from viadot.tasks import AzureKeyVaultSecret

azure_secret_task = AzureKeyVaultSecret()

with Flow(name="example") as f:
    secret = azure_secret_task(secret="test", vault_name="my_vault_name")
out = f.run()

Returns:

    str: The contents of this secret, as a string.

Source code in viadot/tasks/azure_key_vault.py
@defaults_from_attrs("secret", "vault_name")
def run(
    self,
    secret: str = None,
    vault_name: str = None,
    credentials: dict = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> str:
    """
    Task run method.

    Args:
        - secret (str): the name of the secret to retrieve
        - vault_name (str): the name of the vault from which to fetch the secret
        - credentials (dict, optional): your Azure Key Vault credentials passed from an upstream
            Secret task. By default, credentials are read from the `AZURE_CREDENTIALS`
            Prefect Secret; this Secret must be a JSON string
            with the subkey `KEY_VAULT` and then vault_name containing three keys:
            `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`, which will be
            passed directly to `SecretClient`.  If not provided here or in context, the task
            will fall back to Azure credentials discovery using `EnvironmentCredential()`.
            Example `AZURE_CREDENTIALS` environment variable:
            `export AZURE_CREDENTIALS = '{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'`

    Example:
        ```python
        from prefect import Flow
        from viadot.tasks import AzureKeyVaultSecret

        azure_secret_task = AzureKeyVaultSecret()

        with Flow(name="example") as f:
            secret = azure_secret_task(secret="test", vault_name="my_vault_name")
        out = f.run()
        ```
    Returns:
        - str: the contents of this secret, as a string
    """

    if secret is None:
        raise ValueError("A secret name must be provided.")

    key_vault = get_key_vault(
        vault_name=vault_name,
        credentials=credentials,
        secret_client_kwargs=self.secret_client_kwargs,
    )

    secret_string = key_vault.get_secret(secret).value

    return secret_string

viadot.tasks.azure_key_vault.CreateAzureKeyVaultSecret (SecretBase)

Task for creating secrets in an Azure Key Vault. Note that all initialization arguments can optionally be provided or overwritten at runtime.

For authentication, there are two options: you can set the AZURE_CREDENTIALS Prefect Secret containing your Azure Key Vault credentials which will be passed directly to SecretClient, or you can configure your flow's runtime environment for EnvironmentCredential.

Parameters:

    secret (str): The name of the secret to retrieve.
    vault_name (str): The name of the vault from which to fetch the secret.
    secret_client_kwargs (dict): Additional keyword arguments to forward to the SecretClient.
    **kwargs (dict): Additional keyword arguments to pass to the Task constructor.

run(self, secret=None, value=None, lifetime=None, vault_name=None, credentials=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    secret (str): The name of the secret to set.
    value (str): The value which the secret will hold.
    lifetime (int): The number of days after which the secret should expire.
    vault_name (str): The name of the vault from which to fetch the secret.
    credentials (dict): Your Azure Key Vault credentials passed from an upstream Secret task; this Secret must be a JSON string with the subkey KEY_VAULT and then vault_name containing three keys: AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_CLIENT_SECRET, which will be passed directly to SecretClient. If not provided here or in context, the task will fall back to Azure credentials discovery using EnvironmentCredential(). Example AZURE_CREDENTIALS environment variable: export AZURE_CREDENTIALS='{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'

Examples:

from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from viadot.tasks import CreateAzureKeyVaultSecret

create_secret_task = CreateAzureKeyVaultSecret()

with Flow(name="example") as f:
    azure_credentials = PrefectSecret("AZURE_CREDENTIALS")
    secret = create_secret_task(secret="test2", value=42, vault_name="my_vault_name", credentials=azure_credentials)
out = f.run()

Returns:

    bool: Whether the secret was created successfully.

Source code in viadot/tasks/azure_key_vault.py
@defaults_from_attrs(
    "secret",
    "value",
    "lifetime",
    "vault_name",
)
def run(
    self,
    secret: str = None,
    value: str = None,
    lifetime: int = None,
    vault_name: str = None,
    credentials: dict = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> bool:
    """
    Task run method.

    Args:
        - secret (str): the name of the secret to set
        - value (str): the value which the secret will hold
        - lifetime (int): The number of days after which the secret should expire.
        - vault_name (str): the name of the vault from which to fetch the secret
        - credentials (dict, optional): your Azure Key Vault credentials passed from an upstream
            Secret task; this Secret must be a JSON string
            with the subkey `KEY_VAULT` and then vault_name containing three keys:
            `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`, which will be
            passed directly to `SecretClient`.  If not provided here or in context, the task
            will fall back to Azure credentials discovery using `EnvironmentCredential()`.
            Example `AZURE_CREDENTIALS` environment variable:
            `export AZURE_CREDENTIALS = '{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'`

    Example:
        ```python
        from prefect import Flow
        from prefect.tasks.secrets import PrefectSecret
        from viadot.tasks import CreateAzureKeyVaultSecret

        create_secret_task = CreateAzureKeyVaultSecret()

        with Flow(name="example") as f:
            azure_credentials = PrefectSecret("AZURE_CREDENTIALS")
            secret = create_secret_task(secret="test2", value=42, vault_name="my_vault_name", credentials=azure_credentials)
        out = f.run()
        ```
    Returns:
        - bool: Whether the secret was created successfully.
    """

    if secret is None:
        raise ValueError("A secret name must be provided.")

    key_vault = get_key_vault(
        vault_name=vault_name,
        credentials=credentials,
        secret_client_kwargs=self.secret_client_kwargs,
    )

    expires_on = pendulum.now("UTC").add(days=lifetime)
    secret_obj = key_vault.set_secret(secret, value, expires_on=expires_on)
    was_successful = secret_obj.name == secret

    return was_successful

viadot.tasks.azure_key_vault.DeleteAzureKeyVaultSecret (SecretBase)

Task for removing ("soft delete") a secret from an Azure Key Vault. Note that all initialization arguments can optionally be provided or overwritten at runtime.

For authentication, there are two options: you can set the AZURE_CREDENTIALS Prefect Secret containing your Azure Key Vault credentials which will be passed directly to SecretClient, or you can configure your flow's runtime environment for EnvironmentCredential.

Parameters:

    secret (str): The name of the secret to retrieve.
    vault_name (str): The name of the vault from which to fetch the secret.
    secret_client_kwargs (dict): Additional keyword arguments to forward to the SecretClient.
    **kwargs (dict): Additional keyword arguments to pass to the Task constructor.

run(self, secret=None, vault_name=None, credentials=None, max_retries=None, retry_delay=None)

Task run method.

Parameters:

    secret (str): The name of the secret to delete.
    vault_name (str): The name of the vault where the secret is located.
    credentials (dict): Your Azure Key Vault credentials passed from an upstream Secret task. By default, credentials are read from the AZURE_CREDENTIALS Prefect Secret; this Secret must be a JSON string with the subkey KEY_VAULT and then vault_name containing three keys: AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_CLIENT_SECRET, which will be passed directly to SecretClient. If not provided here or in context, the task will fall back to Azure credentials discovery using EnvironmentCredential(). Example AZURE_CREDENTIALS environment variable: export AZURE_CREDENTIALS='{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'

Examples:

from prefect import Flow
from viadot.tasks import DeleteAzureKeyVaultSecret

azure_secret_task = DeleteAzureKeyVaultSecret()

with Flow(name="example") as f:
    secret = azure_secret_task(secret="test", vault_name="my_vault_name")
out = f.run()

Returns:

    bool: Whether the secret was deleted successfully.

Source code in viadot/tasks/azure_key_vault.py
@defaults_from_attrs("secret", "vault_name")
def run(
    self,
    secret: str = None,
    vault_name: str = None,
    credentials: dict = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
) -> bool:
    """
    Task run method.

    Args:
        - secret (str): the name of the secret to delete
        - vault_name (str): the name of the vault whethe the secret is located
        - credentials (dict, optional): your Azure Key Vault credentials passed from an upstream
            Secret task. By default, credentials are read from the `AZURE_CREDENTIALS`
            Prefect Secret; this Secret must be a JSON string
            with the subkey `KEY_VAULT` and then vault_name containing three keys:
            `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`, which will be
            passed directly to `SecretClient`.  If not provided here or in context, the task
            will fall back to Azure credentials discovery using `EnvironmentCredential()`.
            Example `AZURE_CREDENTIALS` environment variable:
            `export AZURE_CREDENTIALS = '{"KEY_VAULT": {"test_key_vault": {"AZURE_TENANT_ID": "a", "AZURE_CLIENT_ID": "b", "AZURE_CLIENT_SECRET": "c"}}}'`

    Example:
        ```python
        from prefect import Flow
        from viadot.tasks import DeleteAzureKeyVaultSecret

        azure_secret_task = DeleteAzureKeyVaultSecret()

        with Flow(name="example") as f:
            secret = azure_secret_task(secret="test", vault_name="my_vault_name")
        out = f.run()
        ```
    Returns:
        - bool: Whether the secret was deleted successfully.
    """

    if secret is None:
        raise ValueError("A secret name must be provided.")

    key_vault = get_key_vault(
        vault_name=vault_name,
        credentials=credentials,
        secret_client_kwargs=self.secret_client_kwargs,
    )

    poller = key_vault.begin_delete_secret(secret)
    poller.wait(timeout=60 * 5)
    was_successful = poller.status() == "finished"

    return was_successful

viadot.tasks.azure_sql.AzureSQLBulkInsert (Task)

run(self, from_path=None, schema=None, table=None, dtypes=None, sep=None, if_exists=None, credentials_secret=None, vault_name=None)

Bulk insert data from Azure Data Lake into an Azure SQL Database table. This task also creates the table if it doesn't exist. Currently, only CSV files are supported.

Parameters:

    from_path (str): Path to the file to be inserted.
    schema (str): Destination schema.
    table (str): Destination table.
    dtypes (Dict[str, Any]): Data types to force.
    sep (str): The separator to use to read the CSV file.
    if_exists (Literal, optional): What to do if the table already exists.
    credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password).
    vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.

Source code in viadot/tasks/azure_sql.py
@defaults_from_attrs("sep", "if_exists", "credentials_secret")
def run(
    self,
    from_path: str = None,
    schema: str = None,
    table: str = None,
    dtypes: Dict[str, Any] = None,
    sep: str = None,
    if_exists: Literal["fail", "replace", "append", "delete"] = None,
    credentials_secret: str = None,
    vault_name: str = None,
):
    """
    Bulk insert data from Azure Data Lake into an Azure SQL Database table.
    This task also creates the table if it doesn't exist.
    Currently, only CSV files are supported.

    Args:
    from_path (str): Path to the file to be inserted.
    schema (str): Destination schema.
    table (str): Destination table.
    dtypes (Dict[str, Any]): Data types to force.
    sep (str): The separator to use to read the CSV file.
    if_exists (Literal, optional): What to do if the table already exists.
    credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
    with SQL db credentials (server, db_name, user, and password).
    vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
    """

    fqn = f"{schema}.{table}" if schema else table
    credentials = get_credentials(credentials_secret, vault_name=vault_name)
    azure_sql = AzureSQL(credentials=credentials)

    if if_exists == "replace":
        azure_sql.create_table(
            schema=schema, table=table, dtypes=dtypes, if_exists=if_exists
        )

        self.logger.info(f"Successfully created table {fqn}.")

    azure_sql.bulk_insert(
        schema=schema,
        table=table,
        source_path=from_path,
        sep=sep,
        if_exists=if_exists,
    )
    self.logger.info(f"Successfully inserted data into {fqn}.")
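
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the lake path, table name, dtypes, and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureSQLBulkInsert

bulk_insert = AzureSQLBulkInsert()

with Flow("azure_sql_bulk_insert_example") as f:
    # Recreate the table and bulk insert a tab-separated CSV staged in the lake.
    bulk_insert(
        from_path="raw/supermetrics/test_file.csv",
        schema="sandbox",
        table="test_table",
        dtypes={"country": "VARCHAR(100)", "sales": "FLOAT(24)"},
        sep="\t",
        if_exists="replace",
        credentials_secret="AZURE-SQL-CREDENTIALS",
    )
f.run()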

viadot.tasks.azure_sql.AzureSQLCreateTable (Task)

run(self, schema=None, table=None, dtypes=None, if_exists=None, credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None)

Create a table in Azure SQL Database.

Parameters:

    schema (str): Destination schema. Defaults to None.
    table (str): Destination table. Defaults to None.
    dtypes (Dict[str, Any]): Data types to force. Defaults to None.
    if_exists (Literal): What to do if the table already exists. Defaults to None.
    credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password). Defaults to None.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
Source code in viadot/tasks/azure_sql.py
@defaults_from_attrs("if_exists")
def run(
    self,
    schema: str = None,
    table: str = None,
    dtypes: Dict[str, Any] = None,
    if_exists: Literal["fail", "replace", "skip", "delete"] = None,
    credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
):
    """
    Create a table in Azure SQL Database.

    Args:
        schema (str, optional): Destination schema.
        table (str, optional): Destination table.
        dtypes (Dict[str, Any], optional): Data types to force.
        if_exists (Literal, optional): What to do if the table already exists.
        credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
        with SQL db credentials (server, db_name, user, and password).
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
    """

    credentials = get_credentials(credentials_secret, vault_name=vault_name)
    azure_sql = AzureSQL(credentials=credentials)

    fqn = f"{schema}.{table}" if schema is not None else table
    created = azure_sql.create_table(
        schema=schema, table=table, dtypes=dtypes, if_exists=if_exists
    )
    if created:
        self.logger.info(f"Successfully created table {fqn}.")
    else:
        self.logger.info(
            f"Table {fqn} has not been created as if_exists is set to {if_exists}."
        )
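
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the table name, dtypes, and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureSQLCreateTable

create_table = AzureSQLCreateTable()

with Flow("azure_sql_create_table_example") as f:
    create_table(
        schema="sandbox",
        table="test_table",
        dtypes={"country": "VARCHAR(100)", "sales": "FLOAT(24)"},
        if_exists="skip",  # do nothing if the table already exists
        credentials_secret="AZURE-SQL-CREDENTIALS",
    )
f.run()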

viadot.tasks.azure_sql.AzureSQLDBQuery (Task)

Task for running an Azure SQL Database query.

Parameters:

    query (str, required): The query to execute on the database.
    credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password).
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.

run(self, query, credentials_secret=None, vault_name=None)

Run an Azure SQL Database query

Parameters:

    query (str, required): The query to execute on the database.
    credentials_secret (str): The name of the Azure Key Vault secret containing a dictionary with SQL db credentials (server, db_name, user, and password). Defaults to None.
    vault_name (str): The name of the vault from which to obtain the secret. Defaults to None.
Source code in viadot/tasks/azure_sql.py
def run(
    self,
    query: str,
    credentials_secret: str = None,
    vault_name: str = None,
):
    """Run an Azure SQL Database query

    Args:
        query (str, required): The query to execute on the database.
        credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
        with SQL db credentials (server, db_name, user, and password).
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
    """

    credentials = get_credentials(credentials_secret, vault_name=vault_name)
    azure_sql = AzureSQL(credentials=credentials)

    # run the query and fetch the results if it's a select
    result = azure_sql.run(query)

    self.logger.info(f"Successfully ran the query.")
    return result
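
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the query and secret name are illustrative):

from prefect import Flow
from viadot.tasks import AzureSQLDBQuery

sql_query = AzureSQLDBQuery()

with Flow("azure_sql_query_example") as f:
    # For SELECT statements, the fetched rows are returned as the task result.
    result = sql_query(
        query="SELECT TOP 10 * FROM sandbox.test_table",
        credentials_secret="AZURE-SQL-CREDENTIALS",
    )
f.run()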

viadot.tasks.bcp.BCPTask (ShellTask)

Task for bulk inserting data into SQL Server-compatible databases.

Parameters:

    path (str): The path to the local CSV file to be inserted.
    schema (str): The destination schema.
    table (str): The destination table.
    chunksize (int): The chunk size to use.
    error_log_file_path (str): Full path of an error file. Defaults to "log_file.log".
    on_error (Literal["skip", "fail"]): What to do if an error occurs. Defaults to "skip".
    credentials (dict): The credentials to use for connecting with the database.
    vault_name (str): The name of the vault from which to fetch the secret.
    timeout (int): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.
    **kwargs (dict): Additional keyword arguments to pass to the Task constructor.

run(self, path=None, schema=None, table=None, chunksize=None, error_log_file_path=None, on_error=None, credentials=None, credentials_secret=None, vault_name=None, max_retries=None, retry_delay=None, **kwargs)

Task run method.

Parameters:

    path (str, optional): The path to the local CSV file to be inserted.
    schema (str, optional): The destination schema.
    table (str, optional): The destination table.
    chunksize (int, optional): The chunk size to use. Defaults to 5000.
    error_log_file_path (str, optional): Full path of an error file. Defaults to "log_file.log".
    on_error (Literal, optional): What to do if an error occurs. Defaults to None.
    credentials (dict, optional): The credentials to use for connecting with SQL Server.
    credentials_secret (str, optional): The name of the Key Vault secret containing database credentials (server, db_name, user, password).
    vault_name (str): The name of the vault from which to fetch the secret.

Returns:

    str: The output of the bcp CLI command.

Source code in viadot/tasks/bcp.py
@defaults_from_attrs(
    "path",
    "schema",
    "table",
    "chunksize",
    "error_log_file_path",
    "on_error",
    "credentials",
    "vault_name",
    "max_retries",
    "retry_delay",
)
def run(
    self,
    path: str = None,
    schema: str = None,
    table: str = None,
    chunksize: int = None,
    error_log_file_path: str = None,
    on_error: Literal = None,
    credentials: dict = None,
    credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
    **kwargs,
) -> str:
    """
    Task run method.
    Args:
    - path (str, optional): The path to the local CSV file to be inserted.
    - schema (str, optional): The destination schema.
    - table (str, optional): The destination table.
    - chunksize (int, optional): The chunk size to use. By default 5000.
    - error_log_file_path (string, optional): Full path of an error file. Defaults to "log_file.log".
    - on_error (Literal, optional): What to do if error occur. Defaults to None.
    - credentials (dict, optional): The credentials to use for connecting with SQL Server.
    - credentials_secret (str, optional): The name of the Key Vault secret containing database credentials.
    (server, db_name, user, password)
    - vault_name (str): The name of the vault from which to fetch the secret.
    Returns:
        str: The output of the bcp CLI command.
    """
    if not credentials:
        if not credentials_secret:
            # attempt to read a default for the service principal secret name
            try:
                credentials_secret = PrefectSecret(
                    "AZURE_DEFAULT_SQLDB_SERVICE_PRINCIPAL_SECRET"
                ).run()
            except ValueError:
                pass

        if credentials_secret:
            credentials_str = AzureKeyVaultSecret(
                credentials_secret, vault_name=vault_name
            ).run()
            credentials = json.loads(credentials_str)

    fqn = f"{schema}.{table}" if schema else table

    server = credentials["server"]
    db_name = credentials["db_name"]
    uid = credentials["user"]
    pwd = credentials["password"]

    if "," in server:
        # A space after the comma is allowed in the ODBC connection string
        # but not in BCP's 'server' argument.
        server = server.replace(" ", "")

    if on_error == "skip":
        max_error = 0
    elif on_error == "fail":
        max_error = 1
    else:
        raise ValueError(
            "Please provide correct 'on_error' parameter value - 'skip' or 'fail'. "
        )
    command = f"/opt/mssql-tools/bin/bcp {fqn} in '{path}' -S {server} -d {db_name} -U {uid} -P '{pwd}' -c -F 2 -b {chunksize} -h 'TABLOCK' -e '{error_log_file_path}' -m {max_error}"
    run_command = super().run(command=command, **kwargs)
    try:
        parse_logs(error_log_file_path)
    except:
        logger.warning("BCP logs couldn't be parsed.")
    return run_command
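
Example: a minimal usage sketch (assuming the task is exported from viadot.tasks; the file path, table, and secret name are illustrative):

from prefect import Flow
from viadot.tasks import BCPTask

bcp = BCPTask()

with Flow("bcp_example") as f:
    # Bulk insert a local CSV into an existing SQL Server table via the bcp CLI.
    bcp(
        path="test_file.csv",
        schema="sandbox",
        table="test_table",
        chunksize=5000,
        error_log_file_path="bcp_errors.log",
        on_error="skip",
        credentials_secret="AZURE-SQL-CREDENTIALS",
    )
f.run()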

viadot.tasks.great_expectations.RunGreatExpectationsValidation (RunGreatExpectationsValidation)

Task for running data validation with Great Expectations on a pandas DataFrame. See https://docs.prefect.io/api/latest/tasks/great_expectations.html#rungreatexpectationsvalidation for full documentation.

Parameters:

    expectations_path (str): The path to the directory containing the expectation suites.
    df (pd.DataFrame): The DataFrame to validate.

viadot.tasks.sqlite.SQLiteInsert (Task)

Task for inserting data from a pandas DataFrame into SQLite.

Parameters:

    db_path (str): The path to the database to be used. Defaults to None.
    sql_path (str): The path to the text file containing the query. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.

viadot.tasks.sqlite.SQLiteSQLtoDF (Task)

Task for downloading data from SQLite to a pandas DataFrame.

SQLite will create a new database in the directory specified by the 'db_path' parameter.

Parameters:

    db_path (str): The path to the database to be used. Defaults to None.
    sql_path (str): The path to the text file containing the query. Defaults to None.
    timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.

__call__(self) special

Generate a DataFrame from a SQLite SQL query

Source code in viadot/tasks/sqlite.py
def __call__(self):
    """Generate a DataFrame from a SQLite SQL query"""

viadot.tasks.supermetrics.SupermetricsToCSV (Task)

Task for downloading data from the Supermetrics API to a CSV file.

Parameters:

Name Type Description Default
path str

The destination path. Defaults to "supermetrics_extract.csv".

required
max_retries int

The maximum number of retries. Defaults to 5.

required
retry_delay timedelta

The delay between task retries. Defaults to 10 seconds.

required
timeout int

The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.

required
max_rows int

Maximum number of rows the query results should contain. Defaults to 1 000 000.

required
max_cols int

Maximum number of columns the query results should contain. Defaults to None.

required
if_exists str

What to do if file already exists. Defaults to "replace".

required
if_empty str

What to do if query returns no data. Defaults to "warn".

required
sep str

The separator in the target CSV file. Defaults to "\t" (tab).

required

__call__(self) special

Download Supermetrics data to a CSV

Source code in viadot/tasks/supermetrics.py
def __call__(self):
    """Download Supermetrics data to a CSV"""
    super().__call__(self)

run(self, path=None, ds_id=None, ds_accounts=None, ds_segments=None, ds_user=None, fields=None, date_range_type=None, start_date=None, end_date=None, settings=None, filter=None, max_rows=None, max_columns=None, order_columns=None, if_exists=None, if_empty=None, max_retries=None, retry_delay=None, timeout=None, sep=None)

Task run method.

Parameters:

Name Type Description Default
path str

The destination path. Defaults to None.

None
ds_id str

A Supermetrics query parameter.

None
ds_accounts Union[str, List[str]]

A Supermetrics query parameter. Defaults to None.

None
ds_segments List[str]

A Supermetrics query parameter. Defaults to None.

None
ds_user str

A Supermetrics query parameter. Defaults to None.

None
fields List[str]

A Supermetrics query parameter. Defaults to None.

None
date_range_type str

A Supermetrics query parameter. Defaults to None.

None
start_date str

A Supermetrics query parameter. Defaults to None.

None
settings Dict[str, Any]

A Supermetrics query parameter. Defaults to None.

None
filter str

A Supermetrics query parameter. Defaults to None.

None
max_rows int

A Supermetrics query parameter. Defaults to None.

None
max_columns int

A Supermetrics query parameter. Defaults to None.

None
order_columns str

A Supermetrics query parameter. Defaults to None.

None
if_exists str

What to do if file already exists. Defaults to "replace".

None
if_empty str

What to do if query returns no data. Defaults to "warn".

None
max_retries int

The maximum number of retries. Defaults to 5.

None
retry_delay timedelta

The delay between task retries. Defaults to 10 seconds.

None
timeout int

Task timeout. Defaults to 30 minutes.

None
Source code in viadot/tasks/supermetrics.py
@defaults_from_attrs(
    "path",
    "max_rows",
    "if_exists",
    "if_empty",
    "max_retries",
    "retry_delay",
    "timeout",
    "sep",
)
def run(
    self,
    path: str = None,
    ds_id: str = None,
    ds_accounts: Union[str, List[str]] = None,
    ds_segments: List[str] = None,
    ds_user: str = None,
    fields: List[str] = None,
    date_range_type: str = None,
    start_date: str = None,
    end_date: str = None,
    settings: Dict[str, Any] = None,
    filter: str = None,
    max_rows: int = None,
    max_columns: int = None,
    order_columns: str = None,
    if_exists: str = None,
    if_empty: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
    timeout: int = None,
    sep: str = None,
):

    """
    Task run method.

    Args:
        path (str, optional): The destination path. Defaults to None.
        ds_id (str, optional): A Supermetrics query parameter.
        ds_accounts (Union[str, List[str]], optional): A Supermetrics query parameter. Defaults to None.
        ds_segments (List[str], optional): A Supermetrics query parameter. Defaults to None.
        ds_user (str, optional): A Supermetrics query parameter. Defaults to None.
        fields (List[str], optional): A Supermetrics query parameter. Defaults to None.
        date_range_type (str, optional): A Supermetrics query parameter. Defaults to None.
        start_date (str, optional): A Supermetrics query parameter. Defaults to None.
        end_date (str, optional): A Supermetrics query parameter. Defaults to None.
        settings (Dict[str, Any], optional): A Supermetrics query parameter. Defaults to None.
        filter (str, optional): A Supermetrics query parameter. Defaults to None.
        max_rows (int, optional): A Supermetrics query parameter. Defaults to None.
        max_columns (int, optional): A Supermetrics query parameter. Defaults to None.
        order_columns (str, optional): A Supermetrics query parameter. Defaults to None.
        if_exists (str, optional): What to do if file already exists. Defaults to "replace".
        if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
        max_retries (int, optional): The maximum number of retries. Defaults to 5.
        retry_delay (timedelta, optional): The delay between task retries. Defaults to 10 seconds.
        timeout (int, optional): Task timeout. Defaults to 30 minutes.
        sep (str, optional): The separator in the target CSV file. Defaults to "\t".

    """

    if max_retries:
        self.max_retries = max_retries

    if retry_delay:
        self.retry_delay = retry_delay

    if isinstance(ds_accounts, str):
        ds_accounts = [ds_accounts]

    # Build the URL
    # Note the task accepts only one account per query
    query = dict(
        ds_id=ds_id,
        ds_accounts=ds_accounts,
        ds_segments=ds_segments,
        ds_user=ds_user,
        fields=fields,
        date_range_type=date_range_type,
        start_date=start_date,
        end_date=end_date,
        settings=settings,
        filter=filter,
        max_rows=max_rows,
        max_columns=max_columns,
        order_columns=order_columns,
    )
    query = {param: val for param, val in query.items() if val is not None}
    supermetrics = Supermetrics()
    supermetrics.query(query)

    # Download data to a local CSV file
    self.logger.info(f"Downloading data to {path}...")
    supermetrics.to_csv(path, if_exists=if_exists, if_empty=if_empty, sep=sep)
    self.logger.info(f"Successfully downloaded data to {path}.")

viadot.tasks.supermetrics.SupermetricsToDF (Task)

Task for downloading data from the Supermetrics API to a pandas DataFrame.

Parameters:

Name Type Description Default
ds_id str

A Supermetrics query parameter.

required
ds_accounts Union[str, List[str]]

A Supermetrics query parameter. Defaults to None.

required
ds_segments List[str]

A Supermetrics query parameter. Defaults to None.

required
ds_user str

A Supermetrics query parameter. Defaults to None.

required
fields List[str]

A Supermetrics query parameter. Defaults to None.

required
date_range_type str

A Supermetrics query parameter. Defaults to None.

required
settings Dict[str, Any]

A Supermetrics query parameter. Defaults to None.

required
filter str

A Supermetrics query parameter. Defaults to None.

required
max_rows int

A Supermetrics query parameter. Defaults to None.

required
max_columns int

A Supermetrics query parameter. Defaults to None.

required
order_columns str

A Supermetrics query parameter. Defaults to None.

required
if_empty str

What to do if query returns no data. Defaults to "warn".

required
max_retries int

The maximum number of retries. Defaults to 5.

required
retry_delay timedelta

The delay between task retries. Defaults to 10 seconds.

required
timeout int

The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 3600.

required

run(self, ds_id=None, ds_accounts=None, ds_segments=None, ds_user=None, fields=None, date_range_type=None, start_date=None, end_date=None, settings=None, filter=None, max_rows=None, max_columns=None, order_columns=None, if_empty=None, max_retries=None, retry_delay=None, timeout=None)

Task run method.

Parameters:

Name Type Description Default
ds_id str

A Supermetrics query parameter.

None
ds_accounts Union[str, List[str]]

A Supermetrics query parameter. Defaults to None.

None
ds_segments List[str]

A Supermetrics query parameter. Defaults to None.

None
ds_user str

A Supermetrics query parameter. Defaults to None.

None
fields List[str]

A Supermetrics query parameter. Defaults to None.

None
date_range_type str

A Supermetrics query parameter. Defaults to None.

None
start_date str

A query parameter to pass the start date to the date range filter. Defaults to None.

None
end_date str

A query parameter to pass the end date to the date range filter. Defaults to None.

None
settings Dict[str, Any]

A Supermetrics query parameter. Defaults to None.

None
filter str

A Supermetrics query parameter. Defaults to None.

None
max_rows int

A Supermetrics query parameter. Defaults to None.

None
max_columns int

A Supermetrics query parameter. Defaults to None.

None
order_columns str

A Supermetrics query parameter. Defaults to None.

None
if_empty str

What to do if query returns no data. Defaults to "warn".

None
max_retries int

The maximum number of retries. Defaults to 5.

None
retry_delay timedelta

The delay between task retries. Defaults to 10 seconds.

None
timeout int

Task timeout. Defaults to 30 minutes.

None

Returns:

Type Description
pd.DataFrame

The query result as a pandas DataFrame.

Source code in viadot/tasks/supermetrics.py
@defaults_from_attrs(
    "if_empty",
    "max_rows",
    "max_retries",
    "retry_delay",
    "timeout",
)
def run(
    self,
    ds_id: str = None,
    ds_accounts: Union[str, List[str]] = None,
    ds_segments: List[str] = None,
    ds_user: str = None,
    fields: List[str] = None,
    date_range_type: str = None,
    start_date: str = None,
    end_date: str = None,
    settings: Dict[str, Any] = None,
    filter: str = None,
    max_rows: int = None,
    max_columns: int = None,
    order_columns: str = None,
    if_empty: str = None,
    max_retries: int = None,
    retry_delay: timedelta = None,
    timeout: int = None,
) -> pd.DataFrame:
    """
    Task run method.

    Args:
        ds_id (str, optional): A Supermetrics query parameter.
        ds_accounts (Union[str, List[str]], optional): A Supermetrics query parameter. Defaults to None.
        ds_segments (List[str], optional): A Supermetrics query parameter. Defaults to None.
        ds_user (str, optional): A Supermetrics query parameter. Defaults to None.
        fields (List[str], optional): A Supermetrics query parameter. Defaults to None.
        date_range_type (str, optional): A Supermetrics query parameter. Defaults to None.
        start_date (str, optional): A query parameter to pass the start date to the date range filter. Defaults to None.
        end_date (str, optional): A query parameter to pass the end date to the date range filter. Defaults to None.
        settings (Dict[str, Any], optional): A Supermetrics query parameter. Defaults to None.
        filter (str, optional): A Supermetrics query parameter. Defaults to None.
        max_rows (int, optional): A Supermetrics query parameter. Defaults to None.
        max_columns (int, optional): A Supermetrics query parameter. Defaults to None.
        order_columns (str, optional): A Supermetrics query parameter. Defaults to None.
        if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
        max_retries (int, optional): The maximum number of retries. Defaults to 5.
        retry_delay (timedelta, optional): The delay between task retries. Defaults to 10 seconds.
        timeout (int, optional): Task timeout. Defaults to 30 minutes.

    Returns:
        pd.DataFrame: The query result as a pandas DataFrame.
    """

    if max_retries:
        self.max_retries = max_retries

    if retry_delay:
        self.retry_delay = retry_delay

    if isinstance(ds_accounts, str):
        ds_accounts = [ds_accounts]

    # Build the URL
    # Note the task accepts only one account per query
    query = dict(
        ds_id=ds_id,
        ds_accounts=ds_accounts,
        ds_segments=ds_segments,
        ds_user=ds_user,
        fields=fields,
        date_range_type=date_range_type,
        start_date=start_date,
        end_date=end_date,
        settings=settings,
        filter=filter,
        max_rows=max_rows,
        max_columns=max_columns,
        order_columns=order_columns,
    )
    query = {param: val for param, val in query.items() if val is not None}
    supermetrics = Supermetrics()
    supermetrics.query(query)

    # Download data to a pandas DataFrame
    self.logger.info("Downloading data to a DataFrame...")
    df = supermetrics.to_df(if_empty=if_empty)
    self.logger.info("Successfully downloaded data to a DataFrame.")
    return df
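
Example (a minimal usage sketch; as above, the ds_* values, fields, and user are placeholders):

from prefect import Flow
from viadot.tasks.supermetrics import SupermetricsToDF

supermetrics_to_df = SupermetricsToDF()

with Flow("supermetrics_to_df") as flow:
    df = supermetrics_to_df(
        ds_id="GA",                        # hypothetical data source id
        ds_accounts=["123456789"],         # hypothetical account id
        ds_user="analyst@example.com",     # hypothetical Supermetrics user
        fields=["Date", "Sessions"],       # hypothetical fields
        date_range_type="last_month_inc",  # hypothetical date range type
        max_rows=1_000_000,
    )

flow.run()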

viadot.task_utils.add_ingestion_metadata_task(df)

Add ingestion metadata columns, e.g. the data download date.

Parameters:

Name Type Description Default
df pd.DataFrame

input DataFrame.

required
Source code in viadot/task_utils.py
@task(timeout=3600)
def add_ingestion_metadata_task(
    df: pd.DataFrame,
):
    """Add ingestion metadata columns, eg. data download date

    Args:
        df (pd.DataFrame): input DataFrame.
    """
    # Skip only when the DataFrame has no columns; a DataFrame with columns but no rows still gets metadata added
    if len(df.columns) == 0:
        return df
    else:
        df2 = df.copy(deep=True)
        df2["_viadot_downloaded_at_utc"] = datetime.now(timezone.utc).replace(
            microsecond=0
        )
        return df2
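
Example (a minimal usage sketch; calling .run() directly is shown only for illustration - inside a flow the task would be called on an upstream DataFrame):

import pandas as pd
from viadot.task_utils import add_ingestion_metadata_task

df = pd.DataFrame({"sales": [100, 200]})

# Returns a copy of the DataFrame with a "_viadot_downloaded_at_utc" column added.
df_with_metadata = add_ingestion_metadata_task.run(df)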

viadot.task_utils.get_latest_timestamp_file_path(files)

Return the name of the latest file in a given data lake directory, given a list of paths in that directory. Such a list can be obtained using the AzureDataLakeList task. This task is useful for working with immutable data lakes as the data is often written in the format /path/table_name/TIMESTAMP.parquet.

Source code in viadot/task_utils.py
@task(timeout=3600)
def get_latest_timestamp_file_path(files: List[str]) -> str:
    """
    Return the name of the latest file in a given data lake directory,
    given a list of paths in that directory. Such a list can be obtained using the
    `AzureDataLakeList` task. This task is useful for working with immutable data lakes as
    the data is often written in the format /path/table_name/TIMESTAMP.parquet.
    """

    logger = prefect.context.get("logger")

    extract_fname = (
        lambda f: os.path.basename(f).replace(".csv", "").replace(".parquet", "")
    )
    file_names = [extract_fname(file) for file in files]
    latest_file_name = max(file_names, key=lambda d: datetime.fromisoformat(d))
    latest_file = files[file_names.index(latest_file_name)]

    logger.debug(f"Latest file: {latest_file}")

    return latest_file
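
Example (a minimal usage sketch; the file paths are hypothetical and follow the /path/table_name/TIMESTAMP.parquet convention mentioned above):

from prefect import Flow
from viadot.task_utils import get_latest_timestamp_file_path

files = [
    "raw/sales/2021-11-30T01:00:00.parquet",
    "raw/sales/2021-12-01T01:00:00.parquet",
]

with Flow("latest_file") as flow:
    # Picks the file whose basename parses to the most recent timestamp,
    # here "raw/sales/2021-12-01T01:00:00.parquet".
    latest = get_latest_timestamp_file_path(files)

flow.run()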

viadot.tasks.cloud_for_customers.C4CToDF (Task)

run(self, url=None, env='QA', endpoint=None, fields=None, params=None, chunksize=None, if_empty='warn', credentials_secret=None, vault_name=None)

Task for downloading data from the Cloud for Customers to a pandas DataFrame using a normal URL (with query parameters). This task pulls data from a table from scratch by passing the table name in the url or endpoint. It is recommended to add some filter parameters in this case.

Examples:

url = "https://mysource.com/sap/c4c/odata/v1/c4codataapi" endpoint = "ServiceRequestCollection" params = {"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"}

Parameters:

Name Type Description Default
url str

The url to the API in case of prepared report. Defaults to None.

None
env str

The environment to use. Defaults to 'QA'.

'QA'
endpoint str

The endpoint of the API. Defaults to None.

None
fields List[str]

The C4C Table fields. Defaults to None.

None
params Dict[str, str]

Query parameters. Defaults to $format=json.

None
chunksize int

How many rows to retrieve from C4C at a time. Uses a server-side cursor.

None
if_empty str

What to do if query returns no data. Defaults to "warn".

'warn'
credentials_secret str

The name of the Azure Key Vault secret containing a dictionary with C4C credentials. Defaults to None.

None
vault_name str

The name of the vault from which to obtain the secret. Defaults to None.

None

Returns:

Type Description
pd.DataFrame

The query result as a pandas DataFrame.

Source code in viadot/tasks/cloud_for_customers.py
@defaults_from_attrs(
    "url", "endpoint", "fields", "params", "chunksize", "env", "if_empty"
)
def run(
    self,
    url: str = None,
    env: str = "QA",
    endpoint: str = None,
    fields: List[str] = None,
    params: Dict[str, str] = None,
    chunksize: int = None,
    if_empty: str = "warn",
    credentials_secret: str = None,
    vault_name: str = None,
):
    """
    Task for downloading data from the Cloud for Customers to a pandas DataFrame using a normal URL (with query parameters).
    This task pulls data from a table from scratch by passing the table name in the url or endpoint. It is recommended to add
    some filter parameters in this case.

    Example:
        url = "https://mysource.com/sap/c4c/odata/v1/c4codataapi"
        endpoint = "ServiceRequestCollection"
        params = {"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"}

    Args:
        url (str, optional): The url to the API in case of prepared report. Defaults to None.
        env (str, optional): The environment to use. Defaults to 'QA'.
        endpoint (str, optional): The endpoint of the API. Defaults to None.
        fields (List[str], optional): The C4C Table fields. Defaults to None.
        params (Dict[str, str]): Query parameters. Defaults to $format=json.
        chunksize (int, optional): How many rows to retrieve from C4C at a time. Uses a server-side cursor.
        if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
        credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
        with C4C credentials. Defaults to None.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.

    Returns:
        pd.DataFrame: The query result as a pandas DataFrame.
    """
    if not credentials_secret:
        try:
            credentials_secret = PrefectSecret("C4C_KV").run()
        except ValueError:
            pass

    if credentials_secret:
        credentials_str = AzureKeyVaultSecret(
            credentials_secret, vault_name=vault_name
        ).run()
        credentials = json.loads(credentials_str)
    else:
        credentials = local_config.get("CLOUD_FOR_CUSTOMERS")[env]

    self.logger.info(f"Downloading data from {url+endpoint}...")

    # If we get any of these in params, we don't perform any chunking
    if any(["$skip" in params, "$top" in params]):
        return CloudForCustomers(
            url=url,
            endpoint=endpoint,
            params=params,
            env=env,
            credentials=credentials,
        ).to_df(if_empty=if_empty, fields=fields)

    def _generate_chunks() -> Generator[pd.DataFrame, None, None]:
        """
        Util returning chunks as a generator to save memory.
        """
        offset = 0
        total_record_count = 0
        while True:
            boundaries = {"$skip": offset, "$top": chunksize}
            params.update(boundaries)

            chunk = CloudForCustomers(
                url=url,
                endpoint=endpoint,
                params=params,
                env=env,
                credentials=credentials,
            ).to_df(if_empty=if_empty, fields=fields)

            chunk_record_count = chunk.shape[0]
            total_record_count += chunk_record_count
            self.logger.info(
                f"Successfully downloaded {total_record_count} records."
            )

            yield chunk

            if chunk.shape[0] < chunksize:
                break

            offset += chunksize

    self.logger.info(f"Data from {url+endpoint} has been downloaded successfully.")

    chunks = _generate_chunks()
    df = pd.concat(chunks)

    return df
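
Example (a minimal usage sketch reusing the url, endpoint, and params from the example above; the chunksize and secret name are hypothetical):

from prefect import Flow
from viadot.tasks.cloud_for_customers import C4CToDF

c4c_to_df = C4CToDF()

with Flow("c4c_extract") as flow:
    df = c4c_to_df(
        url="https://mysource.com/sap/c4c/odata/v1/c4codataapi",
        endpoint="ServiceRequestCollection",
        params={"$filter": "CreationDateTime ge 2021-12-21T00:00:00Z"},
        chunksize=5000,
        credentials_secret="C4C-KV",  # hypothetical Key Vault secret name
    )

flow.run()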

viadot.tasks.cloud_for_customers.C4CReportToDF (Task)

__call__(self, *args, **kwargs) special

Download report to DF

Source code in viadot/tasks/cloud_for_customers.py
def __call__(self, *args, **kwargs):
    """Download report to DF"""
    return super().__call__(*args, **kwargs)

run(self, report_url=None, env='QA', skip=0, top=1000, credentials_secret=None, vault_name=None, max_retries=3, retry_delay=datetime.timedelta(seconds=10))

Task for downloading data from the Cloud for Customers to a pandas DataFrame using report URL.

Parameters:

Name Type Description Default
report_url str

The URL to the report. Defaults to None.

None
env str

The environment to use. Defaults to 'QA'.

'QA'
skip int

Initial index value of reading row. Defaults to 0.

0
top int

The value of top reading row. Defaults to 1000.

1000
credentials_secret str

The name of the Azure Key Vault secret containing a dictionary with C4C credentials (username & password). Defaults to None.

None
vault_name str

The name of the vault from which to obtain the secret. Defaults to None.

None

Returns:

Type Description
pd.DataFrame

The query result as a pandas DataFrame.

Source code in viadot/tasks/cloud_for_customers.py
@defaults_from_attrs(
    "report_url",
    "env",
    "skip",
    "top",
)
def run(
    self,
    report_url: str = None,
    env: str = "QA",
    skip: int = 0,
    top: int = 1000,
    credentials_secret: str = None,
    vault_name: str = None,
    max_retries: int = 3,
    retry_delay: timedelta = timedelta(seconds=10),
):
    """
    Task for downloading data from the Cloud for Customers to a pandas DataFrame using report URL.

    Args:
        report_url (str, optional): The URL to the report. Defaults to None.
        env (str, optional): The environment to use. Defaults to 'QA'.
        skip (int, optional): Initial index value of reading row. Defaults to 0.
        top (int, optional): The value of top reading row. Defaults to 1000.
        credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary
        with C4C credentials (username & password). Defaults to None.
        vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.

    Returns:
        pd.DataFrame: The query result as a pandas DataFrame.
    """

    if not credentials_secret:
        try:
            credentials_secret = PrefectSecret("C4C_KV").run()
        except ValueError:
            pass

    if credentials_secret:
        credentials_str = AzureKeyVaultSecret(
            credentials_secret, vault_name=vault_name
        ).run()
        credentials = json.loads(credentials_str)
    else:
        credentials = local_config.get("CLOUD_FOR_CUSTOMERS")[env]

    final_df = pd.DataFrame()
    next_batch = True
    while next_batch:
        new_url = f"{report_url}&$top={top}&$skip={skip}"
        chunk_from_url = CloudForCustomers(
            report_url=new_url, env=env, credentials=credentials
        )
        df = chunk_from_url.to_df()
        final_df = final_df.append(df)
        if not final_df.empty:
            df_count = df.count()[1]
            if df_count != top:
                next_batch = False
            skip += top
        else:
            break
    return final_df