API Sources
viadot.sources.uk_carbon_intensity.UKCarbonIntensity (Source)
Fetches carbon intensity data for the UK power grid.
Documentation for this source API is located at: https://carbon-intensity.github.io/api-definitions/#carbon-intensity-api-v2-0-0
Parameters
api_url : str, optional The URL endpoint to call, by default None
to_df(self, if_empty='warn')
Returns a pandas DataFrame with flattened data
Returns:
Type | Description |
---|---|
pandas.DataFrame |
A Pandas DataFrame |
Source code in viadot/sources/uk_carbon_intensity.py
def to_df(self, if_empty: str = "warn"):
    """Return the API response flattened into a pandas DataFrame.

    The column layout is chosen from the first row's ``intensity`` object:
    rows carrying both ``forecast`` and ``actual`` produce the columns
    ``from, to, forecast, actual, index``; any other rows are read as
    statistics and produce ``from, to, max, average, min``.

    Args:
        if_empty (str, optional): Passed to ``self._handle_if_empty`` when
            the response contains no rows. Defaults to "warn".

    Returns:
        pandas.DataFrame: One row per entry of the response's ``data`` list,
            or an empty DataFrame when there are no entries.

    Raises:
        KeyError: If a row lacks a field required by the selected layout.
    """
    json_data = self.to_json()
    rows = json_data.get("data") if json_data else None
    if not rows:
        # Nothing to flatten: apply the configured policy, then return an
        # empty frame rather than failing later on a missing key / unbound name.
        self._handle_if_empty(if_empty)
        return pd.DataFrame()

    first_intensity = rows[0]["intensity"]
    if "forecast" in first_intensity and "actual" in first_intensity:
        intensity_fields = ("forecast", "actual", "index")
    else:
        intensity_fields = ("max", "average", "min")

    # Collect each column in full and build the frame exactly once, so the
    # column lists can never get out of step with each other.
    columns = {
        "from": [row["from"] for row in rows],
        "to": [row["to"] for row in rows],
    }
    for field in intensity_fields:
        columns[field] = [row["intensity"][field] for row in rows]
    return pd.DataFrame(columns)
to_json(self)
Fetches the endpoint's response and returns it as parsed JSON.
Source code in viadot/sources/uk_carbon_intensity.py
def to_json(self, timeout=(3.05, 60 * 30)):
    """Call the configured endpoint and return the parsed JSON response.

    The request URL is ``self.API_ENDPOINT`` followed by ``self.api_url``.

    Args:
        timeout (tuple, optional): The ``(connect, read)`` timeout passed to
            ``requests.get`` so a stalled connection cannot block forever.
            Defaults to (3.05, 60 * 30).

    Returns:
        The decoded JSON body of a successful response.

    Raises:
        requests.exceptions.HTTPError: If the response status is not OK.
    """
    url = f"{self.API_ENDPOINT}{self.api_url}"
    headers = {"Accept": "application/json"}
    response = requests.get(url, params={}, headers=headers, timeout=timeout)
    if response.ok:
        return response.json()

    # An error body is not guaranteed to be JSON; fall back to the raw text
    # so building the message cannot mask the real failure.
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    # Raise a real exception: raising a plain string is itself a TypeError.
    raise requests.exceptions.HTTPError(f"Error {detail}", response=response)
viadot.sources.supermetrics.Supermetrics (Source)
A class implementing the Supermetrics API.
Documentation for this API is located at: https://supermetrics.com/docs/product-api-getting-started/ Usage limits: https://supermetrics.com/docs/product-api-usage-limits/
Parameters
query_params : Dict[str, Any], optional The parameters to pass to the GET query. See https://supermetrics.com/docs/product-api-get-data/ for full specification, by default None
get_params_from_api_query(url)
classmethod
Returns the parameters of an API query as a dictionary
Source code in viadot/sources/supermetrics.py
@classmethod
def get_params_from_api_query(cls, url: str) -> Dict[str, Any]:
    """Return the JSON parameters embedded in an API query URL.

    The value of the first query-string parameter is decoded and parsed as
    JSON.

    Args:
        url (str): A full query URL, or just its query string.

    Returns:
        Dict[str, Any]: The parsed JSON value of the first query parameter.

    Raises:
        IndexError: If no query parameter is found.
        json.JSONDecodeError: If the parameter value is not valid JSON.
    """
    # Accept both a full URL and a bare query string.
    query = urllib.parse.urlsplit(url).query or url
    # parse_qs percent-decodes each value itself. Decoding the whole URL
    # beforehand would turn an encoded "&" or "+" inside the JSON into a
    # parameter separator or a space and corrupt the payload.
    parsed = urllib.parse.parse_qs(query)
    first_values = list(parsed.values())[0]
    return json.loads(first_values[0])
