|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from typing import Tuple, Any, List |
|
|
|
|
|
import duckdb |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib |
|
|
matplotlib.use("Agg") |
|
|
import matplotlib.pyplot as plt |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard" |
|
|
TABLE_FQN = "my_db.main.masterdataset_v" |
|
|
VIEW_FQN = "my_db.main.positions_v" |
|
|
EXPORT_DIR = Path("exports") |
|
|
EXPORT_DIR.mkdir(exist_ok=True) |
|
|
|
|
|
PRODUCT_ASSETS = [ |
|
|
"loan", "overdraft", "advances", "bills", "bill", |
|
|
"tbond", "t-bond", "tbill", "t-bill", "repo_asset" |
|
|
] |
|
|
PRODUCT_SOF = [ |
|
|
"fd", "term_deposit", "td", "savings", "current", |
|
|
"call", "repo_liab" |
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_float(x, default: float = 0.0) -> float: |
|
|
try: |
|
|
if x is None or (isinstance(x, float) and np.isnan(x)): |
|
|
return default |
|
|
return float(x) |
|
|
except Exception: |
|
|
return default |
|
|
|
|
|
def zeros_like_index(index) -> pd.Series: |
|
|
return pd.Series([0] * len(index), index=index) |
|
|
|
|
|
def connect_md() -> duckdb.DuckDBPyConnection: |
|
|
token = os.environ.get("MOTHERDUCK_TOKEN", "") |
|
|
if not token: |
|
|
raise RuntimeError("MOTHERDUCK_TOKEN is not set. Add it in your Space β Settings β Secrets.") |
|
|
try: |
|
|
return duckdb.connect(f"md:?motherduck_token={token}") |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"MotherDuck connection failed: {e}") from e |
|
|
|
|
|
def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]: |
|
|
q = f""" |
|
|
SELECT lower(column_name) AS col |
|
|
FROM information_schema.columns |
|
|
WHERE table_schema = split_part('{table_fqn}', '.', 2) |
|
|
AND table_name = split_part('{table_fqn}', '.', 3) |
|
|
""" |
|
|
df = conn.execute(q).fetchdf() |
|
|
return df["col"].tolist() |
|
|
|
|
|
def build_view_sql(existing_cols: List[str]) -> str: |
|
|
wanted = [ |
|
|
"as_of_date", "product", "months", "segments", |
|
|
"currency", "Portfolio_value", "Interest_rate", |
|
|
"days_to_maturity" |
|
|
] |
|
|
select_list = [] |
|
|
for c in wanted: |
|
|
if c.lower() in existing_cols: |
|
|
select_list.append(c) |
|
|
else: |
|
|
|
|
|
if c in ("Portfolio_value", "Interest_rate", "days_to_maturity", "months"): |
|
|
select_list.append(f"CAST(NULL AS DOUBLE) AS {c}") |
|
|
else: |
|
|
select_list.append(f"CAST(NULL AS VARCHAR) AS {c}") |
|
|
|
|
|
sof_list = ", ".join([f"'{p}'" for p in PRODUCT_SOF]) |
|
|
asset_list = ", ".join([f"'{p}'" for p in PRODUCT_ASSETS]) |
|
|
|
|
|
bucket_case = ( |
|
|
f"CASE " |
|
|
f"WHEN lower(product) IN ({sof_list}) THEN 'SoF' " |
|
|
f"WHEN lower(product) IN ({asset_list}) THEN 'Assets' " |
|
|
f"ELSE 'Unknown' END AS bucket" |
|
|
) |
|
|
|
|
|
select_sql = ",\n ".join(select_list + [bucket_case]) |
|
|
return f""" |
|
|
CREATE OR REPLACE VIEW {VIEW_FQN} AS |
|
|
SELECT |
|
|
{select_sql} |
|
|
FROM {TABLE_FQN}; |
|
|
""" |
|
|
|
|
|
def ensure_view(conn: duckdb.DuckDBPyConnection, existing_cols: List[str]): |
|
|
required = {"product", "portfolio_value", "days_to_maturity"} |
|
|
if not required.issubset(set(existing_cols)): |
|
|
raise RuntimeError( |
|
|
f"Source table {TABLE_FQN} must contain columns {sorted(required)}; " |
|
|
f"found {sorted(existing_cols)}" |
|
|
) |
|
|
conn.execute(build_view_sql(existing_cols)) |
|
|
|
|
|
def fetch_kpis(conn: duckdb.DuckDBPyConnection) -> Tuple[float, float, float]: |
|
|
sql = f""" |
|
|
SELECT |
|
|
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS assets_t1, |
|
|
COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS sof_t1, |
|
|
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) |
|
|
- COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS net_gap_t1 |
|
|
FROM {VIEW_FQN}; |
|
|
""" |
|
|
df = conn.execute(sql).fetchdf() |
|
|
if df.empty: |
|
|
return 0.0, 0.0, 0.0 |
|
|
return safe_float(df["assets_t1"].iloc[0]), safe_float(df["sof_t1"].iloc[0]), safe_float(df["net_gap_t1"].iloc[0]) |
|
|
|
|
|
def fetch_ladder(conn: duckdb.DuckDBPyConnection) -> pd.DataFrame: |
|
|
sql = f""" |
|
|
SELECT |
|
|
CASE |
|
|
WHEN days_to_maturity <= 1 THEN 'T+1' |
|
|
WHEN days_to_maturity BETWEEN 2 AND 7 THEN 'T+2..7' |
|
|
WHEN days_to_maturity BETWEEN 8 AND 30 THEN 'T+8..30' |
|
|
ELSE 'T+31+' |
|
|
END AS time_bucket, |
|
|
bucket, |
|
|
SUM(Portfolio_value) AS amount |
|
|
FROM {VIEW_FQN} |
|
|
GROUP BY 1,2 |
|
|
ORDER BY 1,2; |
|
|
""" |
|
|
df = conn.execute(sql).fetchdf() |
|
|
if df.empty: |
|
|
return pd.DataFrame({"time_bucket": [], "bucket": [], "amount": []}) |
|
|
return df |
|
|
|
|
|
def fetch_irr(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> pd.DataFrame: |
|
|
has_months = "months" in cols |
|
|
has_ir = "interest_rate" in cols |
|
|
t_expr = "CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0" |
|
|
if has_months: |
|
|
t_expr += " WHEN months IS NOT NULL THEN months/12.0" |
|
|
t_expr += " ELSE NULL END" |
|
|
y_expr = "(Interest_rate/100.0)" if has_ir else "0.0" |
|
|
sql = f""" |
|
|
SELECT |
|
|
bucket, |
|
|
SUM(Portfolio_value) AS pv_sum, |
|
|
SUM(Portfolio_value * {t_expr}) / NULLIF(SUM(Portfolio_value),0) AS dur_mac, |
|
|
SUM(Portfolio_value * ({t_expr})/(1+({y_expr}))) / NULLIF(SUM(Portfolio_value),0) AS dur_mod |
|
|
FROM {VIEW_FQN} |
|
|
GROUP BY bucket; |
|
|
""" |
|
|
df = conn.execute(sql).fetchdf() |
|
|
if df.empty: |
|
|
return pd.DataFrame({"bucket": [], "pv_sum": [], "dur_mac": [], "dur_mod": []}) |
|
|
return df |
|
|
|
|
|
def plot_ladder(df: pd.DataFrame): |
|
|
try: |
|
|
if df.empty: |
|
|
fig, ax = plt.subplots(figsize=(7, 3)) |
|
|
ax.text(0.5, 0.5, "No data", ha="center", va="center", fontsize=12) |
|
|
ax.axis("off") |
|
|
return fig |
|
|
|
|
|
pivot = df.pivot(index="time_bucket", columns="bucket", values="amount").fillna(0) |
|
|
order = ["T+1", "T+2..7", "T+8..30", "T+31+"] |
|
|
pivot = pivot.reindex(order) |
|
|
fig, ax = plt.subplots(figsize=(7, 4)) |
|
|
|
|
|
assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index) |
|
|
sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index) |
|
|
|
|
|
ax.bar(pivot.index, assets, label="Assets") |
|
|
ax.bar(pivot.index, -sof, label="SoF") |
|
|
ax.axhline(0, color="gray", lw=1) |
|
|
ax.set_ylabel("LKR") |
|
|
ax.set_title("Maturity Ladder (Assets vs SoF)") |
|
|
ax.legend() |
|
|
fig.tight_layout() |
|
|
return fig |
|
|
except Exception as e: |
|
|
|
|
|
fig, ax = plt.subplots(figsize=(7, 3)) |
|
|
ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left") |
|
|
ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True) |
|
|
ax.axis("off") |
|
|
return fig |
|
|
|
|
|
def export_excel(as_of_date: str, |
|
|
assets_t1: float, |
|
|
sof_t1: float, |
|
|
net_gap_t1: float, |
|
|
ladder: pd.DataFrame, |
|
|
irr: pd.DataFrame) -> Path: |
|
|
out = EXPORT_DIR / f"alco_report_{as_of_date}.xlsx" |
|
|
with pd.ExcelWriter(out, engine="xlsxwriter") as xw: |
|
|
pd.DataFrame({ |
|
|
"as_of_date": [as_of_date], |
|
|
"assets_t1": [assets_t1], |
|
|
"sof_t1": [sof_t1], |
|
|
"net_gap_t1": [net_gap_t1], |
|
|
}).to_excel(xw, index=False, sheet_name="kpis") |
|
|
ladder.to_excel(xw, index=False, sheet_name="ladder") |
|
|
irr.to_excel(xw, index=False, sheet_name="irr") |
|
|
return out |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_dashboard(): |
|
|
""" |
|
|
Returns: |
|
|
status (str), |
|
|
as_of (str), |
|
|
assets_t1 (float), |
|
|
sof_t1 (float), |
|
|
net_gap_t1 (float), |
|
|
fig (matplotlib fig), |
|
|
ladder_df (DataFrame), |
|
|
irr_df (DataFrame), |
|
|
excel_file (path str) |
|
|
""" |
|
|
status = "β
OK" |
|
|
try: |
|
|
conn = connect_md() |
|
|
cols = discover_columns(conn, TABLE_FQN) |
|
|
ensure_view(conn, cols) |
|
|
|
|
|
|
|
|
as_of = "N/A" |
|
|
if "as_of_date" in cols: |
|
|
tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf() |
|
|
if not tmp.empty and not pd.isna(tmp["d"].iloc[0]): |
|
|
as_of = str(tmp["d"].iloc[0])[:10] |
|
|
|
|
|
assets_t1, sof_t1, net_gap_t1 = fetch_kpis(conn) |
|
|
ladder = fetch_ladder(conn) |
|
|
irr = fetch_irr(conn, cols) |
|
|
|
|
|
fig = plot_ladder(ladder) |
|
|
xlsx_path = export_excel(as_of, assets_t1, sof_t1, net_gap_t1, ladder, irr) |
|
|
|
|
|
return ( |
|
|
status, |
|
|
as_of, |
|
|
assets_t1, |
|
|
sof_t1, |
|
|
net_gap_t1, |
|
|
fig, |
|
|
ladder, |
|
|
irr, |
|
|
str(xlsx_path), |
|
|
) |
|
|
except Exception as e: |
|
|
|
|
|
status = f"β Error: {e}" |
|
|
empty_df = pd.DataFrame() |
|
|
fig = plot_ladder(empty_df) |
|
|
return ( |
|
|
status, |
|
|
"N/A", |
|
|
0.0, |
|
|
0.0, |
|
|
0.0, |
|
|
fig, |
|
|
empty_df, |
|
|
empty_df, |
|
|
"", |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(title=APP_TITLE) as demo: |
|
|
gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` β `{VIEW_FQN}`") |
|
|
|
|
|
status = gr.Textbox(label="Status", interactive=False) |
|
|
|
|
|
with gr.Row(): |
|
|
refresh_btn = gr.Button("π Refresh", variant="primary") |
|
|
|
|
|
with gr.Row(): |
|
|
as_of = gr.Textbox(label="As of date", interactive=False) |
|
|
with gr.Row(): |
|
|
a1 = gr.Number(label="Assets T+1 (LKR)", precision=0) |
|
|
a2 = gr.Number(label="SoF T+1 (LKR)", precision=0) |
|
|
a3 = gr.Number(label="Net Gap T+1 (LKR)", precision=0) |
|
|
|
|
|
chart = gr.Plot(label="Maturity Ladder") |
|
|
ladder_df = gr.Dataframe(label="Ladder Detail") |
|
|
irr_df = gr.Dataframe(label="Interest-Rate Risk (approx)") |
|
|
excel_file = gr.File(label="Excel export", interactive=False) |
|
|
|
|
|
refresh_btn.click( |
|
|
fn=run_dashboard, |
|
|
outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df, excel_file], |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |
|
|
|