"""Extraction Transformation 'Loading' pipeline.
ETL pipeline from Provencia SQL database to Pandas DataFrame in RAM
"""
import configparser
import logging
from functools import reduce
from pathlib import Path
import pandas as pd
import pandera.pandas as pa
from pandas._typing import Dtype, DtypeArg
from pandera.typing.pandas import DataFrame
from sqlalchemy import create_engine, text
try:
from .schemas import ProvenciaETLSchema
except ImportError:
from schemas import ProvenciaETLSchema
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ----- Utilities -----
def get_db_url() -> str:
    """Read database credentials from config.ini and build the database URL.

    The config file is expected next to this module, with a [database]
    section providing host, dbname, user, port and password.

    Returns:
        str: a SQLAlchemy ``postgresql+psycopg2`` connection URL.

    Raises:
        FileNotFoundError: if config.ini does not exist next to this module.
    """
    # initialize config parser
    config = configparser.ConfigParser()
    config_path = Path(__file__).parent / "config.ini"
    # fail early with an actionable message when the config is missing
    # (idiom fix: call .exists() on the instance, not through the class)
    if not config_path.exists():
        msg = (
            f"Configuration file not found at: {config_path}. "
            "Please create it from the 'config.ini.template'."
        )
        raise FileNotFoundError(msg)
    config.read(config_path)
    # format URL from the [database] section
    db_config = config["database"]
    host = db_config.get("host")
    dbname = db_config.get("dbname")
    user = db_config.get("user")
    port = db_config.get("port")
    password = db_config.get("password")
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"
# create sql engine once as global; may be None if config.ini is absent
try:
    ENGINE = create_engine(get_db_url())
except FileNotFoundError:
    # keep the module importable without credentials (docs builds, offline
    # tests); helpers that rely on ENGINE will fail until it is configured
    ENGINE = None
def sql2df(query: str, dtype: DtypeArg) -> pd.DataFrame:
    """Run a SQL query and load the result into a DataFrame.

    Args:
        query: the SQL query text to execute.
        dtype: column-name -> dtype mapping forwarded to pandas.

    Returns:
        pd.DataFrame: the query result with the requested dtypes.

    Raises:
        RuntimeError: if the global ENGINE was never created because
            config.ini was missing at import time.
    """
    # explicit guard: without it a missing config surfaces as an obscure
    # AttributeError on None
    if ENGINE is None:
        msg = "Database engine is not configured: config.ini was not found."
        raise RuntimeError(msg)
    with ENGINE.connect() as conn:
        return pd.read_sql_query(text(query), conn, dtype=dtype)
def tpl2str(lst: list[str]) -> str:
    """Transform a list of str into a string tuple.

    Utility for :func:`select_from_where`

    Example:
        >>> tpl2str(['a', 'b', 'c'])
        '(a, b, c)'
    """
    # join the list directly: the pass-through generator was redundant
    return f"({', '.join(lst)})"
def select_from_where(
    cols: dict[str, Dtype],
    table: str,
    whr: dict[str, list[str]],
) -> pd.DataFrame:
    """Select columns from a database table where a condition is met.

    The condition has the form::

        k1 IN (a1, b1, c1, ...) AND k2 IN (a2, b2, c2, ...) ...

    where each key ``k`` of ``whr`` maps to the list of allowed values.

    NOTE(review): the query is built by string interpolation; values are
    assumed to come from trusted internal callers, not user input — confirm.
    """
    # iterating a dict yields its keys, so no explicit .keys() call needed
    query = f"SELECT {', '.join(cols)} FROM {table}"
    if whr:
        conditions = [f"{key} IN {tpl2str(values)}" for key, values in whr.items()]
        query = f"{query} WHERE {' AND '.join(conditions)}"
    return sql2df(query, dtype=cols)