to_df(self, if_empty='warn')
Download data into a pandas DataFrame.
Note that Supermetrics can calculate some fields on the fly and alias them in the
returned result. For example, if the query requests the `position` field,
Supermetrics may return an `Average position` calculated field.
For this reason we take column names from the actual results rather than from the input fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
if_empty |
str |
What to do if query returned no data. Defaults to "warn". |
'warn' |
Returns:
Type | Description |
---|---|
pd.DataFrame |
the DataFrame containing query results |
Source code in viadot/sources/supermetrics.py
def to_df(self, if_empty: str = "warn") -> pd.DataFrame:
    """Fetch the query results and load them into a pandas DataFrame.

    Column names come from ``self._get_col_names()`` rather than from the
    requested fields, because the API may alias fields it calculates on the
    fly (e.g. a requested `position` field can come back as
    `Average position`).

    Args:
        if_empty (str, optional): What to do if the query returned no data.
            Defaults to "warn".

    Returns:
        pd.DataFrame: The query results, with empty strings replaced by NaN.
    """
    try:
        header = self._get_col_names()
    except ValueError:
        # No column names available; let pandas pick its defaults.
        header = None

    rows = self.to_json()["data"]
    if not rows:
        frame = pd.DataFrame(columns=header)
    else:
        # The first entry of the payload is skipped; only the rest is loaded.
        frame = pd.DataFrame(rows[1:], columns=header).replace("", np.nan)

    if frame.empty:
        self._handle_if_empty(if_empty)
    return frame
to_json(self, timeout=(3.05, 1800))
Download query results to a dictionary. Note that Supermetrics API will sometimes hang and not return any error message, so we're adding a timeout to GET.
See requests docs for an explanation of why this timeout value will work on long-running queries but fail fast on connection issues.
Source code in viadot/sources/supermetrics.py
def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]:
    """Run the query and return the response body as a dictionary.

    The Supermetrics API will sometimes hang without returning any error
    message, so the GET request is issued with a timeout. See
    [requests docs](https://docs.python-requests.org/en/master/user/advanced/#timeouts)
    for why this value lets long-running queries finish while still failing
    fast on connection issues.

    Args:
        timeout (tuple, optional): The timeout forwarded to the request.
            Defaults to (3.05, 60 * 30).

    Returns:
        Dict[str, Any]: The decoded JSON response.

    Raises:
        ValueError: If ``self.query_params`` has not been set.
    """
    if not self.query_params:
        raise ValueError("Please build the query first")

    # The whole query travels as one JSON-encoded "json" parameter.
    encoded_query = json.dumps(self.query_params)
    api_key = self.credentials["API_KEY"]
    response = handle_api_response(
        url=self.API_ENDPOINT,
        params={"json": encoded_query},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=timeout,
    )
    return response.json()
