
SQL Sources

viadot.sources.base.Source

to_arrow(self, if_empty='warn')

Creates a pyarrow table from source.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `if_empty` | `str` | What to do if the data source contains no data. Defaults to `"warn"`. | `'warn'` |
Source code in viadot/sources/base.py
def to_arrow(self, if_empty: str = "warn") -> pa.Table:
    """
    Creates a pyarrow table from source.
    Args:
        if_empty (str, optional): : What to do if data sourse contains no data. Defaults to "warn".
    """

    try:
        df = self.to_df(if_empty=if_empty)
    except SKIP:
        return False

    table = pa.Table.from_pandas(df)
    return table
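As a usage illustration, here is a minimal sketch with a hypothetical in-memory subclass (`InMemorySource` is not part of viadot; it exists only to make the example self-contained, and it assumes `Source` accepts a `credentials` keyword, as `SQL.__init__` suggests):

```python
import pandas as pd

from viadot.sources.base import Source


class InMemorySource(Source):
    """Hypothetical source used only for illustration."""

    def to_df(self, if_empty: str = "warn") -> pd.DataFrame:
        return pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})


source = InMemorySource(credentials={})
table = source.to_arrow()
print(table.schema)  # id: int64, name: string
```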

to_csv(self, path, if_exists='replace', if_empty='warn', sep='\t', **kwargs)

Write from source to a CSV file. Note that the source can be a particular file or table, but also a database in general. Therefore, some sources may require additional parameters to pull the right resource. Hence this method passes kwargs to the to_df() method implemented by the concrete source.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | The destination path. | required |
| `if_exists` | `Literal["append", "replace"]` | What to do if the file exists. | `'replace'` |
| `if_empty` | `str` | What to do if the source contains no data. | `'warn'` |
| `sep` | `str` | The separator to use in the CSV. Defaults to `"\t"`. | `'\t'` |

Exceptions:

| Type | Description |
| --- | --- |
| `ValueError` | If the `if_exists` argument is incorrect. |

Returns:

| Type | Description |
| --- | --- |
| `bool` | Whether the operation was successful. |

Source code in viadot/sources/base.py
def to_csv(
    self,
    path: str,
    if_exists: Literal["append", "replace"] = "replace",
    if_empty: str = "warn",
    sep="\t",
    **kwargs,
) -> bool:
    """
    Write from source to a CSV file.
    Note that the source can be a particular file or table,
    but also a database in general. Therefore, some sources may require
    additional parameters to pull the right resource. Hence this method
    passes kwargs to the `to_df()` method implemented by the concrete source.

    Args:
        path (str): The destination path.
        if_exists (Literal["append", "replace"], optional): What to do if the file
            exists. Defaults to "replace".
        if_empty (str, optional): What to do if the source contains no data.
            Defaults to "warn".
        sep (str, optional): The separator to use in the CSV. Defaults to "\t".

    Raises:
        ValueError: If the `if_exists` argument is incorrect.

    Returns:
        bool: Whether the operation was successful.
    """

    try:
        df = self.to_df(if_empty=if_empty, **kwargs)
    except SKIP:
        return False

    if if_exists == "append":
        mode = "a"
    elif if_exists == "replace":
        mode = "w"
    else:
        raise ValueError("'if_exists' must be one of ['append', 'replace']")

    df.to_csv(
        path, sep=sep, mode=mode, index=False, header=not os.path.exists(path)
    )

    return True
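A short sketch, reusing the hypothetical `InMemorySource` from the `to_arrow` example. Note that the header is only written when the target file does not already exist at write time, which keeps repeated appends from duplicating it:

```python
source = InMemorySource(credentials={})

# First write: the file does not exist yet, so a header row is included.
source.to_csv(path="out.csv", if_exists="replace", sep="\t")

# Append: the file now exists, so no second header is written.
source.to_csv(path="out.csv", if_exists="append", sep="\t")
```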

to_excel(self, path, if_exists='replace', if_empty='warn')

Write from source to an Excel file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | The destination path. | required |
| `if_exists` | `str` | What to do if the file exists. Defaults to `"replace"`. | `'replace'` |
| `if_empty` | `str` | What to do if the source contains no data. | `'warn'` |
Source code in viadot/sources/base.py
def to_excel(
    self, path: str, if_exists: str = "replace", if_empty: str = "warn"
) -> bool:
    """
    Write from source to a excel file.
    Args:
        path (str): The destination path.
        if_exists (str, optional): What to do if the file exists. Defaults to "replace".
        if_empty (str, optional): What to do if the source contains no data.

    """

    try:
        df = self.to_df(if_empty=if_empty)
    except SKIP:
        return False

    if if_exists == "append":
        if os.path.isfile(path):
            excel_df = pd.read_excel(path)
            out_df = pd.concat([excel_df, df])
        else:
            out_df = df
    elif if_exists == "replace":
        out_df = df
    out_df.to_excel(path, index=False, encoding="utf8")
    return True
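For example, again with the hypothetical `InMemorySource`. Note that `if_exists="append"` reads the whole workbook back into memory before concatenating, and that recent pandas versions removed the `encoding` argument of `DataFrame.to_excel`, so this method may require an older pandas:

```python
source = InMemorySource(credentials={})

source.to_excel(path="out.xlsx", if_exists="replace")
# Appending reads out.xlsx back, concatenates the new rows, and rewrites the file.
source.to_excel(path="out.xlsx", if_exists="append")
```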

to_parquet(self, path, if_exists='replace', if_empty='warn', **kwargs)

Write from source to a Parquet file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | The destination path. | required |
| `if_exists` | `Literal["append", "replace", "skip"]` | What to do if the file exists. Defaults to `"replace"`. | `'replace'` |
| `if_empty` | `Literal["warn", "fail", "skip"]` | What to do if the source contains no data. Defaults to `"warn"`. | `'warn'` |
Source code in viadot/sources/base.py
def to_parquet(
    self,
    path: str,
    if_exists: Literal["append", "replace", "skip"] = "replace",
    if_empty: Literal["warn", "fail", "skip"] = "warn",
    **kwargs,
) -> None:
    """
    Write from source to a Parquet file.

    Args:
        path (str): The destination path.
        if_exists (Literal["append", "replace", "skip"], optional): What to do if the file exists. Defaults to "replace".
        if_empty (Literal["warn", "fail", "skip"], optional): What to do if the source contains no data. Defaults to "warn".

    """
    try:
        df = self.to_df(if_empty=if_empty)
    except SKIP:
        return False
    if if_exists == "append" and os.path.isfile(path):
        parquet_df = pd.read_parquet(path)
        out_df = pd.concat([parquet_df, df])
    elif if_exists == "replace":
        out_df = df
    elif if_exists == "skip":
        logger.info("Skipped.")
        return
    else:
        out_df = df

    # create directories if they don't exist
    try:
        if not os.path.isfile(path):
            directory = os.path.dirname(path)
            os.makedirs(directory, exist_ok=True)
    except FileNotFoundError:
        logger.info("File not found.")
        pass

    out_df.to_parquet(path, index=False, **kwargs)
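A minimal sketch with hypothetical paths; missing parent directories are created on demand, and extra kwargs are forwarded to `DataFrame.to_parquet`:

```python
source = InMemorySource(credentials={})

# Missing parent directories ("data/nested") are created automatically.
source.to_parquet(path="data/nested/out.parquet", if_exists="replace")

# A second call with if_exists="skip" just logs "Skipped." and returns.
source.to_parquet(path="data/nested/out.parquet", if_exists="skip")
```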

viadot.sources.base.SQL (Source)

con: Connection property readonly

A singleton-like property for initiating a connection to the database.

Returns:

| Type | Description |
| --- | --- |
| `pyodbc.Connection` | The database connection. |

conn_str: str property readonly

Generate a connection string from params or config. Note that the user and password are escaped with '{}' characters.

Returns:

| Type | Description |
| --- | --- |
| `str` | The ODBC connection string. |
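Illustratively, the escaping described above yields a connection string of roughly this shape (a sketch only; the exact keys depend on the driver and credentials):

```python
# Rough shape of an ODBC connection string; all values are placeholders.
conn_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=localhost;DATABASE=test_db;"
    "UID={some_user};"  # user wrapped in '{}' so special characters survive
    "PWD={p@ss;word}"   # likewise for the password
)
```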

__init__(self, driver=None, config_key=None, credentials=None, query_timeout=3600, *args, **kwargs) special

A base SQL source class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `driver` | `str` | The SQL driver to use. Defaults to None. | `None` |
| `config_key` | `str` | The key inside local config containing the config. Defaults to None. | `None` |
| `credentials` | `str` | Credentials for the connection. Defaults to None. | `None` |
| `query_timeout` | `int` | The timeout for executed queries. Defaults to 1 hour. | `3600` |
Source code in viadot/sources/base.py
def __init__(
    self,
    driver: str = None,
    config_key: str = None,
    credentials: str = None,
    query_timeout: int = 60 * 60,
    *args,
    **kwargs,
):
    """A base SQL source class.

    Args:
        driver (str, optional): The SQL driver to use. Defaults to None.
        config_key (str, optional): The key inside local config containing the config.
        User can choose to use this or pass credentials directly to the `credentials`
        parameter. Defaults to None.
        credentials (str, optional): Credentials for the connection. Defaults to None.
        query_timeout (int, optional): The timeout for executed queries. Defaults to 1 hour.
    """

    self.query_timeout = query_timeout

    if config_key:
        config_credentials = local_config.get(config_key)
    else:
        config_credentials = None

    credentials = credentials or config_credentials or {}

    if driver:
        credentials["driver"] = driver

    super().__init__(*args, credentials=credentials, **kwargs)

    self._con = None

create_table(self, table, schema=None, dtypes=None, if_exists='fail')

Create a table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `str` | The destination table. | required |
| `schema` | `str` | The destination schema. Defaults to None. | `None` |
| `dtypes` | `Dict[str, Any]` | The data types to use for the table. Defaults to None. | `None` |
| `if_exists` | `Literal["fail", "replace", "skip", "delete"]` | What to do if the table already exists. Defaults to "fail". | `'fail'` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | Whether the operation was successful. |

Source code in viadot/sources/base.py
def create_table(
    self,
    table: str,
    schema: str = None,
    dtypes: Dict[str, Any] = None,
    if_exists: Literal["fail", "replace", "skip", "delete"] = "fail",
) -> bool:
    """Create a table.

    Args:
        table (str): The destination table.
        schema (str, optional): The destination schema. Defaults to None.
        dtypes (Dict[str, Any], optional): The data types to use for the table. Defaults to None.
        if_exists (Literal, optional): What to do if the table already exists. Defaults to "fail".

    Returns:
        bool: Whether the operation was successful.
    """
    fqn = f"{schema}.{table}" if schema is not None else table
    exists = self._check_if_table_exists(schema=schema, table=table)

    if exists:
        if if_exists == "replace":
            self.run(f"DROP TABLE {fqn}")
        elif if_exists == "delete":
            self.run(f"DELETE FROM {fqn}")
            return True
        elif if_exists == "fail":
            raise ValueError(
                "The table already exists and 'if_exists' is set to 'fail'."
            )
        elif if_exists == "skip":
            return False

    indent = "  "
    dtypes_rows = [
        indent + f'"{col}"' + " " + dtype for col, dtype in dtypes.items()
    ]
    dtypes_formatted = ",\n".join(dtypes_rows)
    create_table_sql = f"CREATE TABLE {fqn}(\n{dtypes_formatted}\n)"
    self.run(create_table_sql)
    return True
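For example (assuming `sql` is a connected instance as above; the dtypes must be valid SQL types for the target database):

```python
dtypes = {"id": "INT", "name": "VARCHAR(100)", "created": "DATETIME"}
sql.create_table(
    table="users",
    schema="dbo",
    dtypes=dtypes,
    if_exists="replace",  # drop and recreate the table if it already exists
)
```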

insert_into(self, table, df)

Insert values from a pandas DataFrame into an existing database table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `str` | The table name. | required |
| `df` | `pd.DataFrame` | The DataFrame with the values to insert. | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | The executed SQL insert query. |

Source code in viadot/sources/base.py
def insert_into(self, table: str, df: pd.DataFrame) -> str:
    """Insert values from a pandas DataFrame into an existing
    database table.

    Args:
        table (str): The table name.
        df (pd.DataFrame): The DataFrame with the values to insert.

    Returns:
        str: The executed SQL insert query.
    """

    values = ""
    rows_count = df.shape[0]
    counter = 0
    for row in df.values:
        counter += 1
        out_row = ", ".join(map(self._sql_column, row))
        comma = ",\n"
        if counter == rows_count:
            comma = ";"
        out_row = f"({out_row}){comma}"
        values += out_row

    columns = ", ".join(df.columns)

    sql = f"INSERT INTO {table} ({columns})\n VALUES {values}"
    self.run(sql)

    return sql
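A usage sketch (hypothetical table and data; `sql` as above). Because the method renders every row into one INSERT statement, very large DataFrames may exceed the database's statement-size or row-count limits:

```python
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "name": ["alice", "bob"]})
query = sql.insert_into(table="dbo.users", df=df)  # returns the executed SQL
```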

to_df(self, query, con=None, if_empty=None)

Creates a DataFrame from a SQL query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | The SQL query. Queries that do not begin with "SELECT" or "WITH" return an empty DataFrame. | required |
| `con` | `pyodbc.Connection` | The connection to use to pull the data. | `None` |
| `if_empty` | `str` | What to do if the query returns no data. Defaults to None. | `None` |
Source code in viadot/sources/base.py
def to_df(
    self, query: str, con: pyodbc.Connection = None, if_empty: str = None
) -> pd.DataFrame:
    """Creates DataFrame form SQL query.
    Args:
        query (str): SQL query. If don't start with "SELECT" returns empty DataFrame.
        con (pyodbc.Connection, optional): The connection to use to pull the data.
        if_empty (str, optional): What to do if the query returns no data. Defaults to None.
    """
    conn = con or self.con

    query_sanitized = query.strip().upper()
    if query_sanitized.startswith("SELECT") or query_sanitized.startswith("WITH"):
        df = pd.read_sql_query(query, conn)
        if df.empty:
            self._handle_if_empty(if_empty=if_empty)
    else:
        df = pd.DataFrame()
    return df
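For example (hypothetical queries; note that the guard only checks the statement's first keyword, so non-SELECT statements are silently skipped rather than executed):

```python
df = sql.to_df("SELECT id, name FROM dbo.users")

# Not a SELECT/WITH query: returns an empty DataFrame without executing anything.
empty = sql.to_df("UPDATE dbo.users SET name = 'x'")
```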

viadot.sources.azure_data_lake.AzureDataLake (Source)

A class for pulling data from Azure Data Lake (gen1 and gen2). You can either connect to the lake in general or to a particular path, e.g. `lake = AzureDataLake(); lake.exists("a/b/c.csv")` vs `lake = AzureDataLake(path="a/b/c.csv"); lake.exists()`.

Parameters:

| Name | Type | Description |
| --- | --- | --- |
| `credentials` | `Dict[str, Any]`, optional | A dictionary containing `ACCOUNT_NAME` and the following Service Principal credentials: `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`. |
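A minimal sketch of constructing the source (all values are placeholders):

```python
from viadot.sources import AzureDataLake

# Placeholder Service Principal credentials, following the keys listed above.
credentials = {
    "ACCOUNT_NAME": "mystorageaccount",
    "AZURE_TENANT_ID": "<tenant-id>",
    "AZURE_CLIENT_ID": "<client-id>",
    "AZURE_CLIENT_SECRET": "<client-secret>",
}

lake = AzureDataLake(path="raw/data.csv", credentials=credentials)
lake.exists()
```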

cp(self, from_path=None, to_path=None, recursive=False)

Copies source to a destination.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `from_path` | `str` | Path from which to copy file(s). Defaults to None. | `None` |
| `to_path` | `str` | Path to which to copy file(s). Defaults to None. | `None` |
| `recursive` | `bool` | Whether to copy files recursively. Defaults to False. | `False` |
Source code in viadot/sources/azure_data_lake.py
def cp(self, from_path: str = None, to_path: str = None, recursive: bool = False):
    """
    Copies source to a destination.

    Args:
        from_path (str, optional): Path from which to copy file(s). Defaults to None.
        to_path (str, optional): Path to which to copy file(s). Defaults to None.
        recursive (bool, optional): Whether to copy files recursively. Defaults to False.
    """
    from_path = from_path or self.path
    to_path = to_path
    self.fs.cp(from_path, to_path, recursive=recursive)
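For example (hypothetical paths; `lake` is an `AzureDataLake` instance as above):

```python
# Copy a single file, then a whole directory tree.
lake.cp(from_path="raw/data.csv", to_path="backup/data.csv")
lake.cp(from_path="raw", to_path="backup", recursive=True)
```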

exists(self, path=None)

Check if a location exists in Azure Data Lake.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | The path to check. Can be a file or a directory. | `None` |

Examples:

```python
from viadot.sources import AzureDataLake

lake = AzureDataLake(gen=1)
lake.exists("tests/test.csv")
```

Returns:

| Type | Description |
| --- | --- |
| `bool` | Whether the path exists. |

Source code in viadot/sources/azure_data_lake.py
def exists(self, path: str = None) -> bool:
    """
    Check if a location exists in Azure Data Lake.

    Args:
        path (str): The path to check. Can be a file or a directory.

    Example:
    ```python
    from viadot.sources import AzureDataLake

    lake = AzureDataLake(gen=1)
    lake.exists("tests/test.csv")
    ```

    Returns:
        bool: Whether the path exists.
    """
    path = path or self.path
    return self.fs.exists(path)

find(self, path=None)

Returns a list of all files in a path, searching recursively.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Path to a folder. Defaults to None. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of paths. |

Source code in viadot/sources/azure_data_lake.py
def find(self, path: str = None) -> List[str]:
    """
    Returns a list of all files in a path, searching recursively.

    Args:
        path (str, optional): Path to a folder. Defaults to None.

    Returns:
        List[str]: List of paths.

    """
    path = path or self.path

    files_path_list_raw = self.fs.find(path)
    paths_list = [p for p in files_path_list_raw if not p.endswith("/")]
    return paths_list

ls(self, path=None)

Returns a list of files in a path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Path to a folder. Defaults to None. | `None` |
Source code in viadot/sources/azure_data_lake.py
def ls(self, path: str = None) -> List[str]:
    """
    Returns a list of files in a path.

    Args:
        path (str, optional): Path to a folder. Defaults to None.
    """
    path = path or self.path

    return self.fs.ls(path)

rm(self, path=None, recursive=False)

Deletes a file or directory from the path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Path to a folder or file. Defaults to None. | `None` |
| `recursive` | `bool` | Whether to delete files recursively. Defaults to False. | `False` |
Source code in viadot/sources/azure_data_lake.py
def rm(self, path: str = None, recursive: bool = False):
    """
    Deletes a file or directory from the path.

    Args:
        path (str, optional): Path to a folder or file. Defaults to None.
        recursive (bool, optional): Whether to delete files recursively or not. Defaults to False.
    """
    path = path or self.path
    self.fs.rm(path, recursive=recursive)
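The listing and deletion methods compose naturally; a sketch with hypothetical paths (`lake` as above):

```python
lake.ls("raw")              # direct children of raw/
lake.find("raw")            # every file under raw/, recursively (no directories)
lake.rm("raw/old.csv")      # delete a single file
lake.rm("raw/archive", recursive=True)  # delete a directory tree
```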

upload(self, from_path, to_path=None, recursive=False, overwrite=False)

Upload file(s) to the lake.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `from_path` | `str` | Path to the local file(s) to be uploaded. | required |
| `to_path` | `str` | Path to the destination file/folder. | `None` |
| `recursive` | `bool` | Set this to True if working with directories. | `False` |
| `overwrite` | `bool` | Whether to overwrite the file(s) if they exist. | `False` |

Examples:

```python
from viadot.sources import AzureDataLake

lake = AzureDataLake()
lake.upload(from_path="tests/test.csv", to_path="sandbox/test.csv")
```
Source code in viadot/sources/azure_data_lake.py
def upload(
    self,
    from_path: str,
    to_path: str = None,
    recursive: bool = False,
    overwrite: bool = False,
) -> None:
    """
    Upload file(s) to the lake.

    Args:
        from_path (str): Path to the local file(s) to be uploaded.
        to_path (str): Path to the destination file/folder
        recursive (bool): Set this to true if working with directories.
        overwrite (bool): Whether to overwrite the file(s) if they exist.

    Example:
    ```python
    from viadot.sources import AzureDataLake
    lake = AzureDataLake()
    lake.upload(from_path='tests/test.csv', to_path="sandbox/test.csv")
    ```
    """

    if self.gen == 1:
        raise NotImplementedError(
            "Azure Data Lake Gen1 does not support simple file upload."
        )

    to_path = to_path or self.path
    self.fs.upload(
        lpath=from_path,
        rpath=to_path,
        recursive=recursive,
        overwrite=overwrite,
    )

viadot.sources.azure_sql.AzureSQL (SQLServer)

bulk_insert(self, table, schema=None, source_path=None, sep='\t', if_exists='append')

Function to bulk insert data from a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `str` | Table name. | required |
| `schema` | `str` | Schema name. Defaults to None. | `None` |
| `source_path` | `str` | Full path to a data file. Defaults to None. | `None` |
| `sep` | `str` | Field terminator to be used for char and widechar data files. Defaults to `"\t"`. | `'\t'` |
| `if_exists` | `Literal` | What to do if the table already exists. Defaults to "append". | `'append'` |
Source code in viadot/sources/azure_sql.py
def bulk_insert(
    self,
    table: str,
    schema: str = None,
    source_path: str = None,
    sep="\t",
    if_exists: Literal = "append",
):
    """Fuction to bulk insert.
    Args:
        table (str): Table name.
        schema (str, optional): Schema name. Defaults to None.
        source_path (str, optional): Full path to a data file. Defaults to one.
        sep (str, optional):  field terminator to be used for char and widechar data files. Defaults to "\t".
        if_exists (Literal, optional): What to do if the table already exists. Defaults to "append".
    """
    if schema is None:
        schema = self.DEFAULT_SCHEMA
    fqn = f"{schema}.{table}"
    insert_sql = f"""
        BULK INSERT {fqn} FROM '{source_path}'
        WITH (
            CHECK_CONSTRAINTS,
            DATA_SOURCE='{self.credentials['data_source']}',
            DATAFILETYPE='char',
            FIELDTERMINATOR='{sep}',
            ROWTERMINATOR='0x0a',
            FIRSTROW=2,
            KEEPIDENTITY,
            TABLOCK,
            CODEPAGE='65001'
        );
    """
    if if_exists == "replace":
        self.run(f"DELETE FROM {schema}.{table}")
    self.run(insert_sql)
    return True
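A usage sketch; the config key and paths are placeholders, and the method assumes `credentials["data_source"]` names an external data source (see `create_external_database` below). Note that `FIRSTROW=2` means the file's first row is treated as a header and skipped:

```python
from viadot.sources import AzureSQL

azure_sql = AzureSQL(config_key="AZURE_SQL")  # hypothetical config entry

azure_sql.bulk_insert(
    table="staging_sales",
    schema="dbo",
    source_path="raw/sales.csv",  # resolved against the external data source
    sep="\t",
    if_exists="replace",  # deletes existing rows before inserting
)
```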

create_external_database(self, external_database_name, storage_account_name, container_name, sas_token, master_key_password, credential_name=None)

Create an external database. Used e.g. to execute BULK INSERT or OPENROWSET queries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `external_database_name` | `str` | The name of the external source (database) to be created. | required |
| `storage_account_name` | `str` | The name of the Azure storage account. | required |
| `container_name` | `str` | The name of the container which should become the "database". | required |
| `sas_token` | `str` | The SAS token to be used as the credential. Note that the auth system in Azure is pretty broken and you might need to paste your storage account's account key here instead. | required |
| `master_key_password` | `str` | The password for the database master key of your Azure SQL Database. | required |
| `credential_name` | `str` | How to name the SAS credential. This is really an Azure-internal thing and can be anything. Defaults to `'{external_database_name}_credential'`. | `None` |
Source code in viadot/sources/azure_sql.py
def create_external_database(
    self,
    external_database_name: str,
    storage_account_name: str,
    container_name: str,
    sas_token: str,
    master_key_password: str,
    credential_name: str = None,
):
    """Create an external database. Used to eg. execute BULK INSERT or OPENROWSET
    queries.

    Args:
        external_database_name (str): The name of the external source (db) to be created.
        storage_account_name (str): The name of the Azure storage account.
        container_name (str): The name of the container which should become the "database".
        sas_token (str): The SAS token to be used as the credential. Note that the auth
        system in Azure is pretty broken and you might need to paste here your storage
        account's account key instead.
        master_key_password (str): The password for the database master key of your
        Azure SQL Database.
        credential_name (str): How to name the SAS credential. This is really an Azure
        internal thing and can be anything. By default '{external_database_name}_credential'.
    """

    # stupid Microsoft thing
    if sas_token.startswith("?"):
        sas_token = sas_token[1:]

    if credential_name is None:
        credential_name = f"{external_database_name}_credential"

    create_master_key_sql = (
        f"CREATE MASTER KEY ENCRYPTION BY PASSWORD = '{master_key_password}'"
    )

    create_external_db_credential_sql = f"""
    CREATE DATABASE SCOPED CREDENTIAL {credential_name}
    WITH IDENTITY = 'SHARED ACCESS SIGNATURE'
    SECRET = '{sas_token}';
    """

    create_external_db_sql = f"""
    CREATE EXTERNAL DATA SOURCE {external_database_name} WITH (
    LOCATION = 'https://{storage_account_name}.blob.core.windows.net/{container_name}',
    CREDENTIAL = {credential_name}
    );
    """

    self.run(create_master_key_sql)
    self.run(create_external_db_credential_sql)
    self.run(create_external_db_sql)
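For example (all values are placeholders; `azure_sql` as above). The resulting data source name is what `bulk_insert` expects to find in `credentials["data_source"]`:

```python
azure_sql.create_external_database(
    external_database_name="sales_lake",
    storage_account_name="mystorageaccount",
    container_name="sales",
    sas_token="<sas-token>",                      # placeholder
    master_key_password="<master-key-password>",  # placeholder
)
```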

viadot.sources.sqlite.SQLite (SQL)

A SQLite source.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `server` | `str` | The server string, usually "localhost". | required |
| `db` | `str` | The file path to the database, e.g. `/home/somedb.sqlite`. | required |

conn_str property readonly

Generate a connection string from params or config. Note that the user and password are escaped with '{}' characters.

Returns:

| Type | Description |
| --- | --- |
| `str` | The ODBC connection string. |
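A usage sketch; passing `server` and `db` through the `credentials` dictionary is an assumption based on the parameters listed above:

```python
from viadot.sources import SQLite

sqlite = SQLite(credentials={"server": "localhost", "db": "/home/somedb.sqlite"})
df = sqlite.to_df("SELECT * FROM my_table")
```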