class ETLError(Exception):
    """Domain-specific error for problematic data.

    This allows distinction between controlled (data-quality) errors and
    uncontrolled errors.
    """

    def __init__(self, message: str) -> None:
        """Initialize the error with a message."""
        super().__init__(message)

    @staticmethod
    def check_df(
        message: str,
        df: pd.DataFrame,
        wrong_sr: pd.Series,
    ) -> None:
        """Check if any row is wrong and raise error.

        Error is `message` as first line and problematic rows of df after it.

        Args:
            message: first line of the error message
            df: the DataFrame being checked
            wrong_sr: boolean series, True where df is problematic

        Raises:
            ETLError: if at least one entry of wrong_sr is True.
        """
        if wrong_sr.to_numpy().any():
            # BUG FIX: `message` was previously assigned then immediately
            # overwritten, so the raised error lost its first line
            msg = f"{message}\n{df[wrong_sr]}"
            raise ETLError(msg)
def date_indexing(df: pd.DataFrame) -> pd.DataFrame:
    """Convert the date column to pd.Timestamp and use it as a sorted index.

    Note: mutates the ``date`` column of the input frame in place.
    """
    df["date"] = pd.to_datetime(df["date"])
    # chronological order, then promote date to a (verified unique) index
    return df.sort_values("date").set_index("date", verify_integrity=True)
def format_whr(
    store_ids: list[str],
    product_codes: list[int],
) -> dict[str, list[str]]:
    """Build the whr mapping for store_ids and product_codes.

    Store ids are single-quoted (SQL string literals); product codes are
    rendered as bare numeric literals.
    """
    quoted_stores = [f"'{sid}'" for sid in store_ids]
    code_literals = [str(code) for code in product_codes]
    return {"store_id": quoted_stores, "product_code": code_literals}
def get_store_ids() -> pd.DataFrame:
    """Get all the store ids and name as DataFrame.

    Specifically, the store_ids in public.order table.

    Returns:
        pd.DataFrame: the store ids as index, the store name as column

    Examples:
        >>> get_store_ids() # doctest: +NORMALIZE_WHITESPACE
        name
        store_id
        JE Seynod
        KV Saint-Jeoire-Prieuré
        EV Faverges
        BS Annecy-le-Vieux
        MO Villeurbanne
        ... ...
        LW Thonon-les-Bains
        KM Saint-Jean-de-Moirans
        FW Gex
        GF Grésy-sur-Aix
        EM Douvaine
    """
    # FIX: removed a stray "[docs]" Sphinx-scrape artifact that preceded
    # this function and would raise NameError at import time
    return sql2df(
        """
        SELECT DISTINCT
            ord.store_id,
            sto.name
        FROM
            public.order AS ord
        JOIN
            store AS sto ON ord.store_id = sto.id;
        """,
        dtype={"store_id": "string", "name": "string"},
    ).set_index("store_id")
def get_product_codes(store_ids: list[str]) -> pd.DataFrame:
    """Get all product codes and description in specified shops.

    Args:
        store_ids: a list of store ids to get product codes from

    Returns:
        pd.DataFrame: the product_codes as index, the description as column

    Example:
        >>> get_product_codes(['JE', 'KV', 'EV']) # doctest: +NORMALIZE_WHITESPACE
        description
        product_code
        2870622000000 *PO-SAUTE X 3KG
        2870557000000 PORC FOIE
        2870549000000 KG SAUTE PORC FRAN
        2477358000000 *EL-ROTI VEAU FARCI
        2276357000000 *SA-SAUCI MENAGE X3
        ... ...
        2477424000000 *VO-CUISSE DE POULE
        2443970000000 SAUCISSON TRUFFE
        2280328000000 EL-CREPINETTE X 3
        2224892000000 GIGOT *** ENTIER S
        2610547000000 BLOC DE DINDE RA
        <BLANKLINE>
        [519 rows x 1 columns]
    """
    # FIX: removed a stray "[docs]" Sphinx-scrape artifact that preceded
    # this function and would raise NameError at import time
    st = tpl2str([f"'{s}'" for s in store_ids])
    # only the WHERE fragment interpolates a value; the other segments are
    # plain literals (dropped their pointless f-prefixes)
    return sql2df(
        "SELECT DISTINCT "
        "    ord.product_code, "
        "    pro.description "
        "FROM "
        "    public.order AS ord "
        "JOIN "
        "    product AS pro ON ord.product_code = pro.product_code "
        f"WHERE store_id IN {st};",
        dtype={"product_code": "Int64", "description": "string"},
    ).set_index("product_code")
