|
|
import os |
|
|
import sys |
|
|
import traceback |
|
|
from pathlib import Path |
|
|
from typing import List, Tuple, Any |
|
|
|
|
|
import duckdb |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib |
|
|
matplotlib.use("Agg") |
|
|
import matplotlib.pyplot as plt |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Title used for the Gradio Blocks app and its page header.
APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard"
# Fully-qualified source relation the positions view selects from.
TABLE_FQN = "my_db.main.masterdataset_v"
# Fully-qualified name of the view (re)created by build_view_sql().
VIEW_FQN = "my_db.main.positions_v"

# Product names (compared against lower(product)) classified as 'Assets'.
PRODUCT_ASSETS = [
    "loan", "overdraft", "advances", "bills", "bill",
    "tbond", "t-bond", "tbill", "t-bill", "repo_asset", "assets"
]
# Product names (compared against lower(product)) classified as 'SoF'
# (labelled "Sources of Funds" in the dashboard text).
PRODUCT_SOF = [
    "fd", "term_deposit", "td", "savings", "current",
    "call", "repo_liab"
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def connect_md() -> duckdb.DuckDBPyConnection:
    """Open a MotherDuck connection authenticated via ``MOTHERDUCK_TOKEN``.

    Returns:
        A DuckDB connection opened on ``md:`` with the token supplied in the
        connection string.

    Raises:
        RuntimeError: If the ``MOTHERDUCK_TOKEN`` environment variable is
            unset or empty.
    """
    token = os.environ.get("MOTHERDUCK_TOKEN", "")
    if not token:
        # The separators in this message were mis-encoded (shown as a Greek
        # beta); they are restored to arrows so the hint is readable.
        raise RuntimeError("MOTHERDUCK_TOKEN is not set. Add it in Space → Settings → Secrets.")
    return duckdb.connect(f"md:?motherduck_token={token}")
|
|
|
|
|
def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]:
    """List the lower-cased column names of *table_fqn*.

    ``DESCRIBE`` is tried first; if anything in that path raises, the names
    are read from ``information_schema.columns`` instead, splitting
    *table_fqn* into catalog / schema / table on the dots.
    """
    try:
        described = conn.execute(f"DESCRIBE {table_fqn};").fetchdf()
        # Use the "column_name" field when present, otherwise the first field.
        if "column_name" in described.columns:
            name_field = "column_name"
        else:
            name_field = described.columns[0]
        raw_names = described[name_field].tolist()
        return [str(raw).lower() for raw in raw_names]
    except Exception:
        fallback_sql = f"""
            SELECT lower(column_name) AS col
            FROM information_schema.columns
            WHERE table_catalog = split_part('{table_fqn}', '.', 1)
            AND table_schema = split_part('{table_fqn}', '.', 2)
            AND table_name = split_part('{table_fqn}', '.', 3)
        """
        catalog_rows = conn.execute(fallback_sql).fetchdf()
        return catalog_rows["col"].tolist()
|
|
|
|
|
def build_view_sql(existing_cols: List[str]) -> str:
    """Render the ``CREATE OR REPLACE VIEW`` statement for ``VIEW_FQN``.

    Each wanted column is selected as-is when its lower-cased name is in
    *existing_cols*; otherwise a typed NULL placeholder is projected under
    the same name. A derived ``bucket`` column classifies ``lower(product)``
    as 'SoF', 'Assets' or 'Unknown' using the module product lists.
    """
    numeric_placeholders = ("Portfolio_value", "Interest_rate", "days_to_maturity", "months")
    wanted = [
        "as_of_date", "product", "months", "segments",
        "currency", "Portfolio_value", "Interest_rate",
        "days_to_maturity",
    ]

    def _projection(column: str) -> str:
        # Present columns pass through; absent ones become typed NULLs.
        if column.lower() in existing_cols:
            return column
        null_type = "DOUBLE" if column in numeric_placeholders else "VARCHAR"
        return f"CAST(NULL AS {null_type}) AS {column}"

    projections = [_projection(column) for column in wanted]

    sof_list = ", ".join(f"'{name}'" for name in PRODUCT_SOF)
    asset_list = ", ".join(f"'{name}'" for name in PRODUCT_ASSETS)
    bucket_case = " ".join(
        [
            "CASE",
            f"WHEN lower(product) IN ({sof_list}) THEN 'SoF'",
            f"WHEN lower(product) IN ({asset_list}) THEN 'Assets'",
            "ELSE 'Unknown' END AS bucket",
        ]
    )

    projections.append(bucket_case)
    select_sql = ",\n ".join(projections)
    return f"""
    CREATE OR REPLACE VIEW {VIEW_FQN} AS
    SELECT
    {select_sql}
    FROM {TABLE_FQN};
    """
