ALM_LLM / app.py
AshenH's picture
Update app.py
a2fac34 verified
raw
history blame
15.7 kB
import os
import sys
import traceback
from pathlib import Path
from typing import List, Tuple, Any
import duckdb
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use("Agg") # headless for Spaces
import matplotlib.pyplot as plt
import gradio as gr
# =========================
# Basic configuration
# =========================
APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard"
TABLE_FQN = "my_db.main.masterdataset_v" # source table
VIEW_FQN = "my_db.main.positions_v" # normalized view created by this app
PRODUCT_ASSETS = [
"loan", "overdraft", "advances", "bills", "bill",
"tbond", "t-bond", "tbill", "t-bill", "repo_asset", "assets"
]
PRODUCT_SOF = [
"fd", "term_deposit", "td", "savings", "current",
"call", "repo_liab"
]
# =========================
# Helpers
# =========================
def connect_md() -> duckdb.DuckDBPyConnection:
token = os.environ.get("MOTHERDUCK_TOKEN", "")
if not token:
raise RuntimeError("MOTHERDUCK_TOKEN is not set. Add it in Space β†’ Settings β†’ Secrets.")
return duckdb.connect(f"md:?motherduck_token={token}")
def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]:
# Try DESCRIBE first (fast), fall back to information_schema
try:
df = conn.execute(f"DESCRIBE {table_fqn};").fetchdf()
name_col = "column_name" if "column_name" in df.columns else df.columns[0]
return [str(c).lower() for c in df[name_col].tolist()]
except Exception:
df = conn.execute(
f"""
SELECT lower(column_name) AS col
FROM information_schema.columns
WHERE table_catalog = split_part('{table_fqn}', '.', 1)
AND table_schema = split_part('{table_fqn}', '.', 2)
AND table_name = split_part('{table_fqn}', '.', 3)
"""
).fetchdf()
return df["col"].tolist()
def build_view_sql(existing_cols: List[str]) -> str:
wanted = [
"as_of_date", "product", "months", "segments",
"currency", "Portfolio_value", "Interest_rate",
"days_to_maturity"
]
sel = []
for c in wanted:
if c.lower() in existing_cols:
sel.append(c)
else:
if c in ("Portfolio_value", "Interest_rate", "days_to_maturity", "months"):
sel.append(f"CAST(NULL AS DOUBLE) AS {c}")
else:
sel.append(f"CAST(NULL AS VARCHAR) AS {c}")
sof_list = ", ".join([f"'{p}'" for p in PRODUCT_SOF])
asset_list = ", ".join([f"'{p}'" for p in PRODUCT_ASSETS])
bucket_case = (
f"CASE "
f"WHEN lower(product) IN ({sof_list}) THEN 'SoF' "
f"WHEN lower(product) IN ({asset_list}) THEN 'Assets' "
f"ELSE 'Unknown' END AS bucket"
)
select_sql = ",\n ".join(sel + [bucket_case])
return f"""
CREATE OR REPLACE VIEW {VIEW_FQN} AS
SELECT
{select_sql}
FROM {TABLE_FQN};
"""
def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None:
required = {"product", "portfolio_value", "days_to_maturity"}
if not required.issubset(set(cols)):
raise RuntimeError(
f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}"
)
conn.execute(build_view_sql(cols))
def safe_num(x) -> float:
try:
return float(0.0 if x is None or (isinstance(x, float) and np.isnan(x)) else x)
except Exception:
return 0.0
def zeros_like_index(index) -> pd.Series:
return pd.Series([0] * len(index), index=index)
def plot_ladder(df: pd.DataFrame):
try:
if df is None or df.empty:
fig, ax = plt.subplots(figsize=(7, 3))
ax.text(0.5, 0.5, "No data", ha="center", va="center")
ax.axis("off")
return fig
pivot = df.pivot(index="time_bucket", columns="bucket", values="Amount (LKR Mn)").fillna(0)
order = ["T+1", "T+2..7", "T+8..30", "T+31+"]
pivot = pivot.reindex(order)
fig, ax = plt.subplots(figsize=(7, 4))
assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index)
sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index)
ax.bar(pivot.index, assets, label="Assets")
ax.bar(pivot.index, -sof, label="SoF")
ax.axhline(0, color="gray", lw=1)
ax.set_ylabel("LKR (Mn)")
ax.set_title("Maturity Ladder (Assets vs SoF)")
ax.legend()
fig.tight_layout()
return fig
except Exception as e:
fig, ax = plt.subplots(figsize=(7, 3))
ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left")
ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True)
ax.axis("off")
return fig
# =========================
# Query fragments
# =========================
KPI_SQL = f"""
SELECT
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS assets_t1,
COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS sof_t1,
COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0)
- COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS net_gap_t1
FROM {VIEW_FQN};
"""
LADDER_SQL = f"""
SELECT
CASE
WHEN days_to_maturity <= 1 THEN 'T+1'
WHEN days_to_maturity BETWEEN 2 AND 7 THEN 'T+2..7'
WHEN days_to_maturity BETWEEN 8 AND 30 THEN 'T+8..30'
ELSE 'T+31+'
END AS time_bucket,
bucket,
SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)"
FROM {VIEW_FQN}
GROUP BY 1,2
ORDER BY 1,2;
"""
GAP_DRIVERS_SQL = f"""
SELECT
product,
bucket,
SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)"
FROM {VIEW_FQN}
WHERE days_to_maturity <= 1
GROUP BY 1, 2
ORDER BY 3 DESC;
"""
def irr_sql(cols: List[str]) -> str:
has_months = "months" in cols
has_ir = "interest_rate" in cols
t_expr = "CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0"
if has_months:
t_expr += " WHEN months IS NOT NULL THEN months/12.0"
t_expr += " ELSE NULL END"
y_expr = "(Interest_rate/100.0)" if has_ir else "0.0"
return f"""
WITH irr_calcs AS (
SELECT
bucket,
Portfolio_value AS pv,
-- Modified Duration = Macaulay Duration / (1 + yield)
-- We approximate Macaulay Duration with time-to-maturity in years (t_expr)
({t_expr}) / (1 + {y_expr}) AS mod_dur
FROM {VIEW_FQN}
)
SELECT
bucket,
SUM(pv) / 1000000.0 AS "Portfolio Value (LKR Mn)",
-- BPV (DV01) = SUM(Portfolio Value * Modified Duration * 0.0001)
SUM(pv * mod_dur * 0.0001) AS "BPV (DV01)"
FROM irr_calcs
GROUP BY bucket;
"""
# =========================
# Dashboard callback
# =========================
def run_dashboard(scenario: str) -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame, str, pd.DataFrame]:
"""
Returns:
status, as_of, a1_text, a2_text, a3_text, figure, ladder_df, irr_df,
explain_text, drivers_df
"""
try:
conn = connect_md()
# --- Scenario Application ---
# Create a temporary view with scenario adjustments.
# Subsequent queries will use this stressed view.
stressed_view_fqn = f"{VIEW_FQN}_stressed"
runoff_factor = 1.0
rate_shock_bps = 0.0
if scenario == "Liquidity Stress: High Deposit Runoff":
runoff_factor = 0.8 # 20% runoff
elif scenario == "IRR Stress: Rate Shock (+200bps)":
rate_shock_bps = 200.0
scenario_sql = f"""
CREATE OR REPLACE TEMP VIEW {stressed_view_fqn} AS
SELECT *,
CASE WHEN lower(product) IN ('savings', 'fd', 'td', 'term_deposit') THEN Portfolio_value * {runoff_factor} ELSE Portfolio_value END AS stressed_pv
FROM {VIEW_FQN};
"""
conn.execute(scenario_sql)
# 1) Discover columns & build view
cols = discover_columns(conn, TABLE_FQN)
ensure_view(conn, cols)
# 2) As-of (optional)
as_of = "N/A"
if "as_of_date" in cols:
tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf()
if not tmp.empty and not pd.isna(tmp["d"].iloc[0]):
as_of = str(tmp["d"].iloc[0])[:10]
# 3) KPIs
# Modify queries to use the stressed view and value column
kpi_sql_stressed = KPI_SQL.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv")
kpi = conn.execute(kpi_sql_stressed).fetchdf()
assets_t1 = safe_num(kpi["assets_t1"].iloc[0]) if not kpi.empty else 0.0
sof_t1 = safe_num(kpi["sof_t1"].iloc[0]) if not kpi.empty else 0.0
net_gap = safe_num(kpi["net_gap_t1"].iloc[0]) if not kpi.empty else 0.0
# 4) Ladder, IRR, and Gap Drivers
ladder_sql_stressed = LADDER_SQL.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv")
drivers_sql_stressed = GAP_DRIVERS_SQL.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv")
irr_sql_stressed = irr_sql(cols).replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv")
ladder = conn.execute(ladder_sql_stressed).fetchdf()
irr = conn.execute(irr_sql_stressed).fetchdf()
drivers = conn.execute(drivers_sql_stressed).fetchdf()
# Create display copies of dataframes and format them for the UI
ladder_display = ladder.copy()
if "Amount (LKR Mn)" in ladder.columns:
ladder_display["Amount (LKR Mn)"] = ladder_display["Amount (LKR Mn)"].map('{:,.2f}'.format)
else:
ladder_display = pd.DataFrame()
# Format IRR table
irr_display = irr.copy()
if not irr_display.empty:
irr_display["Portfolio Value (LKR Mn)"] = irr_display["Portfolio Value (LKR Mn)"].map('{:,.2f}'.format)
irr_display["BPV (DV01)"] = irr_display["BPV (DV01)"].map('{:,.2f}'.format)
if "Amount (LKR Mn)" in drivers.columns:
drivers_display = drivers.copy()
drivers_display["Amount (LKR Mn)"] = drivers_display["Amount (LKR Mn)"].map('{:,.2f}'.format)
else:
drivers_display = pd.DataFrame()
# 5) Chart
fig = plot_ladder(ladder)
# 6) Explanations
assets_t1_mn_str = f"{(assets_t1 / 1_000_000):,.2f}"
sof_t1_mn_str = f"{(sof_t1 / 1_000_000):,.2f}"
net_gap_mn_str = f"{(net_gap / 1_000_000):,.2f}"
gap_sign_str = "positive" if net_gap >= 0 else "negative"
a1_text = f"The amount of Assets maturing tomorrow (T+1) is **LKR {assets_t1_mn_str} Mn**."
a2_text = f"The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is **LKR {sof_t1_mn_str} Mn**."
a3_text = f"The resulting Net Liquidity Gap for tomorrow (T+1) is **LKR {net_gap_mn_str} Mn**."
# Build "Why" text
sof_drivers = drivers[drivers["bucket"] == "SoF"]
asset_drivers = drivers[drivers["bucket"] == "Assets"]
top_sof_prod = sof_drivers.iloc[0] if not sof_drivers.empty else None
top_asset_prod = asset_drivers.iloc[0] if not asset_drivers.empty else None
explain_text = f"### Why is the T+1 Gap {gap_sign_str}?\n\n"
if top_sof_prod is not None:
explain_text += f"* **Largest Liability Maturity:** The largest outflow comes from `{top_sof_prod['product']}`, with **LKR {top_sof_prod['Amount (LKR Mn)']:,.2f} Mn** maturing.\n"
else:
explain_text += "* **Largest Liability Maturity:** No significant liabilities are maturing tomorrow.\n"
if top_asset_prod is not None:
explain_text += f"* **Largest Asset Inflow:** The largest inflow comes from `{top_asset_prod['product']}`, with **LKR {top_asset_prod['Amount (LKR Mn)']:,.2f} Mn** maturing.\n"
else:
explain_text += "* **Largest Asset Inflow:** No significant assets are maturing to provide inflows tomorrow.\n"
# Note: The data source does not contain features for seasonal analysis (e.g., day_of_week, is_month_end).
explain_text += "* **Seasonal Pattern:** Analysis not possible without relevant time-series features in the source data."
# Add scenario explanation for IRR stress
if scenario == "IRR Stress: Rate Shock (+200bps)" and not irr.empty:
net_bpv = irr["BPV (DV01)"].sum()
eve_impact = net_bpv * rate_shock_bps
eve_impact_mn = eve_impact / 1_000_000
explain_text += f"\n\n### IRR Stress Scenario Impact\n* A **+{rate_shock_bps:.0f} bps** rate shock is projected to change the portfolio's Economic Value by **LKR {eve_impact_mn:,.2f} Mn**."
status = f"βœ… OK (as of {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')})"
return (
status,
as_of,
a1_text,
a2_text,
a3_text,
fig,
ladder_display,
irr_display,
explain_text,
drivers_display,
)
except Exception as e:
tb = traceback.format_exc()
empty_df = pd.DataFrame()
fig = plot_ladder(empty_df)
return (
f"❌ Error: {e}\n\n{tb}",
"N/A",
"0",
"0",
"0",
fig,
empty_df,
empty_df,
"Analysis could not be performed.",
empty_df,
)
# =========================
# Build Gradio UI
# =========================
with gr.Blocks(title=APP_TITLE) as demo:
gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` β†’ `{VIEW_FQN}`")
status = gr.Textbox(label="Status", interactive=False, lines=8)
with gr.Row():
refresh_btn = gr.Button("πŸ”„ Refresh", variant="primary")
theme_btn = gr.Button("πŸŒ— Toggle Theme")
theme_btn.click(
None,
None,
_js="() => { document.querySelector('html').classList.toggle('dark'); }"
)
scenario_dd = gr.Dropdown(
label="Select Stress Scenario",
choices=["Baseline", "Liquidity Stress: High Deposit Runoff", "IRR Stress: Rate Shock (+200bps)"],
value="Baseline"
)
with gr.Row():
as_of = gr.Textbox(label="As of date", interactive=False)
a1 = gr.Markdown("The amount of Assets maturing tomorrow (T+1) is...")
a2 = gr.Markdown("The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is...")
a3 = gr.Markdown("The resulting Net Liquidity Gap for tomorrow (T+1) is...")
with gr.Row():
with gr.Column(scale=2):
chart = gr.Plot(label="Maturity Ladder")
ladder_df = gr.Dataframe(label="Ladder Detail")
irr_df = gr.Dataframe(
label="Interest-Rate Risk (BPV/DV01)",
headers=["Bucket", "Portfolio Value (LKR Mn)", "BPV (DV01)"]
)
with gr.Column(scale=1):
explain_text = gr.Markdown("Analysis of the T+1 gap will appear here...")
drivers_df = gr.Dataframe(
label="T+1 Gap Drivers (Top Products)",
headers=["Product", "Bucket", "Amount (LKR Mn)"],
)
refresh_btn.click(
fn=run_dashboard,
inputs=[scenario_dd],
outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df, explain_text, drivers_df],
)
if __name__ == "__main__":
demo.launch()