Skip to content

API Sources

viadot.sources.uk_carbon_intensity.UKCarbonIntensity (Source)

Fetches data of Carbon Intensity of the UK Power Grid.

Documentation for this source API is located at: https://carbon-intensity.github.io/api-definitions/#carbon-intensity-api-v2-0-0

Parameters

api_url : str, optional The URL endpoint to call, by default None

to_df(self, if_empty='warn')

Returns a pandas DataFrame with flattened data

Returns:

Type Description
pandas.DataFrame

A Pandas DataFrame

Source code in viadot/sources/uk_carbon_intensity.py
def to_df(self, if_empty: str = "warn"):
    """Return the API response flattened into a pandas DataFrame.

    Every element of ``json_data["data"]`` contributes one row. A row whose
    ``intensity`` mapping holds both ``forecast`` and ``actual`` is read as an
    intensity record (columns ``from, to, forecast, actual, index``); any
    other row is read as a statistics record (columns
    ``from, to, max, average, min``). As before, the layout of the returned
    frame follows the last row of the response.

    Args:
        if_empty (str, optional): Passed to ``self._handle_if_empty`` when the
            response contains no rows. Defaults to "warn".

    Returns:
        pandas.DataFrame: One row per record, or an empty DataFrame when the
        response contains no rows.
    """
    json_data = self.to_json()
    rows = json_data["data"] if json_data else []

    if not rows:
        # Nothing to flatten: report it through the empty-result handler and
        # return an empty frame instead of failing further down.
        self._handle_if_empty(if_empty)
        return pd.DataFrame()

    from_, to, index = [], [], []
    forecast, actual = [], []
    max_, average, min_ = [], [], []
    is_stats = False

    for row in rows:
        intensity = row["intensity"]
        from_.append(row["from"])
        to.append(row["to"])
        index.append(intensity["index"])

        # Decide the record type before appending anything, so a partial row
        # can never leave the column lists with different lengths.
        is_stats = "forecast" not in intensity or "actual" not in intensity
        if is_stats:
            max_.append(intensity["max"])
            average.append(intensity["average"])
            min_.append(intensity["min"])
        else:
            forecast.append(intensity["forecast"])
            actual.append(intensity["actual"])

    # Build the frame once, after all rows have been collected.
    if is_stats:
        return pd.DataFrame(
            {
                "from": from_,
                "to": to,
                "max": max_,
                "average": average,
                "min": min_,
            }
        )
    return pd.DataFrame(
        {
            "from": from_,
            "to": to,
            "forecast": forecast,
            "actual": actual,
            "index": index,
        }
    )

to_json(self)

Calls the API and returns the decoded JSON response.

Source code in viadot/sources/uk_carbon_intensity.py
def to_json(self):
    """Call the API and return the decoded JSON response.

    Sends a GET request to ``self.API_ENDPOINT`` followed by ``self.api_url``,
    asking for an ``application/json`` response.

    Returns:
        The decoded JSON body of a successful response.

    Raises:
        requests.exceptions.HTTPError: If the response status is not OK. The
            message carries the decoded error body, or the raw text when the
            body is not valid JSON.
    """
    url = f"{self.API_ENDPOINT}{self.api_url}"
    headers = {"Accept": "application/json"}
    response = requests.get(url, params={}, headers=headers)
    if response.ok:
        return response.json()

    # Raising a plain string is itself a TypeError in Python 3, which hid the
    # API's error payload from the caller; raise a real exception instead.
    try:
        details = response.json()
    except ValueError:
        # Error pages are not guaranteed to be JSON.
        details = response.text
    raise requests.exceptions.HTTPError(f"Error {details}", response=response)

viadot.sources.supermetrics.Supermetrics (Source)

A class implementing the Supermetrics API.

Documentation for this API is located at: https://supermetrics.com/docs/product-api-getting-started/ Usage limits: https://supermetrics.com/docs/product-api-usage-limits/

Parameters

query_params : Dict[str, Any], optional The parameters to pass to the GET query. See https://supermetrics.com/docs/product-api-get-data/ for full specification, by default None

get_params_from_api_query(url) classmethod

Returns parameters from API query in a dictionary