viadot.sources.cloud_for_customers.CloudForCustomers (Source)
__init__(self, *args, *, report_url=None, url=None, endpoint=None, params=None, env='QA', credentials=None, **kwargs)
special
Cloud for Customers connector built for fetching OData sources. See pyodata docs for an explanation of how OData works.
Parameters
report_url (str, optional): The url to the API in case of prepared report. Defaults to None.
url (str, optional): The url to the API. Defaults to None.
endpoint (str, optional): The endpoint of the API. Defaults to None.
params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to json format.
env (str, optional): The development environments. Defaults to 'QA'.
credentials (Dict[str, Any], optional): The credentials to use. They are taken from this parameter or,
when it is None, from the config file (local_config). Defaults to None.
Source code in viadot/sources/cloud_for_customers.py
def __init__(
    self,
    *args,
    report_url: str = None,
    url: str = None,
    endpoint: str = None,
    params: Dict[str, Any] = None,
    env: str = "QA",
    credentials: Dict[str, Any] = None,
    **kwargs,
):
    """Cloud for Customers connector built for fetching OData sources.

    See [pyodata docs](https://pyodata.readthedocs.io/en/latest/index.html)
    for an explanation of how OData works.

    Args:
        report_url (str, optional): The URL of the API in case of a prepared
            report. Defaults to None.
        url (str, optional): The URL of the API. Falls back to the "server"
            entry of the credentials. Defaults to None.
        endpoint (str, optional): The endpoint of the API, joined onto
            ``url``. Defaults to None.
        params (Dict[str, Any], optional): Query parameters (e.g. a filter
            on creation date time), merged on top of ``DEFAULT_PARAMS``.
            Defaults to None.
        env (str, optional): The key under
            ``local_config["CLOUD_FOR_CUSTOMERS"]`` from which default
            credentials are read. Defaults to 'QA'.
        credentials (Dict[str, Any], optional): The credentials to use. When
            None, the credentials from local_config are used.

    Raises:
        CredentialError: If neither a URL (passed or from the credentials)
            nor a report URL is available.
    """
    # NOTE(review): the base initializer is invoked again at the end of this
    # method with the resolved credentials, so this first call looks
    # redundant -- confirm against the base class before removing it.
    super().__init__(*args, **kwargs)

    try:
        default_credentials = local_config["CLOUD_FOR_CUSTOMERS"].get(env)
    except KeyError:
        default_credentials = None
    self.credentials = credentials or default_credentials or {}

    self.url = url or self.credentials.get("server")
    self.report_url = report_url
    if self.url is None and report_url is None:
        raise CredentialError("One of: ('url', 'report_url') is required.")

    self.is_report = bool(report_url)
    self.query_endpoint = endpoint

    # Always work on a private copy so that mutating ``self.params`` can
    # never leak into the class-level DEFAULT_PARAMS shared by all instances.
    self.params = self.DEFAULT_PARAMS.copy()
    if params:
        self.params.update(params)

    if self.url:
        self.full_url = urljoin(self.url, self.query_endpoint)

    super().__init__(*args, credentials=self.credentials, **kwargs)
get_response(self, url, params=None, timeout=(3.05, 1800))
Handle and raise Python exceptions that occur during the request. When a plain url and service endpoint are used, additional parameters are passed in params. A report_url already contains its parameters in its structure, so in the report_url scenario params must not be passed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to connect to. |
required |
params |
Dict[str, Any] |
Additional parameters like filter, used in case of normal url. |
None |
timeout |
tuple |
the request times out. Defaults to (3.05, 60 * 30). |
(3.05, 1800) |
Returns:
Type | Description |
---|---|
Response |
requests.models.Response |
Source code in viadot/sources/cloud_for_customers.py
def get_response(
    self, url: str, params: Dict[str, Any] = None, timeout: tuple = (3.05, 60 * 30)
) -> requests.models.Response:
    """Send an authenticated request through ``handle_api_response``.

    A regular URL plus service endpoint takes its extra options (such as
    filters) through ``params``. A report URL already carries its parameters
    in the URL itself, so ``params`` must be left unset in that case.

    Args:
        url (str): The URL to connect to.
        params (Dict[str, Any], optional): Additional parameters such as a
            filter, used with a regular URL. Defaults to None, which is the
            value to use with a report URL.
        timeout (tuple, optional): The request timeout.
            Defaults to (3.05, 60 * 30).

    Returns:
        requests.models.Response
    """
    stored = self.credentials
    # Username and password are read from the stored credentials; a missing
    # entry yields None.
    login = (stored.get("username"), stored.get("password"))
    return handle_api_response(
        url=url,
        params=params,
        auth=login,
        timeout=timeout,
    )
