SQL Sources
viadot.sources.base.Source
to_arrow(self, if_empty='warn')
Creates a pyarrow table from source.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
if_empty |
str |
What to do if the data source contains no data. Defaults to "warn". |
'warn' |
Source code in viadot/sources/base.py
def to_arrow(self, if_empty: str = "warn") -> pa.Table:
    """
    Create a pyarrow Table from the source.

    Args:
        if_empty (str, optional): What to do if the data source contains no
            data. Defaults to "warn".

    Returns:
        pa.Table: The pulled data as a pyarrow Table, or False when the pull
        was skipped (the `SKIP` signal was raised by `to_df()`), mirroring
        the behavior of `to_csv()`/`to_parquet()`.
    """
    try:
        df = self.to_df(if_empty=if_empty)
    except SKIP:
        # The source signalled there is nothing to pull; report the skip
        # instead of raising.
        return False
    table = pa.Table.from_pandas(df)
    return table
to_csv(self, path, if_exists='replace', if_empty='warn', sep='\t', **kwargs)
Write from source to a CSV file.
Note that the source can be a particular file or table,
but also a database in general. Therefore, some sources may require
additional parameters to pull the right resource. Hence this method
passes kwargs to the to_df()
method implemented by the concrete source.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The destination path. |
required |
if_exists |
Literal[ |
What to do if the file exists. |
'replace' |
if_empty |
str |
What to do if the source contains no data. |
'warn' |
sep |
str |
The separator to use in the CSV. Defaults to "\t". |
'\t' |
Exceptions:
Type | Description |
---|---|
ValueError |
If the |
Returns:
Type | Description |
---|---|
bool |
Whether the operation was successful. |
Source code in viadot/sources/base.py
def to_csv(
    self,
    path: str,
    if_exists: Literal["append", "replace"] = "replace",
    if_empty: str = "warn",
    sep="\t",
    **kwargs,
) -> bool:
    """
    Write from source to a CSV file.

    Note that the source can be a particular file or table,
    but also a database in general. Therefore, some sources may require
    additional parameters to pull the right resource. Hence this method
    passes kwargs to the `to_df()` method implemented by the concrete source.

    Args:
        path (str): The destination path.
        if_exists (Literal["append", "replace"], optional): What to do if
            the file exists. Defaults to "replace".
        if_empty (str, optional): What to do if the source contains no data.
            Defaults to "warn".
        sep (str, optional): The separator to use in the CSV. Defaults to "\\t".

    Raises:
        ValueError: If the `if_exists` argument is incorrect.

    Returns:
        bool: Whether the operation was successful.
    """
    # Validate arguments before pulling any data so bad input fails fast.
    if if_exists == "append":
        mode = "a"
    elif if_exists == "replace":
        mode = "w"
    else:
        raise ValueError("'if_exists' must be one of ['append', 'replace']")

    try:
        df = self.to_df(if_empty=if_empty, **kwargs)
    except SKIP:
        return False

    # Bug fix: the header must always be written in "replace" mode. The old
    # `header=not os.path.exists(path)` dropped the header whenever the file
    # already existed, even when it was about to be overwritten. Only in
    # "append" mode should an existing file suppress the header row.
    write_header = mode == "w" or not os.path.exists(path)
    df.to_csv(path, sep=sep, mode=mode, index=False, header=write_header)
    return True
to_excel(self, path, if_exists='replace', if_empty='warn')
Write from source to an Excel file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The destination path. |
required |
if_exists |
str |
What to do if the file exists. Defaults to "replace". |
'replace' |
if_empty |
str |
What to do if the source contains no data. |
'warn' |
Source code in viadot/sources/base.py
def to_excel(
    self, path: str, if_exists: str = "replace", if_empty: str = "warn"
) -> bool:
    """
    Write from source to an Excel file.

    Args:
        path (str): The destination path.
        if_exists (str, optional): What to do if the file exists. One of
            "append" or "replace". Defaults to "replace".
        if_empty (str, optional): What to do if the source contains no data.
            Defaults to "warn".

    Raises:
        ValueError: If the `if_exists` argument is incorrect.

    Returns:
        bool: Whether the operation was successful.
    """
    try:
        df = self.to_df(if_empty=if_empty)
    except SKIP:
        return False

    if if_exists == "append":
        if os.path.isfile(path):
            existing_df = pd.read_excel(path)
            out_df = pd.concat([existing_df, df])
        else:
            out_df = df
    elif if_exists == "replace":
        out_df = df
    else:
        # Previously an unknown value fell through to a NameError on
        # `out_df`; fail fast with a clear message instead (matches to_csv).
        raise ValueError("'if_exists' must be one of ['append', 'replace']")

    # The `encoding` keyword was removed from DataFrame.to_excel in pandas
    # 2.0 (Excel output is always UTF-8), so it is dropped here.
    out_df.to_excel(path, index=False)
    return True
to_parquet(self, path, if_exists='replace', if_empty='warn', **kwargs)
Write from source to a Parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The destination path. |
required |
if_exists |
Literal["append", "replace", "skip"] |
What to do if the file exists. Defaults to "replace". |
'replace' |
if_empty |
Literal["warn", "fail", "skip"] |
What to do if the source contains no data. Defaults to "warn". |
'warn' |
Source code in viadot/sources/base.py
def to_parquet(
    self,
    path: str,
    if_exists: Literal["append", "replace", "skip"] = "replace",
    if_empty: Literal["warn", "fail", "skip"] = "warn",
    **kwargs,
) -> bool:
    """
    Write from source to a Parquet file.

    Args:
        path (str): The destination path.
        if_exists (Literal["append", "replace", "skip"], optional): What to
            do if the file exists. Defaults to "replace".
        if_empty (Literal["warn", "fail", "skip"], optional): What to do if
            the source contains no data. Defaults to "warn".

    Returns:
        bool: Whether data was written (False when the write was skipped or
        the source pull was skipped). The original annotated `-> None` yet
        returned False on skip; the annotation is corrected to bool and the
        fall-through path now returns True, consistent with `to_csv()`.
    """
    try:
        df = self.to_df(if_empty=if_empty)
    except SKIP:
        return False

    if if_exists == "append" and os.path.isfile(path):
        existing_df = pd.read_parquet(path)
        out_df = pd.concat([existing_df, df])
    elif if_exists == "skip":
        logger.info("Skipped.")
        return False
    else:
        # Covers "replace" as well as "append" to a not-yet-existing file.
        out_df = df

    # Create parent directories if needed. A bare file name has an empty
    # dirname, for which os.makedirs would raise FileNotFoundError — the old
    # code papered over that with a try/except; guard explicitly instead.
    directory = os.path.dirname(path)
    if directory and not os.path.isfile(path):
        os.makedirs(directory, exist_ok=True)

    out_df.to_parquet(path, index=False, **kwargs)
    return True
viadot.sources.base.SQL (Source)
con: Connection
property
readonly
A singleton-like property for initiating a connection to the database.
Returns:
Type | Description |
---|---|
pyodbc.Connection |
database connection. |
conn_str: str
property
readonly
Generate a connection string from params or config. Note that the user and password are escaped with '{}' characters.
Returns:
Type | Description |
---|---|
str |
The ODBC connection string. |
__init__(self, driver=None, config_key=None, credentials=None, query_timeout=3600, *args, **kwargs)
special
A base SQL source class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
driver |
str |
The SQL driver to use. Defaults to None. |
None |
config_key |
str |
The key inside local config containing the config. |
None |
credentials |
str |
Credentials for the connection. Defaults to None. |
None |
query_timeout |
int |
The timeout for executed queries. Defaults to 1 hour. |
3600 |
Source code in viadot/sources/base.py
def __init__(
    self,
    driver: str = None,
    config_key: str = None,
    credentials: dict = None,
    query_timeout: int = 60 * 60,
    *args,
    **kwargs,
):
    """A base SQL source class.

    Args:
        driver (str, optional): The SQL driver to use. Defaults to None.
        config_key (str, optional): The key inside local config containing
            the config. User can choose to use this or pass credentials
            directly to the `credentials` parameter. Defaults to None.
        credentials (dict, optional): Credentials for the connection.
            Defaults to None. (The parameter was previously annotated `str`,
            but it is used as a mapping throughout.)
        query_timeout (int, optional): The timeout for executed queries.
            Defaults to 1 hour.
    """
    self.query_timeout = query_timeout

    config_credentials = local_config.get(config_key) if config_key else None

    # Copy into a fresh dict so we never mutate the mapping the caller
    # passed in (the old code wrote "driver" into the caller's dict).
    credentials = dict(credentials or config_credentials or {})
    if driver:
        credentials["driver"] = driver

    super().__init__(*args, credentials=credentials, **kwargs)

    self._con = None
create_table(self, table, schema=None, dtypes=None, if_exists='fail')
Create a table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
str |
The destination table. Defaults to None. |
required |
schema |
str |
The destination schema. Defaults to None. |
None |
dtypes |
Dict[str, Any] |
The data types to use for the table. Defaults to None. |
None |
if_exists |
Literal |
What to do if the table already exists. Defaults to "fail". |
'fail' |
Returns:
Type | Description |
---|---|
bool |
Whether the operation was successful. |
Source code in viadot/sources/base.py
def create_table(
    self,
    table: str,
    schema: str = None,
    dtypes: Dict[str, Any] = None,
    if_exists: Literal["fail", "replace", "skip", "delete"] = "fail",
) -> bool:
    """Create a database table.

    Args:
        table (str): The destination table.
        schema (str, optional): The destination schema. Defaults to None.
        dtypes (Dict[str, Any], optional): Mapping of column name to SQL
            data type used to build the DDL. Defaults to None.
        if_exists (Literal, optional): What to do if the table already
            exists: "replace" drops and recreates it, "delete" truncates it
            via DELETE and stops, "skip" does nothing, "fail" raises.
            Defaults to "fail".

    Returns:
        bool: Whether the operation was successful.
    """
    fqn = table if schema is None else f"{schema}.{table}"

    if self._check_if_table_exists(schema=schema, table=table):
        if if_exists == "replace":
            self.run(f"DROP TABLE {fqn}")
        elif if_exists == "delete":
            self.run(f"DELETE FROM {fqn}")
            return True
        elif if_exists == "fail":
            raise ValueError(
                "The table already exists and 'if_exists' is set to 'fail'."
            )
        elif if_exists == "skip":
            return False

    indent = " "
    columns_ddl = ",\n".join(
        f'{indent}"{name}" {dtype}' for name, dtype in dtypes.items()
    )
    self.run(f"CREATE TABLE {fqn}(\n{columns_ddl}\n)")
    return True
insert_into(self, table, df)
Insert values from a pandas DataFrame into an existing database table.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
str |
table name |
required |
df |
pd.DataFrame |
pandas dataframe |
required |
Returns:
Type | Description |
---|---|
str |
The executed SQL insert query. |
Source code in viadot/sources/base.py
def insert_into(self, table: str, df: pd.DataFrame) -> str:
    """Insert all rows of a pandas DataFrame into an existing database table.

    Args:
        table (str): The name of the target table.
        df (pd.DataFrame): The DataFrame whose rows are inserted.

    Returns:
        str: The executed SQL INSERT query.
    """
    total_rows = df.shape[0]
    rendered = []
    for position, row in enumerate(df.values, start=1):
        cells = ", ".join(map(self._sql_column, row))
        # Every tuple ends with ",\n" except the last, which closes with ";".
        terminator = ";" if position == total_rows else ",\n"
        rendered.append(f"({cells}){terminator}")
    values = "".join(rendered)

    columns = ", ".join(df.columns)
    sql = f"INSERT INTO {table} ({columns})\n VALUES {values}"
    self.run(sql)
    return sql
to_df(self, query, con=None, if_empty=None)
Creates a DataFrame from an SQL query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query |
str |
SQL query. If it doesn't start with "SELECT" or "WITH", an empty DataFrame is returned. |
required |
con |
pyodbc.Connection |
The connection to use to pull the data. |
None |
if_empty |
str |
What to do if the query returns no data. Defaults to None. |
None |
Source code in viadot/sources/base.py
def to_df(
    self, query: str, con: pyodbc.Connection = None, if_empty: str = None
) -> pd.DataFrame:
    """Create a DataFrame from an SQL query.

    Only queries beginning with "SELECT" or "WITH" (case-insensitive) are
    executed; any other statement yields an empty DataFrame.

    Args:
        query (str): The SQL query to run.
        con (pyodbc.Connection, optional): The connection to use to pull the
            data. Defaults to the source's own connection.
        if_empty (str, optional): What to do if the query returns no data.
            Defaults to None.
    """
    connection = con or self.con
    normalized = query.strip().upper()
    if not normalized.startswith(("SELECT", "WITH")):
        return pd.DataFrame()
    df = pd.read_sql_query(query, connection)
    if df.empty:
        self._handle_if_empty(if_empty=if_empty)
    return df
viadot.sources.azure_data_lake.AzureDataLake (Source)
A class for pulling data from the Azure Data Lakes (gen1 and gen2). You can either connect to the lake in general or to a particular path, eg. lake = AzureDataLake(); lake.exists("a/b/c.csv") vs lake = AzureDataLake(path="a/b/c.csv"); lake.exists()
Parameters
credentials : Dict[str, Any], optional A dictionary containing ACCOUNT_NAME and the following Service Principal credentials: - AZURE_TENANT_ID - AZURE_CLIENT_ID - AZURE_CLIENT_SECRET
cp(self, from_path=None, to_path=None, recursive=False)
Copies source to a destination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path |
str |
Path from which to copy files. Defaults to None. |
None |
to_path |
str |
Path where to copy files. Defaults to None. |
None |
recursive |
bool |
Whether to copy files recursively or not. Defaults to False. |
False |
Source code in viadot/sources/azure_data_lake.py
def cp(self, from_path: str = None, to_path: str = None, recursive: bool = False):
    """
    Copy a file or directory to a destination within the lake.

    Args:
        from_path (str, optional): Path from which to copy. Defaults to
            self.path when None.
        to_path (str, optional): Path where to copy files. Defaults to None.
        recursive (bool, optional): Whether to copy files recursively or not.
            Defaults to False.
    """
    from_path = from_path or self.path
    # The no-op `to_path = to_path` line from the original is removed.
    self.fs.cp(from_path, to_path, recursive=recursive)
exists(self, path=None)
Check if a location exists in Azure Data Lake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to check. Can be a file or a directory. |
None |
Examples:
from viadot.sources import AzureDataLake
lake = AzureDataLake(gen=1)
lake.exists("tests/test.csv")
Returns:
Type | Description |
---|---|
bool |
Whether the paths exists. |
Source code in viadot/sources/azure_data_lake.py
def exists(self, path: str = None) -> bool:
    """
    Check whether a location exists in Azure Data Lake.

    Args:
        path (str): The path to check. Can be a file or a directory.
            Defaults to self.path when None.

    Example:
        ```python
        from viadot.sources import AzureDataLake
        lake = AzureDataLake(gen=1)
        lake.exists("tests/test.csv")
        ```

    Returns:
        bool: Whether the paths exists.
    """
    target = path or self.path
    return self.fs.exists(target)
find(self, path=None)
Returns list of files in a path using recursive method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Path to a folder. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
List[str] |
List of paths. |
Source code in viadot/sources/azure_data_lake.py
def find(self, path: str = None) -> List[str]:
    """
    Recursively list all files under a path.

    Args:
        path (str, optional): Path to a folder. Defaults to self.path when None.

    Returns:
        List[str]: The discovered file paths (directory entries, which end
        with "/", are filtered out).
    """
    target = path or self.path
    raw_entries = self.fs.find(target)
    return [entry for entry in raw_entries if not entry.endswith("/")]
ls(self, path=None)
Returns list of files in a path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Path to a folder. Defaults to None. |
None |
Source code in viadot/sources/azure_data_lake.py
def ls(self, path: str = None) -> List[str]:
    """
    List the files in a path (non-recursive).

    Args:
        path (str, optional): Path to a folder. Defaults to self.path when None.
    """
    return self.fs.ls(path or self.path)
rm(self, path=None, recursive=False)
Deletes a file or directory from the path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Path to a folder or file. Defaults to None. |
None |
recursive |
bool |
Whether to delete files recursively or not. Defaults to False. |
False |
Source code in viadot/sources/azure_data_lake.py
def rm(self, path: str = None, recursive: bool = False):
    """
    Delete a file or directory from the lake.

    Args:
        path (str, optional): Path to a folder or file. Defaults to
            self.path when None.
        recursive (bool, optional): Whether to delete files recursively or
            not. Defaults to False.
    """
    target = path or self.path
    self.fs.rm(target, recursive=recursive)
upload(self, from_path, to_path=None, recursive=False, overwrite=False)
Upload file(s) to the lake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_path |
str |
Path to the local file(s) to be uploaded. |
required |
to_path |
str |
Path to the destination file/folder |
None |
recursive |
bool |
Set this to true if working with directories. |
False |
overwrite |
bool |
Whether to overwrite the file(s) if they exist. |
False |
Examples:
from viadot.sources import AzureDataLake
lake = AzureDataLake()
lake.upload(from_path='tests/test.csv', to_path="sandbox/test.csv")
Source code in viadot/sources/azure_data_lake.py
def upload(
    self,
    from_path: str,
    to_path: str = None,
    recursive: bool = False,
    overwrite: bool = False,
) -> None:
    """
    Upload file(s) to the lake.

    Args:
        from_path (str): Path to the local file(s) to be uploaded.
        to_path (str): Path to the destination file/folder. Defaults to
            self.path when None.
        recursive (bool): Set this to true if working with directories.
        overwrite (bool): Whether to overwrite the file(s) if they exist.

    Raises:
        NotImplementedError: When the lake is Gen1, which does not support
            simple file upload.

    Example:
        ```python
        from viadot.sources import AzureDataLake
        lake = AzureDataLake()
        lake.upload(from_path='tests/test.csv', to_path="sandbox/test.csv")
        ```
    """
    if self.gen == 1:
        # Bug fix: the original did `raise NotImplemented(...)`, which is a
        # TypeError at runtime (NotImplemented is a constant, not callable);
        # NotImplementedError is the correct exception type.
        raise NotImplementedError(
            "Azure Data Lake Gen1 does not support simple file upload."
        )

    to_path = to_path or self.path
    self.fs.upload(
        lpath=from_path,
        rpath=to_path,
        recursive=recursive,
        overwrite=overwrite,
    )
viadot.sources.azure_sql.AzureSQL (SQLServer)
bulk_insert(self, table, schema=None, source_path=None, sep='\t', if_exists='append')
Function to bulk insert.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
table |
str |
Table name. |
required |
schema |
str |
Schema name. Defaults to None. |
None |
source_path |
str |
Full path to a data file. Defaults to None. |
None |
sep |
str |
Field terminator to be used for char and widechar data files. Defaults to "\t". |
'\t' |
if_exists |
Literal |
What to do if the table already exists. Defaults to "append". |
'append' |
Source code in viadot/sources/azure_sql.py
def bulk_insert(
    self,
    table: str,
    schema: str = None,
    source_path: str = None,
    sep: str = "\t",
    if_exists: Literal["append", "replace"] = "append",
) -> bool:
    """Bulk-insert a data file into an Azure SQL table.

    Args:
        table (str): Table name.
        schema (str, optional): Schema name. Defaults to None, in which case
            self.DEFAULT_SCHEMA is used.
        source_path (str, optional): Full path to a data file. Defaults to None.
        sep (str, optional): Field terminator to be used for char and
            widechar data files. Defaults to "\\t".
        if_exists (Literal["append", "replace"], optional): What to do if the
            table already contains data: "replace" deletes existing rows
            first. Defaults to "append". (The original annotated this as a
            bare `Literal`, which is not a valid type.)

    Returns:
        bool: Whether the operation was successful.
    """
    if schema is None:
        schema = self.DEFAULT_SCHEMA
    fqn = f"{schema}.{table}"
    insert_sql = f"""
        BULK INSERT {fqn} FROM '{source_path}'
        WITH (
            CHECK_CONSTRAINTS,
            DATA_SOURCE='{self.credentials['data_source']}',
            DATAFILETYPE='char',
            FIELDTERMINATOR='{sep}',
            ROWTERMINATOR='0x0a',
            FIRSTROW=2,
            KEEPIDENTITY,
            TABLOCK,
            CODEPAGE='65001'
        );
    """
    if if_exists == "replace":
        # Clear existing rows first so the load effectively replaces the data.
        self.run(f"DELETE FROM {schema}.{table}")
    self.run(insert_sql)
    return True
create_external_database(self, external_database_name, storage_account_name, container_name, sas_token, master_key_password, credential_name=None)
Create an external database. Used to eg. execute BULK INSERT or OPENROWSET queries.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
external_database_name |
str |
The name of the external source (db) to be created. |
required |
storage_account_name |
str |
The name of the Azure storage account. |
required |
container_name |
str |
The name of the container which should become the "database". |
required |
sas_token |
str |
The SAS token to be used as the credential. Note that the auth |
required |
master_key_password |
str |
The password for the database master key of your |
required |
credential_name |
str |
How to name the SAS credential. This is really an Azure |
None |
Source code in viadot/sources/azure_sql.py
def create_external_database(
    self,
    external_database_name: str,
    storage_account_name: str,
    container_name: str,
    sas_token: str,
    master_key_password: str,
    credential_name: str = None,
):
    """Create an external database. Used to eg. execute BULK INSERT or
    OPENROWSET queries.

    Args:
        external_database_name (str): The name of the external source (db) to be created.
        storage_account_name (str): The name of the Azure storage account.
        container_name (str): The name of the container which should become the "database".
        sas_token (str): The SAS token to be used as the credential. Note that the auth
            system in Azure is pretty broken and you might need to paste here your storage
            account's account key instead.
        master_key_password (str): The password for the database master key of your
            Azure SQL Database.
        credential_name (str): How to name the SAS credential. This is really an Azure
            internal thing and can be anything. By default '{external_database_name}_credential'.
    """
    # Azure hands the token out with a leading "?" that must be stripped.
    if sas_token.startswith("?"):
        sas_token = sas_token[1:]

    if credential_name is None:
        credential_name = f"{external_database_name}_credential"

    # Bug fix: T-SQL requires the master key password to be a quoted string
    # literal; the original interpolated it unquoted.
    create_master_key_sql = (
        f"CREATE MASTER KEY ENCRYPTION BY PASSWORD = '{master_key_password}'"
    )

    create_external_db_credential_sql = f"""
    CREATE DATABASE SCOPED CREDENTIAL {credential_name}
    WITH IDENTITY = 'SHARED ACCESS SIGNATURE'
    SECRET = '{sas_token}';
    """

    # Bug fix: the original contained `LOCATION = f'https://...'` INSIDE an
    # f-string, so a literal "f" leaked into the generated SQL, producing
    # invalid T-SQL. The stray prefix is removed here.
    create_external_db_sql = f"""
    CREATE EXTERNAL DATA SOURCE {external_database_name} WITH (
        LOCATION = 'https://{storage_account_name}.blob.core.windows.net/{container_name}',
        CREDENTIAL = {credential_name}
    );
    """

    self.run(create_master_key_sql)
    self.run(create_external_db_credential_sql)
    self.run(create_external_db_sql)
viadot.sources.sqlite.SQLite (SQL)
A SQLite source
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server |
str |
server string, usually localhost |
required |
db |
str |
the file path to the db e.g. /home/somedb.sqlite |
required |
conn_str
property
readonly
Generate a connection string from params or config. Note that the user and password are escaped with '{}' characters.
Returns:
Type | Description |
---|---|
str |
The ODBC connection string. |