# ----- Extract: SQL -> DataFrame -----
def extract_order(
    store_ids: list[str],
    product_codes: list[int],
) -> pd.DataFrame:
    """Extract relevant columns from public.order.

    Performs some renaming, type checking and type converting.
    Act on the full DataFrame over every stores and product_codes

    Args:
        store_ids: list of store ids to extract orders for
        product_codes: list of product codes to extract orders for

    Returns:
        pd.DataFrame: columns store_id, product_code, date, ordered,
            ordered_weight, purchase_price, unit_of_measurement

    Raises:
        ETLError: if a NaN row has a non-zero delivery, if ordered /
            ordered_weight / purchase_price is negative, or if NaNs
            remain after cleaning.
    """
    df = select_from_where(
        cols={
            "store_id": "string",
            "product_code": "Int64",
            "date": "datetime64[ns]",
            "delivered": "Int32",
            "delivered_weight": "Float64",
            "purchase_price": "Int32",
            "unit_of_measurement": pd.CategoricalDtype(categories=["PI", "KG"]),
        },
        table="public.order",
        whr=format_whr(store_ids, product_codes),
    )
    # Treat nan in delivered_weight, purchase_price, unit_of_measurement
    isna = pd.Series(
        df[["delivered_weight", "purchase_price", "unit_of_measurement"]]
        .isna()
        .any(axis=1)
    )
    if isna.any():
        logger.info(
            "nan detected in delivered_weight, purchase_price or unit_of_measurement:\n"
            "%s",
            df[isna],
        )
        # NaN rows are tolerated only when nothing was delivered
        isna_delivered = df[isna]["delivered"]
        ETLError.check_df(
            "detected nans are not associated with 0 delivery",
            pd.DataFrame(
                df[isna][
                    [
                        "delivered",
                        "delivered_weight",
                        "unit_of_measurement",
                        "product_code",
                        "store_id",
                    ]
                ]
            ),
            (isna_delivered != 0),
        )
        logger.info("they are associated with 0 delivery, removing them")
        # drop the tolerated NaN rows
        df = pd.DataFrame(df[~isna])
    # Rename columns
    # | delivered -> ordered
    # | delivered_weight -> ordered_weight
    # NOTE order.delivered where date=d is the delivery at date=d+1
    # that is the order at date d
    df = df.rename(
        columns={"delivered": "ordered", "delivered_weight": "ordered_weight"},
    )
    # Ordered
    ETLError.check_df("Orders cannot be negative", df, df["ordered"] < 0)
    # Ordered_weight
    ETLError.check_df("Ordered weight cannot be negative", df, df["ordered_weight"] < 0)
    # Purchase price
    ETLError.check_df("Purchase price cannot be negative", df, df["purchase_price"] < 0)
    # Check for nans
    ETLError.check_df("Nan in extracted order", df, pd.Series(df.isna().any(axis=1)))
    return df