Source code in viadot/sources/supermetrics.py
@classmethod
def get_params_from_api_query(cls, url: str) -> Dict[str, Any]:
    """Return the query parameters embedded in an API query URL as a dictionary.

    The query string is split into parameters *before* percent-decoding, so
    characters such as ``&``, ``=`` or ``+`` that are percent-encoded inside
    the JSON payload survive intact (decoding the whole URL first would
    decode the payload twice and split it at every embedded ``&``).

    Args:
        url (str): A full query URL, or a bare query string such as
            ``json=%7B...%7D``.

    Returns:
        Dict[str, Any]: The decoded JSON payload. The ``json`` parameter is
        used when present; otherwise the first parameter in the query.

    Raises:
        IndexError: If the URL carries no query parameters.
        json.JSONDecodeError: If the parameter value is not valid JSON.
    """
    _, separator, query = url.partition("?")
    if not separator:
        # No "?" at all: the caller passed a bare query string.
        query = url

    parsed = urllib.parse.parse_qs(query)
    values = parsed.get("json") or list(parsed.values())[0]
    return json.loads(values[0])

to_df(self, if_empty='warn')

Download data into a pandas DataFrame.

Note that Supermetrics can calculate some fields on the fly and alias them in the returned result. For example, if the query requests the position field, Supermetrics may return an Average position calculated field. For this reason we take column names from the actual results rather than from the input fields.

Parameters:

Name Type Description Default
if_empty str

What to do if query returned no data. Defaults to "warn".

'warn'

Returns:

Type Description
pd.DataFrame

the DataFrame containing query results

Source code in viadot/sources/supermetrics.py
def to_df(self, if_empty: str = "warn") -> pd.DataFrame:
    """Run the query and return its result as a pandas DataFrame.

    Supermetrics can compute fields on the fly and return them under an alias
    (for example, a requested `position` field may come back as
    `Average position`). Column names are therefore obtained from
    `self._get_col_names()` rather than from the requested fields; if that
    call raises `ValueError`, no column names are passed to pandas.

    The first element of the returned `data` list is skipped (presumably the
    header row) and empty strings are replaced with NaN.

    Args:
        if_empty (str, optional): Passed to `self._handle_if_empty` when the
            resulting DataFrame is empty. Defaults to "warn".

    Returns:
        pd.DataFrame: the DataFrame containing query results
    """
    try:
        header = self._get_col_names()
    except ValueError:
        header = None

    rows = self.to_json()["data"]

    if not rows:
        frame = pd.DataFrame(columns=header)
    else:
        body = rows[1:]
        frame = pd.DataFrame(body, columns=header)
        frame = frame.replace("", np.nan)

    if frame.empty:
        self._handle_if_empty(if_empty)
    return frame

to_json(self, timeout=(3.05, 1800))

Download query results to a dictionary. Note that Supermetrics API will sometimes hang and not return any error message, so we're adding a timeout to GET.

See requests docs for an explanation of why this timeout value will work on long-running queries but fail fast on connection issues.

Source code in viadot/sources/supermetrics.py
def to_json(self, timeout=(3.05, 60 * 30)) -> Dict[str, Any]:
    """Run the query and return the decoded JSON response as a dictionary.

    The Supermetrics API will sometimes hang without returning any error
    message, so the GET request is always issued with a timeout.

    See [requests docs](https://docs.python-requests.org/en/master/user/advanced/#timeouts)
    for why this timeout value lets long-running queries finish while still
    failing fast on connection issues.

    Args:
        timeout (tuple, optional): Timeout forwarded to `handle_api_response`.
            Defaults to (3.05, 60 * 30).

    Returns:
        Dict[str, Any]: The decoded JSON body of the response.

    Raises:
        ValueError: If `self.query_params` is empty.
    """
    if not self.query_params:
        raise ValueError("Please build the query first")

    # The whole query travels as a single JSON-encoded "json" parameter.
    query_payload = {"json": json.dumps(self.query_params)}
    auth_header = {"Authorization": f'Bearer {self.credentials["API_KEY"]}'}

    return handle_api_response(
        url=self.API_ENDPOINT,
        params=query_payload,
        headers=auth_header,
        timeout=timeout,
    ).json()

viadot.sources.cloud_for_customers.CloudForCustomers (Source)

__init__(self, *args, *, report_url=None, url=None, endpoint=None, params=None, env='QA', credentials=None, **kwargs) special

Cloud for Customers connector built for fetching OData sources. See pyodata docs for an explanation of how OData works.