|
|
|
|
|
def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None:
    """Validate the source columns, then (re)create the positions view.

    Raises:
        RuntimeError: If *cols* lacks any of ``product``,
            ``portfolio_value`` or ``days_to_maturity``.
    """
    required = {"product", "portfolio_value", "days_to_maturity"}
    missing = required - set(cols)
    if missing:
        message = (
            f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}"
        )
        raise RuntimeError(message)
    view_ddl = build_view_sql(cols)
    conn.execute(view_ddl)
|
|
|
|
|
def safe_num(x) -> float:
    """Coerce *x* to a float, mapping missing or unparseable values to 0.0.

    ``None``, anything ``float()`` cannot convert, and any value that
    converts to NaN all yield ``0.0``. The NaN check runs on the converted
    value, so non-``float`` NaNs (``np.float32``, ``Decimal``, the string
    ``"nan"``) are covered as well.

    Args:
        x: Any scalar-like value.

    Returns:
        The value as a built-in ``float``, or ``0.0`` as described above.
    """
    if x is None:
        return 0.0
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        # Not numeric (or too large to represent): treat as missing.
        return 0.0
    return 0.0 if np.isnan(value) else value
|
|
|
|
|
def zeros_like_index(index) -> pd.Series:
    """Build a Series of integer zeros with one entry per label of *index*."""
    zero_values = [0 for _ in range(len(index))]
    return pd.Series(zero_values, index=index)
|
|
|
|
|
def plot_ladder(df: pd.DataFrame):
    """Draw the maturity ladder as a bar chart (Assets up, SoF down).

    Args:
        df: Ladder rows with ``time_bucket``, ``bucket`` and
            ``Amount (LKR Mn)`` columns. ``None`` or an empty frame yields a
            "No data" placeholder figure.

    Returns:
        A ``matplotlib.figure.Figure``. Any exception raised while building
        the chart is rendered as text into an error figure instead of
        propagating.
    """
    # Figures are built directly rather than through pyplot: pyplot keeps a
    # global reference to every figure it creates, so a server that never
    # calls plt.close() would accumulate one figure per refresh.
    from matplotlib.figure import Figure

    try:
        if df is None or df.empty:
            fig = Figure(figsize=(7, 3))
            ax = fig.subplots()
            ax.text(0.5, 0.5, "No data", ha="center", va="center")
            ax.axis("off")
            return fig

        order = ["T+1", "T+2..7", "T+8..30", "T+31+"]
        # Reindex first, then fill: time buckets absent from the data become
        # explicit zeros instead of NaN rows introduced by the reindex.
        pivot = (
            df.pivot(index="time_bucket", columns="bucket", values="Amount (LKR Mn)")
            .reindex(order)
            .fillna(0)
        )

        assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index)
        sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index)

        fig = Figure(figsize=(7, 4))
        ax = fig.subplots()
        ax.bar(pivot.index, assets, label="Assets")
        # SoF is plotted negated so it hangs below the zero line.
        ax.bar(pivot.index, -sof, label="SoF")
        ax.axhline(0, color="gray", lw=1)
        ax.set_ylabel("LKR (Mn)")
        ax.set_title("Maturity Ladder (Assets vs SoF)")
        ax.legend()
        fig.tight_layout()
        return fig
    except Exception as e:
        # Best-effort: show the failure in the chart area rather than raising.
        fig = Figure(figsize=(7, 3))
        ax = fig.subplots()
        ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left")
        ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True)
        ax.axis("off")
        return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# T+1 liquidity KPIs over the positions view: total Portfolio_value with
