# Source code for examples.provencia.etl

"""Extraction Transformation 'Loading' pipeline.

ETL pipeline from Provencia SQL database to Pandas DataFrame in RAM
"""

import configparser
import logging
from functools import reduce
from pathlib import Path

import pandas as pd
import pandera.pandas as pa
from pandas._typing import Dtype, DtypeArg
from pandera.typing.pandas import DataFrame
from sqlalchemy import create_engine, text

# Allow both package-style and script-style execution: prefer the
# relative import when loaded as part of the package, fall back to the
# flat import when run as a standalone script.
try:
    from .schemas import ProvenciaETLSchema
except ImportError:
    from schemas import ProvenciaETLSchema

# Module-wide logging setup.
# NOTE(review): basicConfig at import time configures the root logger as a
# side effect; fine for an example/script, intrusive for a library.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ----- Utilities -----


def get_db_url() -> str:
    """Read database credentials from config.ini and construct the database URL.

    The config file is expected next to this module, with a ``[database]``
    section providing ``host``, ``dbname``, ``user``, ``port`` and ``password``.

    Returns:
        str: a ``postgresql+psycopg2`` SQLAlchemy connection URL.

    Raises:
        FileNotFoundError: if ``config.ini`` is absent next to this module.
    """
    # initialize config parser
    config = configparser.ConfigParser()
    config_path = Path(__file__).parent / "config.ini"

    # fail early with an actionable message if the config is missing
    # (idiom fix: call .exists() on the instance, not through the class)
    if not config_path.exists():
        msg = (
            f"Configuration file not found at: {config_path}. "
            "Please create it from the 'config.ini.template'."
        )
        raise FileNotFoundError(msg)
    config.read(config_path)

    # format URL
    db_config = config["database"]
    host = db_config.get("host")
    dbname = db_config.get("dbname")
    user = db_config.get("user")
    port = db_config.get("port")
    password = db_config.get("password")
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"


# create sql engine once as global; may be None if config.ini is absent
# (see get_db_url, which raises FileNotFoundError in that case).  Keeping
# the module importable without a config file is deliberate; consumers of
# ENGINE must cope with the None case.
try:
    ENGINE = create_engine(get_db_url())
except FileNotFoundError:
    ENGINE = None


def sql2df(query: str, dtype: DtypeArg) -> pd.DataFrame:
    """Run sql query, load result in a dataframe.

    Args:
        query: raw SQL text to execute.
        dtype: column dtype mapping forwarded to :func:`pandas.read_sql_query`.

    Returns:
        pd.DataFrame: the query result, typed according to ``dtype``.

    Raises:
        RuntimeError: if the module-level ENGINE is None, i.e. config.ini
            was missing when the module was imported.
    """
    # ENGINE is explicitly allowed to be None at import time; fail with a
    # clear message instead of a cryptic AttributeError on .connect().
    if ENGINE is None:
        msg = (
            "Database engine unavailable: config.ini was missing at import "
            "time, so ENGINE is None."
        )
        raise RuntimeError(msg)
    with ENGINE.connect() as conn:
        return pd.read_sql_query(text(query), conn, dtype=dtype)


def tpl2str(lst: list[str]) -> str:
    """Transform a list of str into a string tuple.

    Utility for :func:`select_from_where`

    Args:
        lst: elements already formatted as SQL literals.

    Returns:
        str: the elements joined by ', ' and wrapped in parentheses.

    Example:
        >>> tpl2str(['a', 'b', 'c'])
        '(a, b, c)'
    """
    # str.join accepts the list directly; the original generator
    # `k for k in lst` was a no-op wrapper (ruff C4xx).
    return f"({', '.join(lst)})"


def select_from_where(
    cols: dict[str, Dtype],
    table: str,
    whr: dict[str, list[str]],
) -> pd.DataFrame:
    """Select columns from a database table where a condition is met.

    The condition is in the form: (
        k1 in (a1, b1, c1, ...) and
        k2 in (a2, b2, c2, ...) ...)
    with k being keys and 'a', 'b', 'c' being the element of the associated value in
    the whr dictionary

    Args:
        cols: mapping of column name -> pandas dtype for the result.
        table: the table to SELECT from.
        whr: mapping of column name -> list of already-quoted SQL literals;
            an empty dict means no WHERE clause.

    Returns:
        pd.DataFrame: the selected rows, typed according to ``cols``.
    """
    # NOTE(security): the query is built by string interpolation; only pass
    # trusted, internally-generated identifiers and literals (see format_whr).
    cols_str = ", ".join(cols)  # iterating a dict yields its keys
    query = f"SELECT {cols_str} FROM {table}"
    if whr:
        whr_str = " AND ".join(f"{k} IN {tpl2str(v)}" for k, v in whr.items())
        query += f" WHERE {whr_str}"

    return sql2df(query, dtype=cols)