Parameters
report_url (str, optional): The url to the API in case of prepared report. Defaults to None.
url (str, optional): The url to the API. Defaults to None.
endpoint (str, optional): The endpoint of the API. Defaults to None.
params (Dict[str, Any]): The query parameters like filter by creation date time. Defaults to json format.
env (str, optional): The development environments. Defaults to 'QA'.
credentials (Dict[str, Any], optional): The credentials are populated with values from the config file or this
parameter. Defaults to None, in which case credentials from local_config are used.
Source code in viadot/sources/cloud_for_customers.py
def __init__(
    self,
    *args,
    report_url: str = None,
    url: str = None,
    endpoint: str = None,
    params: Dict[str, Any] = None,
    env: str = "QA",
    credentials: Dict[str, Any] = None,
    **kwargs,
):
    """Cloud for Customers connector built for fetching OData sources.
    See [pyodata docs](https://pyodata.readthedocs.io/en/latest/index.html) for an explanation
    of how OData works.

    Args:
        report_url (str, optional): The url to the API in case of prepared report. Defaults to None.
        url (str, optional): The url to the API. Falls back to the "server" entry of the
            credentials when not given. Defaults to None.
        endpoint (str, optional): The endpoint of the API. Defaults to None.
        params (Dict[str, Any], optional): The query parameters, like a filter by creation
            date time. They are merged over a copy of the class-level `DEFAULT_PARAMS`.
            Defaults to None.
        env (str, optional): The environment whose credentials are looked up under
            `local_config["CLOUD_FOR_CUSTOMERS"]`. Defaults to 'QA'.
        credentials (Dict[str, Any], optional): The credentials to use. Defaults to None,
            in which case the credentials from local_config are used.

    Raises:
        CredentialError: If neither a url (argument or "server" credential) nor a
            report_url is available.
    """
    # NOTE(review): the base initializer is invoked here and again at the end of
    # this method (the second time with the resolved credentials). Both calls are
    # kept because the base class is not visible here - confirm whether the first
    # one can be dropped.
    super().__init__(*args, **kwargs)

    try:
        DEFAULT_CREDENTIALS = local_config["CLOUD_FOR_CUSTOMERS"].get(env)
    except KeyError:
        DEFAULT_CREDENTIALS = None
    self.credentials = credentials or DEFAULT_CREDENTIALS or {}

    self.url = url or self.credentials.get("server")
    self.report_url = report_url

    if self.url is None and report_url is None:
        raise CredentialError("One of: ('url', 'report_url') is required.")

    self.is_report = bool(report_url)
    self.query_endpoint = endpoint

    # Always work on a copy: assigning DEFAULT_PARAMS itself would make every
    # instance created without `params` share (and possibly mutate) the same
    # class-level dictionary.
    self.params = self.DEFAULT_PARAMS.copy()
    if params:
        self.params.update(params)

    if self.url:
        self.full_url = urljoin(self.url, self.query_endpoint)

    super().__init__(*args, credentials=self.credentials, **kwargs)

get_response(self, url, params=None, timeout=(3.05, 1800))

Handle and raise Python exceptions during the request. When a url and service endpoint are used, additional parameters are passed in params. A report_url already contains its parameters in its structure, so in the report_url scenario params must not be passed.

Parameters:

Name Type Description Default
url str

the URL which trying to connect.

required
params Dict[str, Any]

Additional parameters like filter, used in case of normal url.

None
timeout tuple

the request times out. Defaults to (3.05, 60 * 30).

(3.05, 1800)

Returns:

Type Description
Response

requests.models.Response

Source code in viadot/sources/cloud_for_customers.py
def get_response(
    self, url: str, params: Dict[str, Any] = None, timeout: tuple = (3.05, 60 * 30)
) -> requests.models.Response:
    """Send an authenticated request through `handle_api_response`.

    The "username" and "password" entries of `self.credentials` are passed as
    the `auth` pair. A plain url + endpoint needs its extra query parameters
    supplied via `params`; a report_url already carries them in its structure,
    so `params` must be left unset in that scenario.

    Args:
        url (str): the URL to connect to.
        params (Dict[str, Any], optional): Additional parameters like filter, used in case
            of a normal url. Defaults to None, which is used for a report_url.
        timeout (tuple, optional): the request timeout. Defaults to (3.05, 60 * 30).

    Returns:
        requests.models.Response
    """
    credentials = self.credentials
    basic_auth = (credentials.get("username"), credentials.get("password"))
    return handle_api_response(
        url=url,
        params=params,
        auth=basic_auth,
        timeout=timeout,
    )

map_columns(self, url=None)

Fetch metadata from url used to column name map.

Parameters:

Name Type Description Default
url str

the URL which trying to fetch metadata. Defaults to None.

None

Returns:

Type Description
Dict[str, str]

Property Name as key mapped to the value of sap label.