# days_to_maturity <= 1 for the 'Assets' bucket (assets_t1), for the 'SoF'
# bucket (sof_t1), and their difference (net_gap_t1). Each sum is COALESCEd
# to 0 so an empty match yields 0 rather than NULL.
# run_dashboard() rewrites the literal "FROM {VIEW_FQN}" and "Portfolio_value"
# substrings of this text, so keep both spelled exactly as they are.
KPI_SQL = f"""
SELECT
    COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS assets_t1,
    COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS sof_t1,
    COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0)
    - COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS net_gap_t1
FROM {VIEW_FQN};
"""
|
|
|
|
|
# Maturity ladder: Portfolio_value (in millions) summed per time bucket and
# per Assets/SoF/Unknown bucket.
#
# The bucket boundaries are cascading "<=" tests rather than BETWEEN ranges,
# so a fractional days_to_maturity (e.g. 1.5 or 7.5) lands in the next bucket
# up instead of falling through to 'T+31+'. Whole-day values bucket exactly
# as before; NULL days_to_maturity still falls into the ELSE branch.
#
# Rows are ordered by bucket position, not by label: sorting the labels as
# text would place 'T+31+' ahead of 'T+8..30'.
#
# run_dashboard() rewrites the literal "FROM {VIEW_FQN}" and "Portfolio_value"
# substrings of this text, so keep both spelled exactly as they are.
LADDER_SQL = f"""
SELECT
    CASE
        WHEN days_to_maturity <= 1 THEN 'T+1'
        WHEN days_to_maturity <= 7 THEN 'T+2..7'
        WHEN days_to_maturity <= 30 THEN 'T+8..30'
        ELSE 'T+31+'
    END AS time_bucket,
    bucket,
    SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)"
FROM {VIEW_FQN}
GROUP BY 1,2
ORDER BY
    CASE
        WHEN MIN(days_to_maturity) <= 1 THEN 1
        WHEN MIN(days_to_maturity) <= 7 THEN 2
        WHEN MIN(days_to_maturity) <= 30 THEN 3
        ELSE 4
    END,
    2;
"""
|
|
|
|
|
# T+1 gap drivers: Portfolio_value (in millions) for positions with
# days_to_maturity <= 1, summed per product and bucket, largest amount first.
# run_dashboard() takes the first row of each bucket as its top driver, which
# relies on this descending order.
# run_dashboard() also rewrites the literal "FROM {VIEW_FQN}" and
# "Portfolio_value" substrings of this text, so keep both spelled exactly.
GAP_DRIVERS_SQL = f"""
SELECT
    product,
    bucket,
    SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)"
FROM {VIEW_FQN}
WHERE days_to_maturity <= 1
GROUP BY 1, 2
ORDER BY 3 DESC;
"""
|
|
|
|
|
def irr_sql(cols: List[str]) -> str:
    """Build the interest-rate-risk query (portfolio value and BPV per bucket).

    Time to maturity in years is ``days_to_maturity/365`` and, when the
    source has a ``months`` column, falls back to ``months/12`` for rows
    without ``days_to_maturity``. Modified duration is approximated as that
    time divided by ``1 + yield``.

    Args:
        cols: Lower-cased column names of the source table.

    Returns:
        SQL text selecting ``bucket``, ``"Portfolio Value (LKR Mn)"`` and
        ``"BPV (DV01)"`` from ``VIEW_FQN``.
    """
    has_months = "months" in cols
    has_ir = "interest_rate" in cols

    t_expr = "CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0"
    if has_months:
        t_expr += " WHEN months IS NOT NULL THEN months/12.0"
    t_expr += " ELSE NULL END"

    # A NULL rate would make mod_dur NULL and drop the row from the BPV sum
    # while its value still counts in "Portfolio Value (LKR Mn)". Missing
    # rates therefore fall back to a zero yield, the same value used when
    # the source has no Interest_rate column at all.
    y_expr = "(COALESCE(Interest_rate, 0.0)/100.0)" if has_ir else "0.0"

    return f"""
    WITH irr_calcs AS (
        SELECT
            bucket,
            Portfolio_value AS pv,
            -- Modified Duration = Macaulay Duration / (1 + yield)
            -- We approximate Macaulay Duration with time-to-maturity in years (t_expr)
            ({t_expr}) / (1 + {y_expr}) AS mod_dur
        FROM {VIEW_FQN}
    )
    SELECT
        bucket,
        SUM(pv) / 1000000.0 AS "Portfolio Value (LKR Mn)",
        -- BPV (DV01) = SUM(Portfolio Value * Modified Duration * 0.0001)
        SUM(pv * mod_dur * 0.0001) AS "BPV (DV01)"
    FROM irr_calcs
    GROUP BY bucket;
    """
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _retarget_to_stressed(sql: str, stressed_view: str) -> str:
    """Rewrite a baseline query to read ``stressed_pv`` from the stressed view."""
    return (
        sql.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view}")
        .replace("Portfolio_value", "stressed_pv")
    )