def extract_product_kpi_daily(
    store_ids: list[str],
    product_codes: list[int],
) -> pd.DataFrame:
    """Extract relevant columns from product_kpi_daily.

    Performs type checking and type conversion.
    Act on the full DataFrame over every stores and product_codes.

    Args:
        store_ids: list of store_ids
        product_codes: list of product codes

    Returns:
        pd.DataFrame: columns store_id, product_code, date, quantity, gross

    Raises:
        ETLError: if NaNs are found, or if quantity/gross is negative
            while ``error_negative_quantity_or_gross`` is enabled.
    """
    # if True raises an error when `quantity` or `gross` is negative.
    # Currently disabled, so negative values pass through.
    # NOTE(review): the reason negatives are tolerated is not visible here
    # - confirm with data owners.
    error_negative_quantity_or_gross = False
    # SQL extraction
    df = select_from_where(
        cols={
            "store_id": "string",
            "product_code": "Int64",
            "date": "datetime64[ns]",
            "quantity": "Int32",
            "gross": "Int32",
        },
        table="product_kpi_daily",
        whr=format_whr(store_ids, product_codes),
    )
    # Quantity
    # | check for int32 conversion
    ETLError.check_df(
        "Quantity cannot be negative with error_negative_quantity_or_gross",
        df,
        (df["quantity"] < 0) & error_negative_quantity_or_gross,
    )
    # Gross
    # | check for int32 conversion
    # (consistency fix: subscript access like the quantity check above,
    # instead of attribute access df.gross)
    ETLError.check_df(
        "Gross cannot be negative with error_negative_quantity_or_gross",
        df,
        (df["gross"] < 0) & error_negative_quantity_or_gross,
    )
    # Check for nans
    ETLError.check_df(
        "Nan in extracted product_kpi_daily", df, pd.Series(df.isna().any(axis=1))
    )
    return df
def extract_stock_level(store_ids: list[str], product_codes: list[int]) -> pd.DataFrame:
    """Extract relevant columns from stock_level.

    All NA entries are filled with 0, then wasted and stolen are summed
    into a single ``lost`` column.
    Act on the full DataFrame over every stores and product_codes.
    """
    frame = select_from_where(
        cols={
            "store_id": "string",
            "product_code": "Int64",
            "date": "datetime64[ns]",
            "wasted": "Int32",
            "stolen": "Int32",
        },
        table="stock_level",
        whr=format_whr(store_ids, product_codes),
    )
    # a missing entry means nothing was wasted/stolen
    frame = frame.fillna(0)
    # aggregate both loss causes into one column
    frame["lost"] = frame["wasted"] + frame["stolen"]
    ETLError.check_df(
        "Nan in extracted stock_level", frame, pd.Series(frame.isna().any(axis=1))
    )
    return frame.drop(columns=["wasted", "stolen"])
# ----- Transform: DataFrame -> DataFrame -----
def grp_order(df: pd.DataFrame) -> pd.DataFrame:
    """Transform DataFrame extracted from order, grouped by store_id, product_code.

    Args:
        df: dataframe from :func:`extract_order` grouped by store_id, product_code

    Returns:
        DataFrame: indexed with date, enriched with total_paid, ordered_weight
        , unit_of_measurement dropped and upsampled.

    Raises:
        ETLError: if unit_of_measurement is not unique within the group or
            unknown, if total_paid is not Float64, or if NaNs appear after
            upsampling.
    """
    df = date_indexing(df)
    # TOTAL_PAID
    # check unit of measurements is coherent
    um = pd.unique(df["unit_of_measurement"])
    if len(um) != 1:
        msg = f"uncoherent or empty unit_of_measurement\n{um}"
        raise ETLError(msg)
    um = um[0]
    # | compute total_paid
    if um == "KG":
        # weight-priced product: paid per kilogram
        df["total_paid"] = df["ordered_weight"] * df["purchase_price"]
    elif um == "PI":
        # piece-priced product: paid per unit; cast to match the KG branch
        df["total_paid"] = (df["ordered"] * df["purchase_price"]).astype("Float64")
    else:
        msg = f"unknown unit {um}"
        raise ETLError(msg)
    # check dtype
    if df["total_paid"].dtype != "Float64":
        msg = f"total_paid should be Float64, got {df['total_paid'].dtype}"
        raise ETLError(msg)
    # CLEAN useless columns
    df = df.drop(columns=["ordered_weight", "unit_of_measurement"])
    # # UPSAMPLING (to daily frequency)
    df_up = pd.DataFrame(index=df.asfreq("D").index)
    # | purchase_price: filled with ffill
    k = "purchase_price"
    df_up[k] = df[k].asfreq("D", method="ffill")
    # | ordered: no line => 0 order according to MK
    k = "ordered"
    df_up[k] = df[k].asfreq("D", fill_value=0)
    # | total_paid: no line => 0 paid
    k = "total_paid"
    df_up[k] = df[k].asfreq("D", fill_value=0)
    df = df_up
    ETLError.check_df(
        "Nan while transforming order", df, pd.Series(df.isna().any(axis=1))
    )
    return df
