import gradio as gr
import pandas as pd
import numpy as np
import os
import re
from typing import Dict, Tuple, List, Optional
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
# ======================================
# 設定(添付CSVの既定パス:必要に応じて変更可)
# ======================================
DEFAULT_CSV_PATH = "/mnt/data/mock_data_id_9999.csv"
# ======================================
# ユーティリティ
# ======================================
def normalize(s: str) -> str:
return str(s).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip()
def try_read_csv_3header(path_or_file) -> pd.DataFrame:
"""
3行ヘッダーCSVを読み込む(cp932/utf-8-sig フォールバック)。
1列目は timestamp として datetime 変換。
2列目以降は (ID, ItemName, ProcessName) の3段。
"""
last_err = None
for enc in ["cp932", "utf-8-sig", "utf-8"]:
try:
df = pd.read_csv(path_or_file, header=[0, 1, 2], encoding=enc)
break
except Exception as e:
last_err = e
df = None
if df is None:
raise last_err
# 先頭列を timestamp に
ts = pd.to_datetime(df.iloc[:, 0], errors="coerce")
df = df.drop(df.columns[0], axis=1)
df.insert(0, "timestamp", ts)
# 列名はタプルのまま保持(timestampは str)
# ただし内部処理用に文字列連結も作成できるように関数を用意
return df
def col_tuple_to_str(col) -> str:
if isinstance(col, tuple):
return "_".join([str(x) for x in col if x])
return str(col)
def build_index_maps(df: pd.DataFrame):
"""
プロセス(3行目=タプルの3つ目)→ 該当列情報 の辞書を作る。
各列は (col_tuple, id, item, process, col_str)
"""
process_map = {}
for col in df.columns:
if col == "timestamp":
continue
if isinstance(col, tuple) and len(col) >= 3:
col_id, item_name, process_name = str(col[0]), str(col[1]), str(col[2])
else:
# 非タプル(安全策)
parts = str(col).split("_")
if len(parts) >= 3:
col_id, item_name, process_name = parts[0], "_".join(parts[1:-1]), parts[-1]
else:
# プロセスが分からない列はスキップ
continue
rec = {
"col_tuple": col,
"id": col_id,
"item": item_name,
"process": process_name,
"col_str": col_tuple_to_str(col),
}
process_map.setdefault(process_name, []).append(rec)
# プロセス候補・アイテム候補を返すために使う
processes = sorted(list(process_map.keys()), key=lambda x: normalize(x))
return process_map, processes
def extract_measure_tag(item_name: str) -> str:
"""
項目名末尾の計測項目タグを抽出。
例:
"処理水 有機物 分析値 [mg/L]" → "mg/L"
"原水 TOC" → "TOC"
"導電率(電気伝導度) [mS/cm]" → "mS/cm"
優先順:
1) [...] の中身
2) 全角/半角スペース区切りの末尾語(英字混在や記号含む)
"""
s = normalize(item_name)
m = re.search(r"\[([^\[\]]+)\]\s*$", s)
if m:
return m.group(1).strip()
# 角括弧がなければ末尾語
tokens = re.split(r"\s+", s)
if tokens:
return tokens[-1]
return s
# ======================================
# しきい値ハンドリング
# ======================================
def try_read_thresholds_excel(file) -> Optional[pd.DataFrame]:
"""
しきい値Excel(任意)を読み込み。
想定カラム: ColumnID, ItemName, ProcessNo_ProcessName, LL, L, H, HH, Important(任意)
"""
if file is None:
return None
df = pd.read_excel(file)
df.columns = [normalize(c) for c in df.columns]
# 必須カラム確認(最低限)
needed = {"ColumnID", "ItemName", "ProcessNo_ProcessName"}
if not needed.issubset(set(df.columns)):
# 列名が違う場合の簡易吸収
rename_map = {}
for k in list(df.columns):
nk = normalize(str(k))
if nk.lower() in ["columnid", "colid", "id"]:
rename_map[k] = "ColumnID"
elif nk.lower() in ["itemname", "item", "name"]:
rename_map[k] = "ItemName"
elif nk.lower() in ["processno_processname", "process", "processname"]:
rename_map[k] = "ProcessNo_ProcessName"
if rename_map:
df = df.rename(columns=rename_map)
# 数値化
for c in ["LL", "L", "H", "HH"]:
if c in df.columns:
df[c] = pd.to_numeric(df[c], errors="coerce")
if "Important" in df.columns:
df["Important"] = (
df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False})
)
return df
def build_threshold_lookup(thr_df: Optional[pd.DataFrame]) -> Dict[Tuple[str, str, str], Tuple[float, float, float, float]]:
"""
キー: (ColumnID, ItemName, ProcessNo_ProcessName) → (LL, L, H, HH)
"""
lookup = {}
if thr_df is None or thr_df.empty:
return lookup
for _, r in thr_df.iterrows():
colid = normalize(str(r.get("ColumnID", "")))
item = normalize(str(r.get("ItemName", "")))
proc = normalize(str(r.get("ProcessNo_ProcessName", "")))
LL = r.get("LL", np.nan)
L = r.get("L", np.nan)
H = r.get("H", np.nan)
HH = r.get("HH", np.nan)
lookup[(colid, item, proc)] = (LL, L, H, HH)
return lookup
def auto_threshold(series: pd.Series) -> Tuple[float, float, float, float]:
"""
自動しきい値: mean ± std(LL/L/H/HH の2段に同じ幅を割当)
例: L=mean-std, LL=mean-2std, H=mean+std, HH=mean+2std
"""
s = series.dropna()
if len(s) < 5:
return (np.nan, np.nan, np.nan, np.nan)
m = float(s.mean())
sd = float(s.std(ddof=1)) if len(s) >= 2 else 0.0
return (m - 2*sd, m - sd, m + sd, m + 2*sd)
def judge_status(value, LL, L, H, HH) -> str:
if pd.notna(LL) and value <= LL:
return "LL"
if pd.notna(L) and value <= L:
return "L"
if pd.notna(HH) and value >= HH:
return "HH"
if pd.notna(H) and value >= H:
return "H"
return "OK"
# カラー(点の色):閾値逸脱を強調
STATUS_COLOR = {
"LL": "#2b6cb0", # 青系
"L": "#63b3ed", # 水色
"OK": "#a0aec0", # グレー
"H": "#f6ad55", # 橙
"HH": "#e53e3e", # 赤
}
# 線色(系列ライン):列ごとに安定色
LINE_COLOR = "#4a5568" # 濃いグレー
# ======================================
# 図作成(既存:グルーピングごとに個別のFigureを返す)
# ======================================
def make_trend_figs(
df: pd.DataFrame,
process_map: Dict[str, List[dict]],
process_name: str,
selected_items: List[str],
thr_df: Optional[pd.DataFrame],
thr_mode: str, # "excel" or "auto"
date_min: Optional[str] = None,
date_max: Optional[str] = None,
) -> List[go.Figure]:
"""
計測項目タグごと(extract_measure_tag)に図を分けて生成。
selected_items は「2行目(ItemName)」の値。
"""
if df is None or process_name is None or process_name == "":
return []
# 対象プロセスの列レコード
recs = process_map.get(process_name, [])
if not recs:
return []
# 2行目(ItemName)で絞り込み
selected_items_set = set([normalize(x) for x in (selected_items or [])])
recs = [r for r in recs if normalize(r["item"]) in selected_items_set]
if not recs:
return []
# 日付範囲フィルタ
dfw = df.copy()
if date_min:
dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)]
if date_max:
dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)]
if dfw.empty:
return []
# しきい値参照
thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {}
# 測定項目タグごとにグループ化
groups: Dict[str, List[dict]] = {}
for r in recs:
tag = extract_measure_tag(r["item"])
groups.setdefault(tag, []).append(r)
figs = []
for tag, cols in groups.items():
fig = go.Figure()
# 各列を描画
for r in cols:
col = r["col_tuple"]
col_str = r["col_str"]
if col not in dfw.columns:
# まれにヘッダー崩れなど
if col_str in dfw.columns:
series = dfw[col_str]
else:
continue
else:
series = dfw[col]
# 値
x = dfw["timestamp"]
y = pd.to_numeric(series, errors="coerce")
# しきい値決定
if thr_mode == "excel":
key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"]))
LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan))
# Excelに見つからない場合は自動にフォールバック
if all(pd.isna(v) for v in [LL, L, H, HH]):
LL, L, H, HH = auto_threshold(y)
else:
LL, L, H, HH = auto_threshold(y)
# 状態ごとに点色を決める
colors = []
for v in y:
if pd.isna(v):
colors.append("rgba(0,0,0,0)")
else:
st = judge_status(v, LL, L, H, HH)
colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"]))
# 下地のライン(視認性のため薄色)
fig.add_trace(go.Scatter(
x=x, y=y, mode="lines",
name=f"{r['item']} ({r['id']})",
line=dict(color=LINE_COLOR, width=1.5),
hovertemplate="%{x}
%{y}"+f"{r['item']} ({r['id']})"+""
))
# 色付きマーカーで逸脱強調
fig.add_trace(go.Scatter(
x=x, y=y, mode="markers",
name=f"{r['item']} markers",
marker=dict(size=6, color=colors),
showlegend=False,
hovertemplate="%{x}
%{y}"
))
# しきい値ガイド(あれば)
def add_hline(val, label):
if pd.notna(val):
fig.add_hline(y=float(val), line=dict(width=1, dash="dot"),
annotation_text=label, annotation_position="top left")
add_hline(LL, "LL")
add_hline(L, "L")
add_hline(H, "H")
add_hline(HH, "HH")
fig.update_layout(
title=f"{process_name} | 計測項目: {tag}",
xaxis_title="timestamp",
yaxis_title=tag,
legend_title="系列",
margin=dict(l=10, r=10, t=40, b=10),
hovermode="x unified",
)
figs.append(fig)
return figs
# ======================================
# 新規:サブプロット1枚でまとめる図
# ======================================
def make_trend_figure(
df: pd.DataFrame,
process_map: Dict[str, List[dict]],
process_name: str,
selected_items: List[str],
thr_df: Optional[pd.DataFrame],
thr_mode: str, # "excel" or "auto"
date_min: Optional[str] = None,
date_max: Optional[str] = None,
_force_tags: Optional[List[str]] = None, # ← 追加:ページ分割用に表示するタグを指定
) -> Optional[go.Figure]:
if df is None or not process_name:
return None
recs = process_map.get(process_name, [])
if not recs:
return None
selected_items_set = set([normalize(x) for x in (selected_items or [])])
recs = [r for r in recs if normalize(r["item"]) in selected_items_set]
if not recs:
return None
dfw = df.copy()
if date_min:
dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)]
if date_max:
dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)]
if dfw.empty:
return None
thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {}
# 計測項目タグでグルーピング
groups: Dict[str, List[dict]] = {}
for r in recs:
tag = extract_measure_tag(r["item"])
groups.setdefault(tag, []).append(r)
tags = list(groups.keys()) if _force_tags is None else _force_tags
if not tags:
return None
rows = len(tags)
# rows が多いときは、Plotly の制約: vertical_spacing <= 1/(rows-1)
if rows <= 1:
vspace = 0.03
else:
max_vs = (1.0 / (rows - 1)) - 1e-4 # ほんの少しだけマージンを取る
vspace = max(0.0, min(0.03, max_vs))
fig = make_subplots(
rows=rows, cols=1, shared_xaxes=True,
vertical_spacing=vspace,
subplot_titles=[f"{process_name} | 計測項目: {t}" for t in tags]
)
row_idx = 1
for tag in tags:
cols = groups[tag]
for r in cols:
col = r["col_tuple"]
col_str = r["col_str"]
if col in dfw.columns:
series = dfw[col]
elif col_str in dfw.columns:
series = dfw[col_str]
else:
continue
x = dfw["timestamp"]
y = pd.to_numeric(series, errors="coerce")
if thr_mode == "excel":
key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"]))
LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan))
if all(pd.isna(v) for v in [LL, L, H, HH]):
LL, L, H, HH = auto_threshold(y)
else:
LL, L, H, HH = auto_threshold(y)
# ライン
fig.add_trace(
go.Scatter(
x=x, y=y, mode="lines",
name=f"{r['item']} ({r['id']})",
line=dict(color=LINE_COLOR, width=1.5),
hovertemplate="%{x}
%{y}"+f"{r['item']} ({r['id']})"+""
),
row=row_idx, col=1
)
# マーカー(色分け)
colors = []
for v in y:
if pd.isna(v):
colors.append("rgba(0,0,0,0)")
else:
st = judge_status(v, LL, L, H, HH)
colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"]))
fig.add_trace(
go.Scatter(
x=x, y=y, mode="markers",
name=f"{r['item']} markers",
marker=dict(size=6, color=colors),
showlegend=False,
hovertemplate="%{x}
%{y}"
),
row=row_idx, col=1
)
# しきい値ガイド
for val, label in [(LL, "LL"), (L, "L"), (H, "H"), (HH, "HH")]:
if pd.notna(val):
fig.add_hline(
y=float(val), line=dict(width=1, dash="dot"),
annotation_text=label, annotation_position="top left",
row=row_idx, col=1
)
row_idx += 1
fig.update_layout(
title=f"{process_name} | 計測項目タグごとのトレンド",
xaxis_title="timestamp",
showlegend=True,
margin=dict(l=10, r=10, t=40, b=10),
hovermode="x unified",
height=max(400, 260 * len(tags)),
)
return fig
# 追加:ページ分割版(tags_per_pageごとに make_trend_figure を呼ぶだけ)
def make_trend_figure_paged(
df: pd.DataFrame,
process_map: Dict[str, List[dict]],
process_name: str,
selected_items: List[str],
thr_df: Optional[pd.DataFrame],
thr_mode: str,
date_min: Optional[str],
date_max: Optional[str],
page: int,
tags_per_page: int,
) -> Tuple[Optional[go.Figure], int, List[str]]:
# 対象タグの全一覧を作る
recs = process_map.get(process_name, [])
if not recs:
return None, 0, []
selected_items_set = set([normalize(x) for x in (selected_items or [])])
recs = [r for r in recs if normalize(r["item"]) in selected_items_set]
if not recs:
return None, 0, []
groups: Dict[str, List[dict]] = {}
for r in recs:
groups.setdefault(extract_measure_tag(r["item"]), []).append(r)
all_tags = list(groups.keys())
total_pages = max(1, int(np.ceil(len(all_tags) / max(1, tags_per_page))))
page = int(max(1, min(page, total_pages)))
start = (page - 1) * tags_per_page
end = start + tags_per_page
tags_slice = all_tags[start:end]
fig = make_trend_figure(
df, process_map, process_name, selected_items, thr_df, thr_mode, date_min, date_max, _force_tags=tags_slice
)
return fig, total_pages, all_tags
# ======================================
# 新規:計測項目タグごとに個別Figure
# ======================================
def make_trend_figs_by_tag(
df: pd.DataFrame,
process_map: Dict[str, List[dict]],
process_name: str,
selected_items: List[str],
thr_df: Optional[pd.DataFrame],
thr_mode: str,
date_min: Optional[str] = None,
date_max: Optional[str] = None,
) -> Dict[str, go.Figure]:
if df is None or not process_name:
return {}
recs = process_map.get(process_name, [])
if not recs:
return {}
selected_items_set = set([normalize(x) for x in (selected_items or [])])
recs = [r for r in recs if normalize(r["item"]) in selected_items_set]
if not recs:
return {}
dfw = df.copy()
if date_min:
dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)]
if date_max:
dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)]
if dfw.empty:
return {}
thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {}
groups: Dict[str, List[dict]] = {}
for r in recs:
tag = extract_measure_tag(r["item"])
groups.setdefault(tag, []).append(r)
out: Dict[str, go.Figure] = {}
for tag, cols in groups.items():
fig = go.Figure()
for r in cols:
col = r["col_tuple"]
col_str = r["col_str"]
if col in dfw.columns:
series = dfw[col]
elif col_str in dfw.columns:
series = dfw[col_str]
else:
continue
x = dfw["timestamp"]
y = pd.to_numeric(series, errors="coerce")
if thr_mode == "excel":
key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"]))
LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan))
if all(pd.isna(v) for v in [LL, L, H, HH]):
LL, L, H, HH = auto_threshold(y)
else:
LL, L, H, HH = auto_threshold(y)
fig.add_trace(go.Scatter(
x=x, y=y, mode="lines",
name=f"{r['item']} ({r['id']})",
line=dict(color=LINE_COLOR, width=1.5),
hovertemplate="%{x}
%{y}"+f"{r['item']} ({r['id']})"+""
))
colors = []
for v in y:
if pd.isna(v):
colors.append("rgba(0,0,0,0)")
else:
st = judge_status(v, LL, L, H, HH)
colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"]))
fig.add_trace(go.Scatter(
x=x, y=y, mode="markers",
name=f"{r['item']} markers",
marker=dict(size=6, color=colors),
showlegend=False,
hovertemplate="%{x}
%{y}"
))
for val, label in [(LL, "LL"), (L, "L"), (H, "H"), (HH, "HH")]:
if pd.notna(val):
fig.add_hline(y=float(val), line=dict(width=1, dash="dot"),
annotation_text=label, annotation_position="top left")
fig.update_layout(
title=f"{process_name} | 計測項目: {tag}",
xaxis_title="timestamp",
yaxis_title=tag,
legend_title="系列",
margin=dict(l=10, r=10, t=40, b=10),
hovermode="x unified",
)
out[tag] = fig
return out
def figures_to_html(figs_by_tag: Dict[str, go.Figure]) -> str:
"""PlotlyJS埋め込み方式(ブラウザ側で描画)"""
parts = []
first = True
for tag, fig in figs_by_tag.items():
html = pio.to_html(fig, include_plotlyjs='cdn' if first else False, full_html=False)
parts.append(
f'
'
)
first = False
return "\n".join(parts)
# ======================================
# グローバル状態(UI間共有)
# ======================================
G_DF: Optional[pd.DataFrame] = None
G_PROCESS_MAP = {}
G_PROCESSES = []
G_THRESHOLDS_DF: Optional[pd.DataFrame] = None
# ======================================
# コールバック
# ======================================
def initialize_default_csv():
"""
起動時にデフォルトCSVが存在すれば読み込む。
"""
global G_DF, G_PROCESS_MAP, G_PROCESSES
if os.path.exists(DEFAULT_CSV_PATH):
try:
df = try_read_csv_3header(DEFAULT_CSV_PATH)
G_DF = df
G_PROCESS_MAP, G_PROCESSES = build_index_maps(df)
return f"✅ 既定CSVを読み込みました: {DEFAULT_CSV_PATH}", gr.update(choices=G_PROCESSES, value=(G_PROCESSES[0] if G_PROCESSES else None)), G_PROCESSES
except Exception as e:
return f"⚠ 既定CSV読み込み失敗: {e}", gr.update(), []
return "ℹ CSVをアップロードしてください。", gr.update(), []
def on_csv_upload(file):
"""
CSVアップロード → パース → プロセス候補更新
"""
global G_DF, G_PROCESS_MAP, G_PROCESSES
if file is None:
return "⚠ ファイルが選択されていません。", gr.update(choices=[]), []
try:
df = try_read_csv_3header(file.name if hasattr(file, "name") else file)
G_DF = df
G_PROCESS_MAP, G_PROCESSES = build_index_maps(df)
return f"✅ CSV読み込み: {df.shape[0]}行 × {df.shape[1]}列", gr.update(choices=G_PROCESSES, value=(G_PROCESSES[0] if G_PROCESSES else None)), G_PROCESSES
except Exception as e:
return f"❌ 読み込みエラー: {e}", gr.update(choices=[]), []
def on_thr_upload(file):
"""
しきい値Excelアップロード → メモリ更新
"""
global G_THRESHOLDS_DF
if file is None:
G_THRESHOLDS_DF = None
return "ℹ しきい値ファイルなし(自動しきい値が使われます)"
try:
thr = try_read_thresholds_excel(file.name if hasattr(file, "name") else file)
G_THRESHOLDS_DF = thr
return f"✅ しきい値を読み込みました({thr.shape[0]}件)"
except Exception as e:
G_THRESHOLDS_DF = None
return f"❌ しきい値読み込みエラー: {e}"
def update_items(process_name: str):
"""
プロセス選択に応じて、項目(2行目)候補を返す。
"""
if not process_name or process_name not in G_PROCESS_MAP:
return gr.update(choices=[], value=[])
items = sorted(list({rec["item"] for rec in G_PROCESS_MAP[process_name]}), key=lambda x: normalize(x))
# デフォルトは全選択
return gr.update(choices=items, value=items)
def render_figs(process_name: str, items: List[str], thr_mode: str, date_min, date_max):
"""
(旧)図を生成して返す(複数図)。今は未使用だが残置。
"""
if G_DF is None:
return "⚠ データ未読み込み", []
if not process_name:
return "⚠ プロセスを選択してください", []
if not items:
return "⚠ 項目を選択してください", []
figs = make_trend_figs(
G_DF, G_PROCESS_MAP, process_name, items, G_THRESHOLDS_DF, thr_mode, date_min, date_max
)
if not figs:
return "⚠ 図を生成できませんでした(データ無し or 条件不一致)", []
return f"✅ {process_name}: {len(figs)}枚のトレンド図を生成しました(計測項目タグごと)", figs
def render_any(process_name: str, items: List[str], display_mode: str, thr_mode_label: str,
date_min, date_max, page: int, tpp: int):
"""表示形式に応じて Plot を返す(個別はページ分割)。"""
if G_DF is None:
return "⚠ データ未読み込み", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
if not process_name:
return "⚠ プロセスを選択してください", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
if not items:
return "⚠ 項目を選択してください", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
mode = "excel" if str(thr_mode_label).startswith("excel") else "auto"
if str(display_mode).startswith("サブプロット"):
fig = make_trend_figure(G_DF, G_PROCESS_MAP, process_name, items, G_THRESHOLDS_DF, mode, date_min, date_max)
if fig is None:
return "⚠ 図を生成できませんでした(データ無し or 条件不一致)", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
return "✅ トレンド図(1枚サブプロット)を生成しました", gr.update(value=fig, visible=True), gr.update(visible=False), gr.update(visible=False)
else:
fig, total_pages, all_tags = make_trend_figure_paged(
G_DF, G_PROCESS_MAP, process_name, items, G_THRESHOLDS_DF, mode, date_min, date_max,
page=int(page), tags_per_page=int(tpp)
)
if fig is None:
return "⚠ 図を生成できませんでした(データ無し or 条件不一致)", gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
info = f"タグ総数: {len(all_tags)} | ページ {int(max(1,min(page,total_pages)))} / {total_pages} | タグ/ページ={int(tpp)}"
return "✅ 個別(ページ分割)を描画しました", gr.update(value=fig, visible=True), gr.update(value=info, visible=True), gr.update(visible=True)
# ======================================
# UI
# ======================================
init_msg, init_proc_update, _ = initialize_default_csv()
init_value = init_proc_update.get("value") if isinstance(init_proc_update, dict) else None
init_choices = init_proc_update.get("choices") if isinstance(init_proc_update, dict) else []
with gr.Blocks(css="""
.gradio-container {overflow: auto !important;}
""") as demo:
gr.Markdown("## トレンドグラフ専用アプリ(3行ヘッダー対応・プロセス別・計測項目タグ別・閾値色分け)")
with gr.Row():
csv_uploader = gr.File(label="① 時系列CSV(3行ヘッダー)", file_count="single", file_types=[".csv"])
thr_uploader = gr.File(label="② 閾値Excel(任意: LL/L/HH/HH)", file_count="single", file_types=[".xlsx", ".xls"])
with gr.Row():
thr_mode = gr.Radio(
["excel(アップロード優先・無ければ自動)", "自動(平均±標準偏差)"],
value="excel(アップロード優先・無ければ自動)",
label="しきい値モード"
)
date_min = gr.Textbox(label="抽出開始日時(任意)例: 2024-07-01 00:00")
date_max = gr.Textbox(label="抽出終了日時(任意)例: 2024-07-31 23:59")
# 表示形式の切り替え
display_mode = gr.Radio(
["サブプロット(1枚)", "個別(ページ分割)"],
value="サブプロット(1枚)",
label="表示形式"
)
status_csv = gr.Markdown(init_msg)
status_thr = gr.Markdown()
process_dd = gr.Dropdown(label="対象プロセス(3行ヘッダーの3行目)",
choices=init_choices, value=init_value)
items_cb = gr.CheckboxGroup(label="表示する項目(3行ヘッダーの2行目)", choices=[], value=[])
with gr.Row():
btn_render = gr.Button("トレンド図を生成", variant="primary")
msg = gr.Markdown()
# 表示領域:Plot 1枚を使い回す(個別=ページ分割も同じPlotを再描画)
plot = gr.Plot(label="トレンド図(タグ別)", show_label=True, visible=True)
# ページ分割用コントロール(個別モード時のみ使用)
with gr.Row():
tags_per_page = gr.Slider(1, 12, value=8, step=1, label="タグ/ページ(個別モード)")
page_no = gr.Number(value=1, label="ページ(1〜)", precision=0)
page_info = gr.Markdown(visible=False)
# コールバック接続
# 既定CSVの手動代入は不要(生成時に付与済み)
# 2) CSVアップロードで更新
csv_uploader.change(
on_csv_upload,
inputs=[csv_uploader],
outputs=[status_csv, process_dd, gr.State()],
)
# 3) 閾値アップロードで更新
thr_uploader.change(
on_thr_upload,
inputs=[thr_uploader],
outputs=[status_thr],
)
# 4) プロセス選択で項目候補更新
process_dd.change(
update_items,
inputs=[process_dd],
outputs=[items_cb],
)
# 5) 図生成
btn_render.click(
fn=lambda proc, items, disp_mode, mode, dmin, dmax, p, tpp: render_any(proc, items, disp_mode, mode, dmin, dmax, p, tpp),
inputs=[process_dd, items_cb, display_mode, thr_mode, date_min, date_max, page_no, tags_per_page],
outputs=[msg, plot, page_info, page_no],
)
if __name__ == "__main__":
# gradio>=5: 個別(複数枚)でHTML内の