def _format_amounts(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """Return a copy of *df* with *columns* rendered as ``1,234.56`` strings."""
    formatted = df.copy()
    for column in columns:
        formatted[column] = formatted[column].map("{:,.2f}".format)
    return formatted


def _explain_gap(drivers: pd.DataFrame, net_gap: float) -> str:
    """Compose the Markdown narrative naming the top T+1 SoF and Assets drivers."""
    gap_sign = "positive" if net_gap >= 0 else "negative"
    sof_rows = drivers[drivers["bucket"] == "SoF"]
    asset_rows = drivers[drivers["bucket"] == "Assets"]

    parts = [f"### Why is the T+1 Gap {gap_sign}?\n\n"]

    # The drivers query orders by amount descending, so the first row of each
    # bucket is that bucket's largest product.
    if not sof_rows.empty:
        top_sof = sof_rows.iloc[0]
        parts.append(
            f"* **Largest Liability Maturity:** The largest outflow comes from `{top_sof['product']}`, with **LKR {top_sof['Amount (LKR Mn)']:,.2f} Mn** maturing.\n"
        )
    else:
        parts.append("* **Largest Liability Maturity:** No significant liabilities are maturing tomorrow.\n")

    if not asset_rows.empty:
        top_asset = asset_rows.iloc[0]
        parts.append(
            f"* **Largest Asset Inflow:** The largest inflow comes from `{top_asset['product']}`, with **LKR {top_asset['Amount (LKR Mn)']:,.2f} Mn** maturing.\n"
        )
    else:
        parts.append("* **Largest Asset Inflow:** No significant assets are maturing to provide inflows tomorrow.\n")

    parts.append("* **Seasonal Pattern:** Analysis not possible without relevant time-series features in the source data.")
    return "".join(parts)


def run_dashboard(scenario: str) -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame, str, pd.DataFrame]:
    """Run every dashboard query for *scenario* and package the UI outputs.

    Args:
        scenario: One of the stress-scenario labels offered by the UI. The
            liquidity-stress label scales deposit products by 0.8; the IRR
            label adds a +200 bps impact paragraph. Any other value runs
            with no adjustment.

    Returns:
        status, as_of, a1_text, a2_text, a3_text, figure, ladder_df, irr_df,
        explain_text, drivers_df

        On any exception the status carries the error and traceback, and the
        remaining outputs are placeholders (empty frames, "0" texts, a
        "No data" figure); nothing is raised to the caller.
    """
    conn = None
    try:
        conn = connect_md()

        # Build the base view first: the stressed view below selects from it,
        # so it has to exist before that view can be created.
        cols = discover_columns(conn, TABLE_FQN)
        ensure_view(conn, cols)

        # Scenario parameters.
        runoff_factor = 1.0
        rate_shock_bps = 0.0
        if scenario == "Liquidity Stress: High Deposit Runoff":
            runoff_factor = 0.8
        elif scenario == "IRR Stress: Rate Shock (+200bps)":
            rate_shock_bps = 200.0

        # A TEMP view lives in the session's temp catalog, so it is created
        # under a bare name rather than the catalog-qualified VIEW_FQN.
        stressed_view = f"{VIEW_FQN.rsplit('.', 1)[-1]}_stressed"
        conn.execute(
            f"""
            CREATE OR REPLACE TEMP VIEW {stressed_view} AS
            SELECT *,
                CASE WHEN lower(product) IN ('savings', 'fd', 'td', 'term_deposit') THEN Portfolio_value * {runoff_factor} ELSE Portfolio_value END AS stressed_pv
            FROM {VIEW_FQN};
            """
        )

        # Latest as-of date, when the source provides one.
        as_of = "N/A"
        if "as_of_date" in cols:
            tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf()
            if not tmp.empty and not pd.isna(tmp["d"].iloc[0]):
                as_of = str(tmp["d"].iloc[0])[:10]

        # T+1 KPIs on the stressed values.
        kpi = conn.execute(_retarget_to_stressed(KPI_SQL, stressed_view)).fetchdf()
        assets_t1 = safe_num(kpi["assets_t1"].iloc[0]) if not kpi.empty else 0.0
        sof_t1 = safe_num(kpi["sof_t1"].iloc[0]) if not kpi.empty else 0.0
        net_gap = safe_num(kpi["net_gap_t1"].iloc[0]) if not kpi.empty else 0.0

        # Ladder, IRR and driver tables on the stressed values.
        ladder = conn.execute(_retarget_to_stressed(LADDER_SQL, stressed_view)).fetchdf()
        irr = conn.execute(_retarget_to_stressed(irr_sql(cols), stressed_view)).fetchdf()
        drivers = conn.execute(_retarget_to_stressed(GAP_DRIVERS_SQL, stressed_view)).fetchdf()

        # Display copies carry formatted strings; the numeric frames are kept
        # for the chart and the narrative below.
        if "Amount (LKR Mn)" in ladder.columns:
            ladder_display = _format_amounts(ladder, ["Amount (LKR Mn)"])
        else:
            ladder_display = pd.DataFrame()

        if not irr.empty:
            irr_display = _format_amounts(irr, ["Portfolio Value (LKR Mn)", "BPV (DV01)"])
        else:
            irr_display = irr.copy()

        if "Amount (LKR Mn)" in drivers.columns:
            drivers_display = _format_amounts(drivers, ["Amount (LKR Mn)"])
        else:
            drivers_display = pd.DataFrame()

        fig = plot_ladder(ladder)

        # Headline texts, expressed in millions.
        assets_t1_mn_str = f"{(assets_t1 / 1_000_000):,.2f}"
        sof_t1_mn_str = f"{(sof_t1 / 1_000_000):,.2f}"
        net_gap_mn_str = f"{(net_gap / 1_000_000):,.2f}"

        a1_text = f"The amount of Assets maturing tomorrow (T+1) is **LKR {assets_t1_mn_str} Mn**."
        a2_text = f"The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is **LKR {sof_t1_mn_str} Mn**."
        a3_text = f"The resulting Net Liquidity Gap for tomorrow (T+1) is **LKR {net_gap_mn_str} Mn**."

        explain_text = _explain_gap(drivers, net_gap)

        if scenario == "IRR Stress: Rate Shock (+200bps)" and not irr.empty:
            # NOTE(review): BPV is summed across every bucket returned by the
            # IRR query rather than netted as Assets minus SoF; confirm this
            # is the intended economic-value sensitivity.
            net_bpv = irr["BPV (DV01)"].sum()
            eve_impact = net_bpv * rate_shock_bps
            eve_impact_mn = eve_impact / 1_000_000
            explain_text += f"\n\n### IRR Stress Scenario Impact\n* A **+{rate_shock_bps:.0f} bps** rate shock is projected to change the portfolio's Economic Value by **LKR {eve_impact_mn:,.2f} Mn**."

        # The leading check mark was mis-encoded in the original text, which
        # also split this literal across two lines; it is restored here.
        status = f"✅ OK (as of {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')})"
        return (
            status,
            as_of,
            a1_text,
            a2_text,
            a3_text,
            fig,
            ladder_display,
            irr_display,
            explain_text,
            drivers_display,
        )

    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        tb = traceback.format_exc()
        empty_df = pd.DataFrame()
        fig = plot_ladder(empty_df)
        return (
            f"❌ Error: {e}\n\n{tb}",
            "N/A",
            "0",
            "0",
            "0",
            fig,
            empty_df,
            empty_df,
            "Analysis could not be performed.",
            empty_df,
        )

    finally:
        # Release the connection on both the success and the error path.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gradio layout. The arrow in the header and the button emojis were