class ETLError(Exception):
    """Personal Error to control problematic data.

    This allows distinction between controlled error and
    uncontrolled error
    """

    def __init__(self, message: str) -> None:
        """Initialize the error with a message."""
        super().__init__(message)

    @staticmethod
    def check_df(
        message: str,
        df: pd.DataFrame,
        wrong_sr: pd.Series,
    ) -> None:
        """Check if any row is wrong and raise error.

        Error is `message` as first line and problematic rows of df in 2nd line

        Args:
            message: first line of the error message
            df: the DataFrame
            wrong_sr: boolean serie True where df is problematic

        Raises:
            ETLError: if any entry of ``wrong_sr`` is True.
        """
        if wrong_sr.to_numpy().any():
            # BUG FIX: the original assigned `msg = message` and then
            # immediately overwrote it with only the rows, losing the
            # human-readable summary. Keep both: summary then rows.
            msg = f"{message}\n{df[wrong_sr]}"
            raise ETLError(msg)


def date_indexing(df: pd.DataFrame) -> pd.DataFrame:
    """Parse the 'date' column to timestamps, sort by it and promote it to index."""
    # parse in place so the caller's frame carries proper timestamps too
    df["date"] = pd.to_datetime(df["date"])
    ordered = df.sort_values(by="date")
    # verify_integrity rejects duplicate dates, which would corrupt the index
    return ordered.set_index("date", verify_integrity=True)


def format_whr(
    store_ids: list[str],
    product_codes: list[int],
) -> dict[str, list[str]]:
    """Build the ``whr`` mapping for :func:`select_from_where`.

    Store ids are single-quoted SQL string literals; product codes are
    rendered as bare numeric literals.
    """
    quoted_stores = [f"'{sid}'" for sid in store_ids]
    code_literals = [str(code) for code in product_codes]
    return {"store_id": quoted_stores, "product_code": code_literals}


def get_store_ids() -> pd.DataFrame:
    """Get all the store ids and name as DataFrame.

    Specifically, the store_ids in public.order table.

    Returns:
        pd.DataFrame: the store ids as index, the store name as column

    Examples:
        >>> get_store_ids()  # doctest: +NORMALIZE_WHITESPACE
                                   name
        store_id
        JE                       Seynod
        KV         Saint-Jeoire-Prieuré
        EV                     Faverges
        BS              Annecy-le-Vieux
        MO                 Villeurbanne
        ...                         ...
        LW             Thonon-les-Bains
        KM        Saint-Jean-de-Moirans
        FW                          Gex
        GF                Grésy-sur-Aix
        EM                     Douvaine
    """
    # DISTINCT over order rows so only stores that actually ordered appear
    return sql2df(
        """
        SELECT DISTINCT
            ord.store_id,
            sto.name
        FROM
            public.order AS ord
        JOIN
            store AS sto ON ord.store_id = sto.id;
        """,
        dtype={"store_id": "string", "name": "string"},
    ).set_index("store_id")
def get_product_codes(store_ids: list[str]) -> pd.DataFrame:
    """Get all product codes and description in specified shops.

    Args:
        store_ids: a list of store ids to get product codes from

    Returns:
        pd.DataFrame: the product_codes as index, the description as column

    Example:
        >>> get_product_codes(['JE', 'KV', 'EV'])  # doctest: +NORMALIZE_WHITESPACE
                               description
        product_code
        2870622000000      *PO-SAUTE X 3KG
        2870557000000            PORC FOIE
        2870549000000   KG SAUTE PORC FRAN
        2477358000000  *EL-ROTI VEAU FARCI
        2276357000000  *SA-SAUCI MENAGE X3
        ...                            ...
        2477424000000  *VO-CUISSE DE POULE
        2443970000000     SAUCISSON TRUFFE
        2280328000000    EL-CREPINETTE X 3
        2224892000000   GIGOT *** ENTIER S
        2610547000000     BLOC DE DINDE RA
        <BLANKLINE>
        [519 rows x 1 columns]
    """
    # store ids become quoted SQL literals for the IN (...) clause
    st = tpl2str([f"'{s}'" for s in store_ids])
    # only the WHERE fragment interpolates a value, so only it needs f""
    # (the other pieces were f-strings without placeholders, ruff F541)
    return sql2df(
        "SELECT DISTINCT "
        " ord.product_code, "
        " pro.description "
        "FROM "
        " public.order AS ord "
        "JOIN "
        " product AS pro ON ord.product_code = pro.product_code "
        f"WHERE store_id IN {st};",
        dtype={"product_code": "Int64", "description": "string"},
    ).set_index("product_code")