def grp_product_kpi_daily(df: pd.DataFrame) -> pd.DataFrame:
    """Transform DataFrame extracted from product_kpi_daily.

    Args:
        df: dataframe from :func:`extract_product_kpi_daily`
            grouped by store_id, product_code

    Returns:
        pd.DataFrame: date indexed, upsampled, quantity renamed to sold
    """
    df = date_indexing(df)
    # UPSAMPLING to daily frequency: a missing day means 0 sold / 0 gross
    # according to MK
    upsampled = pd.DataFrame(index=df.asfreq("D").index)
    for col in ("quantity", "gross"):
        upsampled[col] = df[col].asfreq("D", fill_value=0)
    df = upsampled.rename(columns={"quantity": "sold"})
    ETLError.check_df(
        "Nan while transforming product_kpi_daily", df, pd.Series(df.isna().any(axis=1))
    )
    return df
def grp_delivery(df_order: pd.DataFrame) -> pd.DataFrame:
    """Transform order into delivery.

    This has to be done before merging so that inner will take care
    of non aligned index
    """
    # what arrives on day d is what was ordered on day d-1
    shifted = df_order["ordered"].shift(1, freq="D")
    df = shifted.to_frame("delivered")
    ETLError.check_df(
        "Nan while transforming delivery", df, pd.Series(df.isna().any(axis=1))
    )
    return df
def enrich_with_stock(df: pd.DataFrame, df_stock_level: pd.DataFrame) -> pd.DataFrame:
    """Enrich base dataframe with stock.

    Args:
        df: base dataframe indexed by (store_id, product_code, date);
            must provide delivered and sold columns.
        df_stock_level: dataframe from :func:`extract_stock_level`,
            provides the lost column.

    Returns:
        pd.DataFrame: df enriched with a synthetic stock_evening column.

    Raises:
        ETLError: if NaNs appear during the enrichment.
    """
    # date indexing
    df_stock_level = df_stock_level.groupby(["store_id", "product_code"]).apply(
        date_indexing, include_groups=False
    )
    # upsample
    # NOTE PANDAS if df_stock_level is empty, date is kept as column instead of
    # moving it to index
    # >>> import pandas as pd
    # >>> df = pd.DataFrame(dict(id=[], date=[], a=[]))
    # >>> df = df.groupby('id').apply(lambda df: df.set_index('date'),
    # include_groups=False)
    # >>> df.columns
    if len(df_stock_level) == 0:
        df_stock_level = df_stock_level.reset_index().set_index(
            ["store_id", "product_code", "date"]
        )
    # align stock rows on the base index; a missing row means no loss
    df_stock_level = df_stock_level.reindex(df.index)
    df_stock_level = df_stock_level.fillna(0)
    # enrich base df with lost before stock computation
    df["lost"] = df_stock_level["lost"]
    # compute stock in the morning
    def stock_logic(df: pd.DataFrame) -> pd.DataFrame:
        """Stock computation logic for one (store_id, product_code) group."""
        # groupby on index keeps store_id, product_code
        df = df.droplevel(["store_id", "product_code"])
        # stock variation: stock_evening - stock_morning
        delta = df["delivered"] - df["sold"] - df["lost"]
        # remove mean to avoid diverging stocks
        # convert back to initial dtype to stay in integers
        delta = (delta - delta.mean()).astype(delta.dtype)
        # apply cumsum
        # cumsum changes dtype to int64
        stock = delta.cumsum().astype(delta.dtype)
        # remove min to ensure positive stock
        # NOTE(review): stock_evening is thus a reconstructed, relative
        # quantity (offset so its minimum is 0), not an observed level.
        stock = stock - stock.min()
        df = df.drop(columns=["lost"])
        df["stock_evening"] = stock
        return df
    df = df.groupby(level=["store_id", "product_code"]).apply(
        stock_logic, include_groups=False
    )
    ETLError.check_df(
        "Nan while enriching with stock", df, pd.Series(df.isna().any(axis=1))
    )
    return df