# mis-encoded in the original text (shown as Greek letters); they are
# restored to readable glyphs here. The exact emojis could not be recovered
# from the mangled text, so these two are stand-ins.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` → `{VIEW_FQN}`")

    # Multi-line so an error traceback from run_dashboard() stays readable.
    status = gr.Textbox(label="Status", interactive=False, lines=8)

    with gr.Row():
        refresh_btn = gr.Button("🔄 Refresh", variant="primary")
        theme_btn = gr.Button("🌗 Toggle Theme")
        # Client-side only: toggles the 'dark' class on <html>, no Python call.
        # NOTE(review): `_js` is the keyword this file already uses; confirm
        # it matches the installed Gradio version.
        theme_btn.click(
            None,
            None,
            _js="() => { document.querySelector('html').classList.toggle('dark'); }"
        )

    # The choice labels are compared verbatim inside run_dashboard().
    scenario_dd = gr.Dropdown(
        label="Select Stress Scenario",
        choices=["Baseline", "Liquidity Stress: High Deposit Runoff", "IRR Stress: Rate Shock (+200bps)"],
        value="Baseline"
    )

    with gr.Row():
        as_of = gr.Textbox(label="As of date", interactive=False)

    a1 = gr.Markdown("The amount of Assets maturing tomorrow (T+1) is...")
    a2 = gr.Markdown("The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is...")
    a3 = gr.Markdown("The resulting Net Liquidity Gap for tomorrow (T+1) is...")

    with gr.Row():
        with gr.Column(scale=2):
            chart = gr.Plot(label="Maturity Ladder")
            ladder_df = gr.Dataframe(label="Ladder Detail")
            irr_df = gr.Dataframe(
                label="Interest-Rate Risk (BPV/DV01)",
                headers=["Bucket", "Portfolio Value (LKR Mn)", "BPV (DV01)"]
            )
        with gr.Column(scale=1):
            explain_text = gr.Markdown("Analysis of the T+1 gap will appear here...")
            drivers_df = gr.Dataframe(
                label="T+1 Gap Drivers (Top Products)",
                headers=["Product", "Bucket", "Amount (LKR Mn)"],
            )

    # Output order must match the tuple returned by run_dashboard().
    refresh_btn.click(
        fn=run_dashboard,
        inputs=[scenario_dd],
        outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df, explain_text, drivers_df],
    )
|
|
|
|
|
# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()
|
|
|