|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import os |
|
|
import re |
|
|
from typing import Dict, Tuple, List, Optional |
|
|
import plotly.graph_objects as go |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DEFAULT_CSV_PATH = "/mnt/data/mock_data_id_9999.csv" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normalize(s: str) -> str: |
|
|
return str(s).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip() |
|
|
|
|
|
def try_read_csv_3header(path_or_file) -> pd.DataFrame: |
|
|
""" |
|
|
3行ヘッダーCSVを読み込む(cp932/utf-8-sig フォールバック)。 |
|
|
1列目は timestamp として datetime 変換。 |
|
|
2列目以降は (ID, ItemName, ProcessName) の3段。 |
|
|
""" |
|
|
last_err = None |
|
|
for enc in ["cp932", "utf-8-sig", "utf-8"]: |
|
|
try: |
|
|
df = pd.read_csv(path_or_file, header=[0, 1, 2], encoding=enc) |
|
|
break |
|
|
except Exception as e: |
|
|
last_err = e |
|
|
df = None |
|
|
if df is None: |
|
|
raise last_err |
|
|
|
|
|
|
|
|
ts = pd.to_datetime(df.iloc[:, 0], errors="coerce") |
|
|
df = df.drop(df.columns[0], axis=1) |
|
|
df.insert(0, "timestamp", ts) |
|
|
|
|
|
|
|
|
|
|
|
return df |
|
|
|
|
|
def col_tuple_to_str(col) -> str: |
|
|
if isinstance(col, tuple): |
|
|
return "_".join([str(x) for x in col if x]) |
|
|
return str(col) |
|
|
|
|
|
def build_index_maps(df: pd.DataFrame): |
|
|
""" |
|
|
プロセス(3行目=タプルの3つ目)→ 該当列情報 の辞書を作る。 |
|
|
各列は (col_tuple, id, item, process, col_str) |
|
|
""" |
|
|
process_map = {} |
|
|
for col in df.columns: |
|
|
if col == "timestamp": |
|
|
continue |
|
|
if isinstance(col, tuple) and len(col) >= 3: |
|
|
col_id, item_name, process_name = str(col[0]), str(col[1]), str(col[2]) |
|
|
else: |
|
|
|
|
|
parts = str(col).split("_") |
|
|
if len(parts) >= 3: |
|
|
col_id, item_name, process_name = parts[0], "_".join(parts[1:-1]), parts[-1] |
|
|
else: |
|
|
|
|
|
continue |
|
|
rec = { |
|
|
"col_tuple": col, |
|
|
"id": col_id, |
|
|
"item": item_name, |
|
|
"process": process_name, |
|
|
"col_str": col_tuple_to_str(col), |
|
|
} |
|
|
process_map.setdefault(process_name, []).append(rec) |
|
|
|
|
|
processes = sorted(list(process_map.keys()), key=lambda x: normalize(x)) |
|
|
return process_map, processes |
|
|
|
|
|
def extract_measure_tag(item_name: str) -> str: |
|
|
""" |
|
|
項目名末尾の計測項目タグを抽出。 |
|
|
例: |
|
|
"処理水 有機物 分析値 [mg/L]" → "mg/L" |
|
|
"原水 TOC" → "TOC" |
|
|
"導電率(電気伝導度) [mS/cm]" → "mS/cm" |
|
|
優先順: |
|
|
1) [...] の中身 |
|
|
2) 全角/半角スペース区切りの末尾語(英字混在や記号含む) |
|
|
""" |
|
|
s = normalize(item_name) |
|
|
m = re.search(r"\[([^\[\]]+)\]\s*$", s) |
|
|
if m: |
|
|
return m.group(1).strip() |
|
|
|
|
|
tokens = re.split(r"\s+", s) |
|
|
if tokens: |
|
|
return tokens[-1] |
|
|
return s |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def try_read_thresholds_excel(file) -> Optional[pd.DataFrame]: |
|
|
""" |
|
|
しきい値Excel(任意)を読み込み。 |
|
|
想定カラム: ColumnID, ItemName, ProcessNo_ProcessName, LL, L, H, HH, Important(任意) |
|
|
""" |
|
|
if file is None: |
|
|
return None |
|
|
df = pd.read_excel(file) |
|
|
df.columns = [normalize(c) for c in df.columns] |
|
|
|
|
|
needed = {"ColumnID", "ItemName", "ProcessNo_ProcessName"} |
|
|
if not needed.issubset(set(df.columns)): |
|
|
|
|
|
rename_map = {} |
|
|
for k in list(df.columns): |
|
|
nk = normalize(str(k)) |
|
|
if nk.lower() in ["columnid", "colid", "id"]: |
|
|
rename_map[k] = "ColumnID" |
|
|
elif nk.lower() in ["itemname", "item", "name"]: |
|
|
rename_map[k] = "ItemName" |
|
|
elif nk.lower() in ["processno_processname", "process", "processname"]: |
|
|
rename_map[k] = "ProcessNo_ProcessName" |
|
|
if rename_map: |
|
|
df = df.rename(columns=rename_map) |
|
|
|
|
|
for c in ["LL", "L", "H", "HH"]: |
|
|
if c in df.columns: |
|
|
df[c] = pd.to_numeric(df[c], errors="coerce") |
|
|
if "Important" in df.columns: |
|
|
df["Important"] = ( |
|
|
df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False}) |
|
|
) |
|
|
return df |
|
|
|
|
|
def build_threshold_lookup(thr_df: Optional[pd.DataFrame]) -> Dict[Tuple[str, str, str], Tuple[float, float, float, float]]: |
|
|
""" |
|
|
キー: (ColumnID, ItemName, ProcessNo_ProcessName) → (LL, L, H, HH) |
|
|
""" |
|
|
lookup = {} |
|
|
if thr_df is None or thr_df.empty: |
|
|
return lookup |
|
|
for _, r in thr_df.iterrows(): |
|
|
colid = normalize(str(r.get("ColumnID", ""))) |
|
|
item = normalize(str(r.get("ItemName", ""))) |
|
|
proc = normalize(str(r.get("ProcessNo_ProcessName", ""))) |
|
|
LL = r.get("LL", np.nan) |
|
|
L = r.get("L", np.nan) |
|
|
H = r.get("H", np.nan) |
|
|
HH = r.get("HH", np.nan) |
|
|
lookup[(colid, item, proc)] = (LL, L, H, HH) |
|
|
return lookup |
|
|
|
|
|
def auto_threshold(series: pd.Series) -> Tuple[float, float, float, float]: |
|
|
""" |
|
|
自動しきい値: mean ± std(LL/L/H/HH の2段に同じ幅を割当) |
|
|
例: L=mean-std, LL=mean-2std, H=mean+std, HH=mean+2std |
|
|
""" |
|
|
s = series.dropna() |
|
|
if len(s) < 5: |
|
|
return (np.nan, np.nan, np.nan, np.nan) |
|
|
m = float(s.mean()) |
|
|
sd = float(s.std(ddof=1)) if len(s) >= 2 else 0.0 |
|
|
return (m - 2*sd, m - sd, m + sd, m + 2*sd) |
|
|
|
|
|
def judge_status(value, LL, L, H, HH) -> str: |
|
|
if pd.notna(LL) and value <= LL: |
|
|
return "LL" |
|
|
if pd.notna(L) and value <= L: |
|
|
return "L" |
|
|
if pd.notna(HH) and value >= HH: |
|
|
return "HH" |
|
|
if pd.notna(H) and value >= H: |
|
|
return "H" |
|
|
return "OK" |
|
|
|
|
|
|
|
|
STATUS_COLOR = { |
|
|
"LL": "#2b6cb0", |
|
|
"L": "#63b3ed", |
|
|
"OK": "#a0aec0", |
|
|
"H": "#f6ad55", |
|
|
"HH": "#e53e3e", |
|
|
} |
|
|
|
|
|
|
|
|
LINE_COLOR = "#4a5568" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_trend_figs( |
|
|
df: pd.DataFrame, |
|
|
process_map: Dict[str, List[dict]], |
|
|
process_name: str, |
|
|
selected_items: List[str], |
|
|
thr_df: Optional[pd.DataFrame], |
|
|
thr_mode: str, |
|
|
date_min: Optional[str] = None, |
|
|
date_max: Optional[str] = None, |
|
|
) -> List[go.Figure]: |
|
|
""" |
|
|
計測項目タグごと(extract_measure_tag)に図を分けて生成。 |
|
|
selected_items は「2行目(ItemName)」の値。 |
|
|
""" |
|
|
if df is None or process_name is None or process_name == "": |
|
|
return [] |
|
|
|
|
|
|
|
|
recs = process_map.get(process_name, []) |
|
|
if not recs: |
|
|
return [] |
|
|
|
|
|
|
|
|
selected_items_set = set([normalize(x) for x in (selected_items or [])]) |
|
|
recs = [r for r in recs if normalize(r["item"]) in selected_items_set] |
|
|
if not recs: |
|
|
return [] |
|
|
|
|
|
|
|
|
dfw = df.copy() |
|
|
if date_min: |
|
|
dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)] |
|
|
if date_max: |
|
|
dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)] |
|
|
if dfw.empty: |
|
|
return [] |
|
|
|
|
|
|
|
|
thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {} |
|
|
|
|
|
|
|
|
groups: Dict[str, List[dict]] = {} |
|
|
for r in recs: |
|
|
tag = extract_measure_tag(r["item"]) |
|
|
groups.setdefault(tag, []).append(r) |
|
|
|
|
|
figs = [] |
|
|
for tag, cols in groups.items(): |
|
|
fig = go.Figure() |
|
|
|
|
|
for r in cols: |
|
|
col = r["col_tuple"] |
|
|
col_str = r["col_str"] |
|
|
if col not in dfw.columns: |
|
|
|
|
|
if col_str in dfw.columns: |
|
|
series = dfw[col_str] |
|
|
else: |
|
|
continue |
|
|
else: |
|
|
series = dfw[col] |
|
|
|
|
|
|
|
|
x = dfw["timestamp"] |
|
|
y = pd.to_numeric(series, errors="coerce") |
|
|
|
|
|
|
|
|
if thr_mode == "excel": |
|
|
key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"])) |
|
|
LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan)) |
|
|
|
|
|
if all(pd.isna(v) for v in [LL, L, H, HH]): |
|
|
LL, L, H, HH = auto_threshold(y) |
|
|
else: |
|
|
LL, L, H, HH = auto_threshold(y) |
|
|
|
|
|
|
|
|
colors = [] |
|
|
for v in y: |
|
|
if pd.isna(v): |
|
|
colors.append("rgba(0,0,0,0)") |
|
|
else: |
|
|
st = judge_status(v, LL, L, H, HH) |
|
|
colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"])) |
|
|
|
|
|
|
|
|
fig.add_trace(go.Scatter( |
|
|
x=x, y=y, mode="lines", |
|
|
name=f"{r['item']} ({r['id']})", |
|
|
line=dict(color=LINE_COLOR, width=1.5), |
|
|
hovertemplate="%{x}<br>%{y}<extra>"+f"{r['item']} ({r['id']})"+"</extra>" |
|
|
)) |
|
|
|
|
|
fig.add_trace(go.Scatter( |
|
|
x=x, y=y, mode="markers", |
|
|
name=f"{r['item']} markers", |
|
|
marker=dict(size=6, color=colors), |
|
|
showlegend=False, |
|
|
hovertemplate="%{x}<br>%{y}<extra></extra>" |
|
|
)) |
|
|
|
|
|
|
|
|
def add_hline(val, label): |
|
|
if pd.notna(val): |
|
|
fig.add_hline(y=float(val), line=dict(width=1, dash="dot"), |
|
|
annotation_text=label, annotation_position="top left") |
|
|
|
|
|
add_hline(LL, "LL") |
|
|
add_hline(L, "L") |
|
|
add_hline(H, "H") |
|
|
add_hline(HH, "HH") |
|
|
|
|
|
fig.update_layout( |
|
|
title=f"{process_name} | 計測項目: {tag}", |
|
|
xaxis_title="timestamp", |
|
|
yaxis_title=tag, |
|
|
legend_title="系列", |
|
|
margin=dict(l=10, r=10, t=40, b=10), |
|
|
hovermode="x unified", |
|
|
) |
|
|
figs.append(fig) |
|
|
|
|
|
return figs |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
G_DF: Optional[pd.DataFrame] = None |
|
|
G_PROCESS_MAP = {} |
|
|
G_PROCESSES = [] |
|
|
G_THRESHOLDS_DF: Optional[pd.DataFrame] = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def initialize_default_csv(): |
|
|
""" |
|
|
起動時にデフォルトCSVが存在すれば読み込む。 |
|
|
""" |
|
|
global G_DF, G_PROCESS_MAP, G_PROCESSES |
|
|
if os.path.exists(DEFAULT_CSV_PATH): |
|
|
try: |
|
|
df = try_read_csv_3header(DEFAULT_CSV_PATH) |
|
|
G_DF = df |
|
|
G_PROCESS_MAP, G_PROCESSES = build_index_maps(df) |
|
|
return f"✅ 既定CSVを読み込みました: {DEFAULT_CSV_PATH}", gr.update(choices=G_PROCESSES, value=(G_PROCESSES[0] if G_PROCESSES else None)), G_PROCESSES |
|
|
except Exception as e: |
|
|
return f"⚠ 既定CSV読み込み失敗: {e}", gr.update(), [] |
|
|
return "ℹ CSVをアップロードしてください。", gr.update(), [] |
|
|
|
|
|
def on_csv_upload(file): |
|
|
""" |
|
|
CSVアップロード → パース → プロセス候補更新 |
|
|
""" |
|
|
global G_DF, G_PROCESS_MAP, G_PROCESSES |
|
|
if file is None: |
|
|
return "⚠ ファイルが選択されていません。", gr.update(choices=[]), [] |
|
|
try: |
|
|
df = try_read_csv_3header(file.name if hasattr(file, "name") else file) |
|
|
G_DF = df |
|
|
G_PROCESS_MAP, G_PROCESSES = build_index_maps(df) |
|
|
return f"✅ CSV読み込み: {df.shape[0]}行 × {df.shape[1]}列", gr.update(choices=G_PROCESSES, value=(G_PROCESSES[0] if G_PROCESSES else None)), G_PROCESSES |
|
|
except Exception as e: |
|
|
return f"❌ 読み込みエラー: {e}", gr.update(choices=[]), [] |
|
|
|
|
|
def on_thr_upload(file): |
|
|
""" |
|
|
しきい値Excelアップロード → メモリ更新 |
|
|
""" |
|
|
global G_THRESHOLDS_DF |
|
|
if file is None: |
|
|
G_THRESHOLDS_DF = None |
|
|
return "ℹ しきい値ファイルなし(自動しきい値が使われます)" |
|
|
try: |
|
|
thr = try_read_thresholds_excel(file.name if hasattr(file, "name") else file) |
|
|
G_THRESHOLDS_DF = thr |
|
|
return f"✅ しきい値を読み込みました({thr.shape[0]}件)" |
|
|
except Exception as e: |
|
|
G_THRESHOLDS_DF = None |
|
|
return f"❌ しきい値読み込みエラー: {e}" |
|
|
|
|
|
def update_items(process_name: str): |
|
|
""" |
|
|
プロセス選択に応じて、項目(2行目)候補を返す。 |
|
|
""" |
|
|
if not process_name or process_name not in G_PROCESS_MAP: |
|
|
return gr.update(choices=[], value=[]) |
|
|
items = sorted(list({rec["item"] for rec in G_PROCESS_MAP[process_name]}), key=lambda x: normalize(x)) |
|
|
|
|
|
return gr.update(choices=items, value=items) |
|
|
|
|
|
def render_figs(process_name: str, items: List[str], thr_mode: str, date_min, date_max): |
|
|
""" |
|
|
図を生成して返す(複数図)。GradioではList[plotly.Figure]を直接返せる。 |
|
|
""" |
|
|
if G_DF is None: |
|
|
return "⚠ データ未読み込み", [] |
|
|
if not process_name: |
|
|
return "⚠ プロセスを選択してください", [] |
|
|
if not items: |
|
|
return "⚠ 項目を選択してください", [] |
|
|
|
|
|
figs = make_trend_figs( |
|
|
G_DF, G_PROCESS_MAP, process_name, items, G_THRESHOLDS_DF, thr_mode, date_min, date_max |
|
|
) |
|
|
if not figs: |
|
|
return "⚠ 図を生成できませんでした(データ無し or 条件不一致)", [] |
|
|
return f"✅ {process_name}: {len(figs)}枚のトレンド図を生成しました(計測項目タグごと)", figs |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(css=""" |
|
|
.gradio-container {overflow: auto !important;} |
|
|
""") as demo: |
|
|
gr.Markdown("## トレンドグラフ専用アプリ(3行ヘッダー対応・プロセス別・計測項目タグ別・閾値色分け)") |
|
|
|
|
|
with gr.Row(): |
|
|
csv_uploader = gr.File(label="① 時系列CSV(3行ヘッダー)", file_count="single", file_types=[".csv"]) |
|
|
thr_uploader = gr.File(label="② 閾値Excel(任意: LL/L/H/HH)", file_count="single", file_types=[".xlsx", ".xls"]) |
|
|
|
|
|
with gr.Row(): |
|
|
thr_mode = gr.Radio( |
|
|
["excel(アップロード優先・無ければ自動)", "自動(平均±標準偏差)"], |
|
|
value="excel(アップロード優先・無ければ自動)", |
|
|
label="しきい値モード" |
|
|
) |
|
|
date_min = gr.Textbox(label="抽出開始日時(任意)例: 2024-07-01 00:00") |
|
|
date_max = gr.Textbox(label="抽出終了日時(任意)例: 2024-07-31 23:59") |
|
|
|
|
|
status_csv = gr.Markdown() |
|
|
status_thr = gr.Markdown() |
|
|
|
|
|
process_dd = gr.Dropdown(label="対象プロセス(3行ヘッダーの3行目)", choices=[]) |
|
|
items_cb = gr.CheckboxGroup(label="表示する項目(3行ヘッダーの2行目)", choices=[], value=[]) |
|
|
|
|
|
with gr.Row(): |
|
|
btn_render = gr.Button("トレンド図を生成", variant="primary") |
|
|
|
|
|
msg = gr.Markdown() |
|
|
plots = gr.Plotly(label="トレンド図(計測項目タグごとに複数枚)", height=540, every=1, interactive=True, show_label=True, scale=100, container=True, visible=True, elem_id="plot_container", elem_classes=["w-full"], ) |
|
|
|
|
|
|
|
|
|
|
|
init_msg, init_proc_update, _ = initialize_default_csv() |
|
|
status_csv.value = init_msg |
|
|
process_dd.value = init_proc_update.value |
|
|
process_dd.choices = init_proc_update.choices |
|
|
|
|
|
|
|
|
csv_uploader.change( |
|
|
on_csv_upload, |
|
|
inputs=[csv_uploader], |
|
|
outputs=[status_csv, process_dd, gr.State()], |
|
|
) |
|
|
|
|
|
|
|
|
thr_uploader.change( |
|
|
on_thr_upload, |
|
|
inputs=[thr_uploader], |
|
|
outputs=[status_thr], |
|
|
) |
|
|
|
|
|
|
|
|
process_dd.change( |
|
|
update_items, |
|
|
inputs=[process_dd], |
|
|
outputs=[items_cb], |
|
|
) |
|
|
|
|
|
|
|
|
def _thr_mode_key(s): |
|
|
return "excel" if s.startswith("excel") else "auto" |
|
|
|
|
|
btn_render.click( |
|
|
fn=lambda proc, items, mode, dmin, dmax: render_figs(proc, items, _thr_mode_key(mode), dmin, dmax), |
|
|
inputs=[process_dd, items_cb, thr_mode, date_min, date_max], |
|
|
outputs=[msg, plots], |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
demo.launch() |
|
|
|