@pa.check_types
def etl_pipeline(
    store_ids: list[str],
    product_codes: list[int],
) -> DataFrame[ProvenciaETLSchema]:
    """Provencia SQL connector.

    Args:
        store_ids: the list of store ids
        product_codes: the list of product codes

    Returns:
        DataFrame[ProvenciaETLSchema]: the Provencia ETL dataframe,
        see :class:`ProvenciaETLSchema` for a description of its structure.

    Raises:
        ETLError: if the pipeline introduces NaNs or produces an empty
            DataFrame.

    Examples:
        >>> etl_pipeline( # doctest: +NORMALIZE_WHITESPACE
        ... ['JE', 'KV', 'EV'],
        ... [2870622000000, 2870557000000, 2870549000000])
        delivered purchase_price ordered total_paid sold gross stock_evening
        store_id product_code date
        EV 2870549000000 2023-12-05 4 647 0 0.0 0 0 5
        2023-12-06 0 647 0 0.0 0 0 5
        2023-12-07 0 647 0 0.0 0 0 5
        2023-12-08 0 647 0 0.0 4 1834 1
        2023-12-09 0 647 0 0.0 0 0 1
        ... ... ... ... ... ... ... ...
        2870622000000 2024-07-28 0 560 0 0.0 0 0 0
        2024-07-29 0 560 0 0.0 0 0 0
        2024-07-30 0 560 0 0.0 0 0 0
        2024-07-31 0 560 0 0.0 0 0 0
        2024-08-01 0 560 1 1680.0 0 0 0
        <BLANKLINE>
        [517 rows x 7 columns]

    Note:
        product_code=2870626000000 (foie de porc) is problematic in order table

    FIX: removed a stray "[docs]" Sphinx-scrape artifact that preceded this
    function and would raise NameError at import time.
    """
    # EXTRACT SQL -> DataFrame
    # columns: total_paid, purchase_price, ordered
    df_order = extract_order(store_ids, product_codes)
    # columns: sold_yd, gross_yd
    df_product_kpi_daily = extract_product_kpi_daily(store_ids, product_codes)
    # columns: lost -
    df_stock_level = extract_stock_level(store_ids, product_codes)
    # TRANSFORM: per (store, product) group
    df_order = df_order.groupby(
        ["store_id", "product_code"],
    ).apply(grp_order, include_groups=False)
    df_product_kpi_daily = df_product_kpi_daily.groupby(
        ["store_id", "product_code"],
    ).apply(grp_product_kpi_daily, include_groups=False)
    df_delivery = (
        df_order.reset_index(
            level=["store_id", "product_code"],
        )
        .groupby(
            ["store_id", "product_code"],
        )
        .apply(grp_delivery, include_groups=False)
    )
    # MERGE with intersection (inner join on the common index)
    df = reduce(
        lambda left, right: left.merge(
            right, left_index=True, right_index=True, how="inner"
        ),
        [df_order, df_product_kpi_daily, df_delivery],
    )
    # ENRICH with stock
    df = enrich_with_stock(df, df_stock_level)
    # checks
    ETLError.check_df(
        "Nan introduced by pipeline", df, pd.Series(df.isna().any(axis=1))
    )
    if len(df.index) == 0:
        msg = "Empty DataFrame"
        raise ETLError(msg)
    # NOTE: This order is causal, put the process / workflow somewhere
    df = df.reindex(
        columns=[
            "delivered",
            "purchase_price",
            "ordered",
            "total_paid",
            "sold",
            "gross",
            "stock_evening",
        ]
    )
    df = ProvenciaETLSchema.validate(df)
    logger.info("ETL DataFrame validated")
    return df