# ----- Extract: SQL -> DataFrame -----


def extract_order(
    store_ids: list[str],
    product_codes: list[int],
) -> pd.DataFrame:
    """Extract relevant columns from public.order.

    Performs some renaming, type checking and type converting.
    Act on the full DataFrame over every stores and product_codes
    """
    df = select_from_where(
        cols={
            "store_id": "string",
            "product_code": "Int64",
            "date": "datetime64[ns]",
            "delivered": "Int32",
            "delivered_weight": "Float64",
            "purchase_price": "Int32",
            "unit_of_measurement": pd.CategoricalDtype(categories=["PI", "KG"]),
        },
        table="public.order",
        whr=format_whr(store_ids, product_codes),
    )

    # Treat nan in delivered_weight, purchase_price, unit_of_measurement
    isna = pd.Series(
        df[["delivered_weight", "purchase_price", "unit_of_measurement"]]
        .isna()
        .any(axis=1)
    )
    if isna.any():
        logger.info(
            "nan detected in delivered_weight, purchase_price or unit_of_measurement:\n"
            "%s",
            df[isna],
        )
        isna_delivered = df[isna]["delivered"]
        # nans are only acceptable on rows with 0 delivery
        ETLError.check_df(
            "detected nans are not associated with 0 delivery",
            pd.DataFrame(
                df[isna][
                    [
                        "delivered",
                        "delivered_weight",
                        "unit_of_measurement",
                        "product_code",
                        "store_id",
                    ]
                ]
            ),
            (isna_delivered != 0),
        )
        logger.info("they are associated with 0 delivery, removing them")
        df = pd.DataFrame(df[~isna])

    # Rename columns
    # | delivered -> ordered
    # | delivered_weight -> ordered_weight
    # NOTE order.delivered where date=d is the delivery at date=d+1
    # that is the order at date d
    df = df.rename(
        columns={"delivered": "ordered", "delivered_weight": "ordered_weight"},
    )

    # Ordered
    ETLError.check_df("Orders cannot be negative", df, df["ordered"] < 0)
    # Ordered_weight
    ETLError.check_df("Ordered weight cannot be negative", df, df["ordered_weight"] < 0)
    # Purchase price
    ETLError.check_df("Purchase price cannot be negative", df, df["purchase_price"] < 0)
    # Check for nans
    ETLError.check_df("Nan in extracted order", df, pd.Series(df.isna().any(axis=1)))
    return df


def extract_product_kpi_daily(
    store_ids: list[str],
    product_codes: list[int],
) -> pd.DataFrame:
    """Extract relevant columns from product_kpi_daily.

    Performs type checking and type conversion.
    Act on the full DataFrame over every stores and product_codes.

    Args:
        store_ids: list of store_ids
        product_codes: list of product codes
    """
    # if True raises an error when `quantity` or `gross` is negative.
    error_negative_quantity_or_gross = False

    # SQL extraction
    df = select_from_where(
        cols={
            "store_id": "string",
            "product_code": "Int64",
            "date": "datetime64[ns]",
            "quantity": "Int32",
            "gross": "Int32",
        },
        table="product_kpi_daily",
        whr=format_whr(store_ids, product_codes),
    )

    # Quantity
    # | check for int32 conversion
    ETLError.check_df(
        "Quantity cannot be negative with error_negative_quantity_or_gross",
        df,
        (df["quantity"] < 0) & error_negative_quantity_or_gross,
    )
    # Gross
    # | check for int32 conversion
    # (consistency: bracket access like the other columns, not df.gross)
    ETLError.check_df(
        "Gross cannot be negative with error_negative_quantity_or_gross",
        df,
        (df["gross"] < 0) & error_negative_quantity_or_gross,
    )
    # Check for nans
    ETLError.check_df(
        "Nan in extracted product_kpi_daily", df, pd.Series(df.isna().any(axis=1))
    )
    return df


def extract_stock_level(store_ids: list[str], product_codes: list[int]) -> pd.DataFrame:
    """Extract relevant columns from stock_level.

    Fill all NA entries with 0
    Act on the full DataFrame over every stores and product_codes
    """
    df = select_from_where(
        cols={
            "store_id": "string",
            "product_code": "Int64",
            "date": "datetime64[ns]",
            "wasted": "Int32",
            "stolen": "Int32",
        },
        table="stock_level",
        whr=format_whr(store_ids, product_codes),
    )
    # fillna with 0
    df = df.fillna(0)
    # sum wasted and stolen
    df["lost"] = df["wasted"] + df["stolen"]
    # Check for nans
    ETLError.check_df(
        "Nan in extracted stock_level", df, pd.Series(df.isna().any(axis=1))
    )
    return df.drop(columns=["wasted", "stolen"])