map_columns(self, url=None)
Fetch metadata from the URL and use it to build a column name mapping.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL from which to fetch metadata. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
Dict[str, str] |
Property Name as key mapped to the value of sap label. |
Source code in viadot/sources/cloud_for_customers.py
def map_columns(
    self, url: str = None, timeout: tuple = (3.05, 60 * 30)
) -> Dict[str, str]:
    """Fetch metadata from the URL and build a column name mapping.

    The response text is split on ``/>`` and each fragment is searched for a
    ``Name="..."`` attribute followed by a ``sap:label="..."`` attribute.

    Args:
        url (str, optional): The URL from which to fetch metadata. When
            empty, no request is made. Defaults to None.
        timeout (tuple, optional): The ``(connect, read)`` timeout for the
            metadata request, so a stalled server cannot block forever.
            Defaults to (3.05, 60 * 30).

    Returns:
        Dict[str, str]: Property name as key, mapped to its SAP label. Empty
            when no URL is given or nothing matches.
    """
    column_mapping = {}
    if not url:
        return column_mapping

    username = self.credentials.get("username")
    pw = self.credentials.get("password")
    response = requests.get(url, auth=(username, pw), timeout=timeout)

    # Compiled once outside the loop; group 1 is the property name and
    # group 3 is the label text.
    label_pattern = re.compile(r'(?<=Name=")([^"]+).+(sap:label=")([^"]+)+')
    for sentence in response.text.split("/>"):
        result = label_pattern.search(sentence)
        if result:
            column_mapping[result.group(1)] = result.group(3)
    return column_mapping
response_to_entity_list(self, dirty_json, url)
Changing request json response to list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dirty_json |
Dict[str, Any] |
json from response. |
required |
url |
str |
The URL from which to fetch metadata. |
required |
Returns:
Type | Description |
---|---|
List |
List of dictionaries. |
Source code in viadot/sources/cloud_for_customers.py
def response_to_entity_list(self, dirty_json: Dict[str, Any], url: str) -> List:
    """Convert the JSON of a response into a list of flat records.

    Entries under ``dirty_json["d"]["results"]`` are copied one by one.
    Metadata and picture keys are dropped, as is any value whose string form
    contains ``{``. Remaining keys are renamed with the mapping returned by
    ``self.map_columns`` when it holds a non-empty name for them.

    Args:
        dirty_json (Dict[str, Any]): JSON from the response.
        url (str): The URL used to derive the metadata URL.

    Returns:
        List: List of dictionaries.
    """
    labels = self.map_columns(self.change_to_meta_url(url))
    skipped_keys = ("__metadata", "Photo", "", "Picture")

    entities = []
    for record in dirty_json["d"]["results"]:
        entity = {}
        for name, value in record.items():
            if name in skipped_keys or "{" in str(value):
                continue
            # Keep the original key when there is no usable label for it.
            entity[labels.get(name) or name] = value
        entities.append(entity)
    return entities
to_df(self, fields=None, if_empty='warn', dtype=None, **kwargs)
Returns records in a pandas DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fields |
List[str] |
List of fields to put in DataFrame. Defaults to None. |
None |
dtype |
dict |
The dtypes to use in the DataFrame. We catch this parameter here since pandas doesn't support passing dtypes (eg. as a dict) to the constructor. |
None |
kwargs |
The parameters to pass to DataFrame constructor. |
{} |
Source code in viadot/sources/cloud_for_customers.py
def to_df(
    self,
    fields: List[str] = None,
    if_empty: str = "warn",
    dtype: dict = None,
    **kwargs,
) -> pd.DataFrame:
    """Return the records in a pandas DataFrame.

    Args:
        fields (List[str], optional): List of fields to put in the DataFrame.
            Defaults to None, which keeps every column.
        if_empty (str, optional): Passed to ``self._handle_if_empty`` when no
            records were returned. Defaults to "warn".
        dtype (dict, optional): The dtypes to use in the DataFrame. We catch
            this parameter here since pandas doesn't support passing dtypes
            (eg. as a dict) to the constructor.
        kwargs: The parameters to pass to the DataFrame constructor.

    Returns:
        pd.DataFrame: The records, cast to ``dtype`` and restricted to
            ``fields`` when those are given.
    """
    records = self.to_records()
    df = pd.DataFrame(data=records, **kwargs)

    # Honor `if_empty` instead of silently ignoring it.
    if df.empty:
        self._handle_if_empty(if_empty)

    if dtype:
        df = df.astype(dtype)
    if fields:
        return df[fields]
    return df
to_records(self)
Download a list of entities in the records format
Source code in viadot/sources/cloud_for_customers.py
def to_records(self) -> List[Dict[str, Any]]:
    """Download a list of entities in the records format.

    A report source is read from ``self.report_url``; any other source is
    read from ``self.full_url``.
    """
    if not self.is_report:
        return self._to_records_other(url=self.full_url)
    return self._to_records_report(url=self.report_url)