import gradio as gr import pandas as pd import numpy as np import os import re from typing import Dict, Tuple, List, Optional import plotly.graph_objects as go from plotly.subplots import make_subplots import plotly.io as pio # ====================================== # 設定(添付CSVの既定パス:必要に応じて変更可) # ====================================== DEFAULT_CSV_PATH = "/mnt/data/mock_data_id_9999.csv" # ====================================== # ユーティリティ # ====================================== def normalize(s: str) -> str: return str(s).replace("\u3000", " ").replace("\n", "").replace("\r", "").strip() def try_read_csv_3header(path_or_file) -> pd.DataFrame: """ 3行ヘッダーCSVを読み込む(cp932/utf-8-sig フォールバック)。 1列目は timestamp として datetime 変換。 2列目以降は (ID, ItemName, ProcessName) の3段。 """ last_err = None for enc in ["cp932", "utf-8-sig", "utf-8"]: try: df = pd.read_csv(path_or_file, header=[0, 1, 2], encoding=enc) break except Exception as e: last_err = e df = None if df is None: raise last_err # 先頭列を timestamp に ts = pd.to_datetime(df.iloc[:, 0], errors="coerce") df = df.drop(df.columns[0], axis=1) df.insert(0, "timestamp", ts) # 列名はタプルのまま保持(timestampは str) # ただし内部処理用に文字列連結も作成できるように関数を用意 return df def col_tuple_to_str(col) -> str: if isinstance(col, tuple): return "_".join([str(x) for x in col if x]) return str(col) def build_index_maps(df: pd.DataFrame): """ プロセス(3行目=タプルの3つ目)→ 該当列情報 の辞書を作る。 各列は (col_tuple, id, item, process, col_str) """ process_map = {} for col in df.columns: if col == "timestamp": continue if isinstance(col, tuple) and len(col) >= 3: col_id, item_name, process_name = str(col[0]), str(col[1]), str(col[2]) else: # 非タプル(安全策) parts = str(col).split("_") if len(parts) >= 3: col_id, item_name, process_name = parts[0], "_".join(parts[1:-1]), parts[-1] else: # プロセスが分からない列はスキップ continue rec = { "col_tuple": col, "id": col_id, "item": item_name, "process": process_name, "col_str": col_tuple_to_str(col), } process_map.setdefault(process_name, []).append(rec) # プロセス候補・アイテム候補を返すために使う processes = sorted(list(process_map.keys()), key=lambda x: normalize(x)) return process_map, processes def extract_measure_tag(item_name: str) -> str: """ 項目名末尾の計測項目タグを抽出。 例: "処理水 有機物 分析値 [mg/L]" → "mg/L" "原水 TOC" → "TOC" "導電率(電気伝導度) [mS/cm]" → "mS/cm" 優先順: 1) [...] の中身 2) 全角/半角スペース区切りの末尾語(英字混在や記号含む) """ s = normalize(item_name) m = re.search(r"\[([^\[\]]+)\]\s*$", s) if m: return m.group(1).strip() # 角括弧がなければ末尾語 tokens = re.split(r"\s+", s) if tokens: return tokens[-1] return s # ====================================== # しきい値ハンドリング # ====================================== def try_read_thresholds_excel(file) -> Optional[pd.DataFrame]: """ しきい値Excel(任意)を読み込み。 想定カラム: ColumnID, ItemName, ProcessNo_ProcessName, LL, L, H, HH, Important(任意) """ if file is None: return None df = pd.read_excel(file) df.columns = [normalize(c) for c in df.columns] # 必須カラム確認(最低限) needed = {"ColumnID", "ItemName", "ProcessNo_ProcessName"} if not needed.issubset(set(df.columns)): # 列名が違う場合の簡易吸収 rename_map = {} for k in list(df.columns): nk = normalize(str(k)) if nk.lower() in ["columnid", "colid", "id"]: rename_map[k] = "ColumnID" elif nk.lower() in ["itemname", "item", "name"]: rename_map[k] = "ItemName" elif nk.lower() in ["processno_processname", "process", "processname"]: rename_map[k] = "ProcessNo_ProcessName" if rename_map: df = df.rename(columns=rename_map) # 数値化 for c in ["LL", "L", "H", "HH"]: if c in df.columns: df[c] = pd.to_numeric(df[c], errors="coerce") if "Important" in df.columns: df["Important"] = ( df["Important"].astype(str).str.upper().map({"TRUE": True, "FALSE": False}) ) return df def build_threshold_lookup(thr_df: Optional[pd.DataFrame]) -> Dict[Tuple[str, str, str], Tuple[float, float, float, float]]: """ キー: (ColumnID, ItemName, ProcessNo_ProcessName) → (LL, L, H, HH) """ lookup = {} if thr_df is None or thr_df.empty: return lookup for _, r in thr_df.iterrows(): colid = normalize(str(r.get("ColumnID", ""))) item = normalize(str(r.get("ItemName", ""))) proc = normalize(str(r.get("ProcessNo_ProcessName", ""))) LL = r.get("LL", np.nan) L = r.get("L", np.nan) H = r.get("H", np.nan) HH = r.get("HH", np.nan) lookup[(colid, item, proc)] = (LL, L, H, HH) return lookup def auto_threshold(series: pd.Series) -> Tuple[float, float, float, float]: """ 自動しきい値: mean ± std(LL/L/H/HH の2段に同じ幅を割当) 例: L=mean-std, LL=mean-2std, H=mean+std, HH=mean+2std """ s = series.dropna() if len(s) < 5: return (np.nan, np.nan, np.nan, np.nan) m = float(s.mean()) sd = float(s.std(ddof=1)) if len(s) >= 2 else 0.0 return (m - 2*sd, m - sd, m + sd, m + 2*sd) def judge_status(value, LL, L, H, HH) -> str: if pd.notna(LL) and value <= LL: return "LL" if pd.notna(L) and value <= L: return "L" if pd.notna(HH) and value >= HH: return "HH" if pd.notna(H) and value >= H: return "H" return "OK" # カラー(点の色):閾値逸脱を強調 STATUS_COLOR = { "LL": "#2b6cb0", # 青系 "L": "#63b3ed", # 水色 "OK": "#a0aec0", # グレー "H": "#f6ad55", # 橙 "HH": "#e53e3e", # 赤 } # 線色(系列ライン):列ごとに安定色 LINE_COLOR = "#4a5568" # 濃いグレー # ====================================== # 図作成(既存:グルーピングごとに個別のFigureを返す) # ====================================== def make_trend_figs( df: pd.DataFrame, process_map: Dict[str, List[dict]], process_name: str, selected_items: List[str], thr_df: Optional[pd.DataFrame], thr_mode: str, # "excel" or "auto" date_min: Optional[str] = None, date_max: Optional[str] = None, ) -> List[go.Figure]: """ 計測項目タグごと(extract_measure_tag)に図を分けて生成。 selected_items は「2行目(ItemName)」の値。 """ if df is None or process_name is None or process_name == "": return [] # 対象プロセスの列レコード recs = process_map.get(process_name, []) if not recs: return [] # 2行目(ItemName)で絞り込み selected_items_set = set([normalize(x) for x in (selected_items or [])]) recs = [r for r in recs if normalize(r["item"]) in selected_items_set] if not recs: return [] # 日付範囲フィルタ dfw = df.copy() if date_min: dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)] if date_max: dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)] if dfw.empty: return [] # しきい値参照 thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {} # 測定項目タグごとにグループ化 groups: Dict[str, List[dict]] = {} for r in recs: tag = extract_measure_tag(r["item"]) groups.setdefault(tag, []).append(r) figs = [] for tag, cols in groups.items(): fig = go.Figure() # 各列を描画 for r in cols: col = r["col_tuple"] col_str = r["col_str"] if col not in dfw.columns: # まれにヘッダー崩れなど if col_str in dfw.columns: series = dfw[col_str] else: continue else: series = dfw[col] # 値 x = dfw["timestamp"] y = pd.to_numeric(series, errors="coerce") # しきい値決定 if thr_mode == "excel": key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"])) LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan)) # Excelに見つからない場合は自動にフォールバック if all(pd.isna(v) for v in [LL, L, H, HH]): LL, L, H, HH = auto_threshold(y) else: LL, L, H, HH = auto_threshold(y) # 状態ごとに点色を決める colors = [] for v in y: if pd.isna(v): colors.append("rgba(0,0,0,0)") else: st = judge_status(v, LL, L, H, HH) colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"])) # 下地のライン(視認性のため薄色) fig.add_trace(go.Scatter( x=x, y=y, mode="lines", name=f"{r['item']} ({r['id']})", line=dict(color=LINE_COLOR, width=1.5), hovertemplate="%{x}
%{y}"+f"{r['item']} ({r['id']})"+"" )) # 色付きマーカーで逸脱強調 fig.add_trace(go.Scatter( x=x, y=y, mode="markers", name=f"{r['item']} markers", marker=dict(size=6, color=colors), showlegend=False, hovertemplate="%{x}
%{y}" )) # しきい値ガイド(あれば) def add_hline(val, label): if pd.notna(val): fig.add_hline(y=float(val), line=dict(width=1, dash="dot"), annotation_text=label, annotation_position="top left") add_hline(LL, "LL") add_hline(L, "L") add_hline(H, "H") add_hline(HH, "HH") fig.update_layout( title=f"{process_name} | 計測項目: {tag}", xaxis_title="timestamp", yaxis_title=tag, legend_title="系列", margin=dict(l=10, r=10, t=40, b=10), hovermode="x unified", ) figs.append(fig) return figs # ====================================== # 新規:サブプロット1枚でまとめる図 # ====================================== def make_trend_figure( df: pd.DataFrame, process_map: Dict[str, List[dict]], process_name: str, selected_items: List[str], thr_df: Optional[pd.DataFrame], thr_mode: str, # "excel" or "auto" date_min: Optional[str] = None, date_max: Optional[str] = None, ) -> Optional[go.Figure]: if df is None or not process_name: return None recs = process_map.get(process_name, []) if not recs: return None selected_items_set = set([normalize(x) for x in (selected_items or [])]) recs = [r for r in recs if normalize(r["item"]) in selected_items_set] if not recs: return None dfw = df.copy() if date_min: dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)] if date_max: dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)] if dfw.empty: return None thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {} # 計測項目タグでグルーピング groups: Dict[str, List[dict]] = {} for r in recs: tag = extract_measure_tag(r["item"]) groups.setdefault(tag, []).append(r) tags = list(groups.keys()) if not tags: return None rows = len(tags) # rows が多いときは、Plotly の制約: vertical_spacing <= 1/(rows-1) if rows <= 1: vspace = 0.03 else: max_vs = (1.0 / (rows - 1)) - 1e-4 # ほんの少しだけマージンを取る vspace = max(0.0, min(0.03, max_vs)) fig = make_subplots( rows=rows, cols=1, shared_xaxes=True, vertical_spacing=vspace, subplot_titles=[f"{process_name} | 計測項目: {t}" for t in tags] ) row_idx = 1 for tag in tags: cols = groups[tag] for r in cols: col = r["col_tuple"] col_str = r["col_str"] if col in dfw.columns: series = dfw[col] elif col_str in dfw.columns: series = dfw[col_str] else: continue x = dfw["timestamp"] y = pd.to_numeric(series, errors="coerce") if thr_mode == "excel": key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"])) LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan)) if all(pd.isna(v) for v in [LL, L, H, HH]): LL, L, H, HH = auto_threshold(y) else: LL, L, H, HH = auto_threshold(y) # ライン fig.add_trace( go.Scatter( x=x, y=y, mode="lines", name=f"{r['item']} ({r['id']})", line=dict(color=LINE_COLOR, width=1.5), hovertemplate="%{x}
%{y}"+f"{r['item']} ({r['id']})"+"" ), row=row_idx, col=1 ) # マーカー(色分け) colors = [] for v in y: if pd.isna(v): colors.append("rgba(0,0,0,0)") else: st = judge_status(v, LL, L, H, HH) colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"])) fig.add_trace( go.Scatter( x=x, y=y, mode="markers", name=f"{r['item']} markers", marker=dict(size=6, color=colors), showlegend=False, hovertemplate="%{x}
%{y}" ), row=row_idx, col=1 ) # しきい値ガイド for val, label in [(LL, "LL"), (L, "L"), (H, "H"), (HH, "HH")]: if pd.notna(val): fig.add_hline( y=float(val), line=dict(width=1, dash="dot"), annotation_text=label, annotation_position="top left", row=row_idx, col=1 ) row_idx += 1 fig.update_layout( title=f"{process_name} | 計測項目タグごとのトレンド", xaxis_title="timestamp", showlegend=True, margin=dict(l=10, r=10, t=40, b=10), hovermode="x unified", height=max(400, 260 * len(tags)), ) return fig # ====================================== # 新規:計測項目タグごとに個別Figure # ====================================== def make_trend_figs_by_tag( df: pd.DataFrame, process_map: Dict[str, List[dict]], process_name: str, selected_items: List[str], thr_df: Optional[pd.DataFrame], thr_mode: str, date_min: Optional[str] = None, date_max: Optional[str] = None, ) -> Dict[str, go.Figure]: if df is None or not process_name: return {} recs = process_map.get(process_name, []) if not recs: return {} selected_items_set = set([normalize(x) for x in (selected_items or [])]) recs = [r for r in recs if normalize(r["item"]) in selected_items_set] if not recs: return {} dfw = df.copy() if date_min: dfw = dfw[dfw["timestamp"] >= pd.to_datetime(date_min)] if date_max: dfw = dfw[dfw["timestamp"] <= pd.to_datetime(date_max)] if dfw.empty: return {} thr_lookup = build_threshold_lookup(thr_df) if thr_mode == "excel" else {} groups: Dict[str, List[dict]] = {} for r in recs: tag = extract_measure_tag(r["item"]) groups.setdefault(tag, []).append(r) out: Dict[str, go.Figure] = {} for tag, cols in groups.items(): fig = go.Figure() for r in cols: col = r["col_tuple"] col_str = r["col_str"] if col in dfw.columns: series = dfw[col] elif col_str in dfw.columns: series = dfw[col_str] else: continue x = dfw["timestamp"] y = pd.to_numeric(series, errors="coerce") if thr_mode == "excel": key = (normalize(r["id"]), normalize(r["item"]), normalize(r["process"])) LL, L, H, HH = thr_lookup.get(key, (np.nan, np.nan, np.nan, np.nan)) if all(pd.isna(v) for v in [LL, L, H, HH]): LL, L, H, HH = auto_threshold(y) else: LL, L, H, HH = auto_threshold(y) fig.add_trace(go.Scatter( x=x, y=y, mode="lines", name=f"{r['item']} ({r['id']})", line=dict(color=LINE_COLOR, width=1.5), hovertemplate="%{x}
%{y}"+f"{r['item']} ({r['id']})"+"" )) colors = [] for v in y: if pd.isna(v): colors.append("rgba(0,0,0,0)") else: st = judge_status(v, LL, L, H, HH) colors.append(STATUS_COLOR.get(st, STATUS_COLOR["OK"])) fig.add_trace(go.Scatter( x=x, y=y, mode="markers", name=f"{r['item']} markers", marker=dict(size=6, color=colors), showlegend=False, hovertemplate="%{x}
%{y}" )) for val, label in [(LL, "LL"), (L, "L"), (H, "H"), (HH, "HH")]: if pd.notna(val): fig.add_hline(y=float(val), line=dict(width=1, dash="dot"), annotation_text=label, annotation_position="top left") fig.update_layout( title=f"{process_name} | 計測項目: {tag}", xaxis_title="timestamp", yaxis_title=tag, legend_title="系列", margin=dict(l=10, r=10, t=40, b=10), hovermode="x unified", ) out[tag] = fig return out def figures_to_html(figs_by_tag: Dict[str, go.Figure]) -> str: """ 各 Figure を SVG に変換してインラインで並べる(