# ----- Transform: DataFrame -> DataFrame -----


def grp_order(df: pd.DataFrame) -> pd.DataFrame:
    """Transform DataFrame extracted from order, grouped by store_id, product_code.

    Args:
        df: dataframe from :func:`extract_order` grouped by store_id, product_code

    Returns:
        DataFrame: indexed with date, enriched with total_paid, ordered_weight
            , unit_of_measurement dropped and upsampled.
    """
    df = date_indexing(df)

    # TOTAL_PAID
    # check unit of measurements is coherent
    um = pd.unique(df["unit_of_measurement"])
    if len(um) != 1:
        msg = f"uncoherent or empty unit_of_measurement\n{um}"
        raise ETLError(msg)
    um = um[0]
    # | compute total_paid
    if um == "KG":
        df["total_paid"] = df["ordered_weight"] * df["purchase_price"]
    elif um == "PI":
        df["total_paid"] = (df["ordered"] * df["purchase_price"]).astype("Float64")
    else:
        msg = f"unknown unit {um}"
        raise ETLError(msg)
    # check dtype
    if df["total_paid"].dtype != "Float64":
        msg = f"total_paid should be Float64, got {df['total_paid'].dtype}"
        raise ETLError(msg)

    # CLEAN useless columns
    df = df.drop(columns=["ordered_weight", "unit_of_measurement"])

    # UPSAMPLING
    df_up = pd.DataFrame(index=df.asfreq("D").index)
    # | purchase_price: filled with ffill
    k = "purchase_price"
    df_up[k] = df[k].asfreq("D", method="ffill")
    # | ordered: no line => 0 order according to MK
    k = "ordered"
    df_up[k] = df[k].asfreq("D", fill_value=0)
    # | total_paid: no line => 0 paid
    k = "total_paid"
    df_up[k] = df[k].asfreq("D", fill_value=0)
    df = df_up

    ETLError.check_df(
        "Nan while transforming order", df, pd.Series(df.isna().any(axis=1))
    )
    return df


def grp_product_kpi_daily(df: pd.DataFrame) -> pd.DataFrame:
    """Transform DataFrame extracted from product_kpi_daily.

    Args:
        df: dataframe from :func:`extract_product_kpi_daily` grouped by
            store_id, product_code

    Returns:
        pd.DataFrame: date indexed, upsampled
    """
    df = date_indexing(df)

    # UPSAMPLING
    df_up = pd.DataFrame(index=df.asfreq("D").index)
    # | sold_yd: no line => 0 sold according to MK
    k = "quantity"
    df_up[k] = df[k].asfreq("D", fill_value=0)
    # | gross_yd: no line => 0 gross according to MK
    k = "gross"
    df_up[k] = df[k].asfreq("D", fill_value=0)
    df = df_up

    df = df.rename(columns={"quantity": "sold"})
    ETLError.check_df(
        "Nan while transforming product_kpi_daily", df, pd.Series(df.isna().any(axis=1))
    )
    return df


def grp_delivery(df_order: pd.DataFrame) -> pd.DataFrame:
    """Transform order into delivery.

    This has to be done before merging so that inner will take care of non
    aligned index
    """
    # the delivery at day d is what has been ordered at day d-1
    df = df_order["ordered"].shift(1, freq="D").to_frame("delivered")
    ETLError.check_df(
        "Nan while transforming delivery", df, pd.Series(df.isna().any(axis=1))
    )
    return df