Source code in viadot/sources/cloud_for_customers.py
def map_columns(self, url: str = None) -> Dict[str, str]:
    """Build a property-name to SAP-label mapping from a metadata document.

    The document fetched from `url` is split on "/>" and each fragment is
    searched for a `Name="..."` attribute followed by a `sap:label="..."`
    attribute. Fragments without both are ignored.

    Args:
        url (str, optional): the URL to fetch metadata from. Defaults to None,
            in which case no request is made.

    Returns:
        Dict[str, str]: Property Name as key mapped to the value of sap label.
        Empty when `url` is not given.
    """
    if not url:
        return {}

    basic_auth = (self.credentials.get("username"), self.credentials.get("password"))
    metadata = requests.get(url, auth=basic_auth)

    label_pattern = re.compile(r'(?<=Name=")([^"]+).+(sap:label=")([^"]+)+')
    labels = {}
    for fragment in metadata.text.split("/>"):
        found = label_pattern.search(fragment)
        if found:
            # Group 1 is the property name, group 3 the text of its sap:label.
            labels[found.group(1)] = found.group(3)
    return labels

response_to_entity_list(self, dirty_json, url)

Changing request json response to list.

Parameters:

Name Type Description Default
dirty_json Dict[str, Any]

json from response.

required
url str

the URL which trying to fetch metadata.

required

Returns:

Type Description
List

List of dictionaries.

Source code in viadot/sources/cloud_for_customers.py
def response_to_entity_list(self, dirty_json: Dict[str, Any], url: str) -> List:
    """Flatten a JSON response into a list of plain entity dictionaries.

    For every element of `dirty_json["d"]["results"]`, the keys
    "__metadata", "Photo", "" and "Picture" are dropped, as is any value whose
    string form contains "{". The remaining keys are renamed using the mapping
    returned by `self.map_columns` for the metadata URL; keys without a
    (non-empty) mapped name keep their original name.

    Args:
        dirty_json (Dict[str, Any]): json from response.
        url (str): the URL from which the metadata URL is derived.

    Returns:
        List: List of dictionaries.
    """
    labels = self.map_columns(self.change_to_meta_url(url))
    skipped_keys = ("__metadata", "Photo", "", "Picture")

    entities = []
    for raw_entity in dirty_json["d"]["results"]:
        cleaned = {}
        for name, value in raw_entity.items():
            if name in skipped_keys or "{" in str(value):
                continue
            cleaned[labels.get(name) or name] = value
        entities.append(cleaned)
    return entities

to_df(self, fields=None, if_empty='warn', dtype=None, **kwargs)

Returns records in a pandas DataFrame.

Parameters:

Name Type Description Default
fields List[str]

List of fields to put in DataFrame. Defaults to None.

None
dtype dict

The dtypes to use in the DataFrame. We catch this parameter here since pandas doesn't support passing dtypes (eg. as a dict) to the constructor.

None
kwargs

The parameters to pass to DataFrame constructor.

{}
Source code in viadot/sources/cloud_for_customers.py
def to_df(
    self,
    fields: List[str] = None,
    if_empty: str = "warn",
    dtype: dict = None,
    **kwargs,
) -> pd.DataFrame:
    """Return the downloaded records as a pandas DataFrame.

    Args:
        fields (List[str], optional): List of fields to put in DataFrame. Defaults to None,
            which keeps every column.
        if_empty (str, optional): Passed to `self._handle_if_empty` when no records were
            returned. Defaults to "warn".
        dtype (dict, optional): The dtypes to use in the DataFrame. We catch this parameter
            here since pandas doesn't support passing dtypes (eg. as a dict) to the
            constructor; it is applied with `DataFrame.astype` instead.
        kwargs: The parameters to pass to DataFrame constructor.

    Returns:
        pd.DataFrame: The records, restricted to `fields` when given.
    """
    records = self.to_records()
    df = pd.DataFrame(data=records, **kwargs)

    # `if_empty` used to be accepted but never acted upon; report an empty
    # result through the empty-result handler before continuing.
    if df.empty:
        self._handle_if_empty(if_empty)

    if dtype:
        df = df.astype(dtype)
    if fields:
        return df[fields]
    return df

to_records(self)

Download a list of entities in the records format

Source code in viadot/sources/cloud_for_customers.py
def to_records(self) -> List[Dict[str, Any]]:
    """
    Download a list of entities in the records format.

    Report sources (`self.is_report` is truthy) are read from
    `self.report_url` via `_to_records_report`; all other sources are read
    from `self.full_url` via `_to_records_other`.
    """
    if not self.is_report:
        return self._to_records_other(url=self.full_url)
    return self._to_records_report(url=self.report_url)