def enrich_with_stock(df: pd.DataFrame, df_stock_level: pd.DataFrame) -> pd.DataFrame:
    """Enrich base dataframe with stock."""
    # date indexing
    df_stock_level = df_stock_level.groupby(["store_id", "product_code"]).apply(
        date_indexing, include_groups=False
    )
    # upsample
    # NOTE PANDAS if df_stock_level is empty, date is kept as column instead of
    # moving it to index
    # >>> import pandas as pd
    # >>> df = pd.DataFrame(dict(id=[], date=[], a=[]))
    # >>> df = df.groupby('id').apply(lambda df: df.set_index('date'),
    # include_groups=False)
    # >>> df.columns
    if len(df_stock_level) == 0:
        df_stock_level = df_stock_level.reset_index().set_index(
            ["store_id", "product_code", "date"]
        )
    df_stock_level = df_stock_level.reindex(df.index)
    df_stock_level = df_stock_level.fillna(0)

    # enrich base df with lost before stock computation
    df["lost"] = df_stock_level["lost"]

    # compute stock in the morning
    def stock_logic(df: pd.DataFrame) -> pd.DataFrame:
        """Stock computation logic."""
        # groupby on index keeps store_id, product_code
        df = df.droplevel(["store_id", "product_code"])
        # stock variation: stock_evening - stock_morning
        delta = df["delivered"] - df["sold"] - df["lost"]
        # remove mean to avoid diverging stocks
        # convert back to initial dtype to stay in integers
        delta = (delta - delta.mean()).astype(delta.dtype)
        # apply cumsum
        # cumsum changes dtype to int64
        stock = delta.cumsum().astype(delta.dtype)
        # remove min to ensure positive stock
        stock = stock - stock.min()
        df = df.drop(columns=["lost"])
        df["stock_evening"] = stock
        return df

    df = df.groupby(level=["store_id", "product_code"]).apply(
        stock_logic, include_groups=False
    )
    ETLError.check_df(
        "Nan while enriching with stock", df, pd.Series(df.isna().any(axis=1))
    )
    return df
@pa.check_types
def etl_pipeline(
    store_ids: list[str],
    product_codes: list[int],
) -> DataFrame[ProvenciaETLSchema]:
    """Provencia SQL connector.

    Args:
        store_ids: the list of store ids
        product_codes: the list of product codes

    Returns:
        DataFrame[ProvenciaETLSchema]: the Provencia ETL dataframe, see
            :class:`ProvenciaETLSchema` for a description of its structure.

    Examples:
        >>> etl_pipeline(  # doctest: +NORMALIZE_WHITESPACE
        ...     ['JE', 'KV', 'EV'],
        ...     [2870622000000, 2870557000000, 2870549000000])
                                           delivered  purchase_price  ordered  total_paid  sold  gross  stock_evening
        store_id product_code date
        EV       2870549000000 2023-12-05          4             647        0         0.0     0      0              5
                               2023-12-06          0             647        0         0.0     0      0              5
                               2023-12-07          0             647        0         0.0     0      0              5
                               2023-12-08          0             647        0         0.0     4   1834              1
                               2023-12-09          0             647        0         0.0     0      0              1
        ...                                      ...             ...      ...         ...   ...    ...            ...
                 2870622000000 2024-07-28          0             560        0         0.0     0      0              0
                               2024-07-29          0             560        0         0.0     0      0              0
                               2024-07-30          0             560        0         0.0     0      0              0
                               2024-07-31          0             560        0         0.0     0      0              0
                               2024-08-01          0             560        1      1680.0     0      0              0
        <BLANKLINE>
        [517 rows x 7 columns]

    Note:
        product_code=2870626000000 (foie de porc) is problematic in order table
    """
    # EXTRACT SQL -> DataFrame
    # columns: total_paid, purchase_price, ordered
    df_order = extract_order(store_ids, product_codes)
    # columns: sold_yd, gross_yd
    df_product_kpi_daily = extract_product_kpi_daily(store_ids, product_codes)
    # columns: lost
    df_stock_level = extract_stock_level(store_ids, product_codes)

    # TRANSFORM
    df_order = df_order.groupby(
        ["store_id", "product_code"],
    ).apply(grp_order, include_groups=False)
    df_product_kpi_daily = df_product_kpi_daily.groupby(
        ["store_id", "product_code"],
    ).apply(grp_product_kpi_daily, include_groups=False)
    df_delivery = (
        df_order.reset_index(
            level=["store_id", "product_code"],
        )
        .groupby(
            ["store_id", "product_code"],
        )
        .apply(grp_delivery, include_groups=False)
    )

    # MERGE with intersection
    df = reduce(
        lambda left, right: left.merge(
            right, left_index=True, right_index=True, how="inner"
        ),
        [df_order, df_product_kpi_daily, df_delivery],
    )

    # ENRICH with stock
    df = enrich_with_stock(df, df_stock_level)

    # checks
    ETLError.check_df(
        "Nan introduced by pipeline", df, pd.Series(df.isna().any(axis=1))
    )
    if len(df.index) == 0:
        msg = "Empty DataFrame"
        raise ETLError(msg)

    # NOTE: This order is causal, put the process / workflow somewhere
    df = df.reindex(
        columns=[
            "delivered",
            "purchase_price",
            "ordered",
            "total_paid",
            "sold",
            "gross",
            "stock_evening",
        ]
    )
    df = ProvenciaETLSchema.validate(df)
    logger.info("ETL DataFrame validated")
    return df