import io
import os
import sys
import zipfile
from datetime import date, timedelta

import numpy as np
import pandas as pd
import panel as pn
import plotly.express as px

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

from panel_app.convert_to_excel_panel import write_dfs_to_excel
from utils.utils_vars import get_physical_db

pn.extension(
    "plotly",
    "tabulator",
    raw_css=[
        ":fullscreen { background-color: white; overflow: auto; }",
        "::backdrop { background-color: white; }",
        ".plot-fullscreen-wrapper:fullscreen { padding: 20px; display: flex; flex-direction: column; }",
        ".plot-fullscreen-wrapper:fullscreen > * { height: 100% !important; width: 100% !important; }",
    ],
)


def read_fileinput_to_df(file_input: pn.widgets.FileInput) -> pd.DataFrame | None:
    """Read a Panel FileInput (ZIP or CSV) into a DataFrame.

    Returns None if no file is provided.
    """
    if file_input is None or not file_input.value:
        return None

    filename = (file_input.filename or "").lower()
    data = io.BytesIO(file_input.value)

    if filename.endswith(".zip"):
        with zipfile.ZipFile(data) as z:
            csv_files = [f for f in z.namelist() if f.lower().endswith(".csv")]
            if not csv_files:
                raise ValueError("No CSV file found in the ZIP archive")
            with z.open(csv_files[0]) as f:
                return pd.read_csv(f, encoding="latin1", sep=";", low_memory=False)
    elif filename.endswith(".csv"):
        return pd.read_csv(data, encoding="latin1", sep=";", low_memory=False)
    else:
        raise ValueError("Unsupported file format. Please upload a ZIP or CSV file.")


def extract_code(name):
    name = name.replace(" ", "_") if isinstance(name, str) else None
    if name and len(name) >= 10:
        try:
            return int(name.split("_")[0])
        except ValueError:
            return None
    return None


def preprocess_2g(df: pd.DataFrame) -> pd.DataFrame:
    df = df[df["BCF name"].str.len() >= 10].copy()
    df["2g_data_trafic"] = ((df["TRAFFIC_PS DL"] + df["PS_UL_Load"]) / 1000).round(1)
    df.rename(columns={"2G_Carried Traffic": "2g_voice_trafic"}, inplace=True)
    df["code"] = df["BCF name"].apply(extract_code)
    df["code"] = pd.to_numeric(df["code"], errors="coerce")
    df = df[df["code"].notna()]
    df["code"] = df["code"].astype(int)
    date_format = (
        "%m.%d.%Y %H:%M:%S"
        if len(df["PERIOD_START_TIME"].iat[0]) > 10
        else "%m.%d.%Y"
    )
    df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
    df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
    if "TCH availability ratio" in df.columns:
        df["2g_tch_avail"] = pd.to_numeric(
            df["TCH availability ratio"], errors="coerce"
        )
    agg_dict = {
        "2g_data_trafic": "sum",
        "2g_voice_trafic": "sum",
    }
    if "2g_tch_avail" in df.columns:
        agg_dict["2g_tch_avail"] = "mean"
    df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
    return df
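
# Illustrative sketch of extract_code, used by all preprocess_* helpers (hypothetical
# names, not real inventory data): the numeric site code is the first "_"-separated
# token of a name at least 10 characters long, after spaces are normalised to "_":
#   extract_code("12345_Central_Macro")  -> 12345
#   extract_code("12345 Central Macro")  -> 12345
#   extract_code("ABCDE_Central_Macro")  -> None   (non-numeric first token)
#   extract_code("12345")                -> None   (shorter than 10 characters)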

def preprocess_3g(df: pd.DataFrame) -> pd.DataFrame:
    df = df[df["WBTS name"].str.len() >= 10].copy()
    df["code"] = df["WBTS name"].apply(extract_code)
    df["code"] = pd.to_numeric(df["code"], errors="coerce")
    df = df[df["code"].notna()]
    df["code"] = df["code"].astype(int)
    date_format = (
        "%m.%d.%Y %H:%M:%S"
        if len(df["PERIOD_START_TIME"].iat[0]) > 10
        else "%m.%d.%Y"
    )
    df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
    df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
    df.rename(
        columns={
            "Total CS traffic - Erl": "3g_voice_trafic",
            "Total_Data_Traffic": "3g_data_trafic",
        },
        inplace=True,
    )
    kpi_col = None
    for col in df.columns:
        if "cell availability" in str(col).lower():
            kpi_col = col
            break
    if kpi_col is not None:
        df["3g_cell_avail"] = pd.to_numeric(df[kpi_col], errors="coerce")
    agg_dict = {
        "3g_voice_trafic": "sum",
        "3g_data_trafic": "sum",
    }
    if "3g_cell_avail" in df.columns:
        agg_dict["3g_cell_avail"] = "mean"
    df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
    return df


def preprocess_lte(df: pd.DataFrame) -> pd.DataFrame:
    df = df[df["LNBTS name"].str.len() >= 10].copy()
    df["lte_data_trafic"] = (
        df["4G/LTE DL Traffic Volume (GBytes)"]
        + df["4G/LTE UL Traffic Volume (GBytes)"]
    )
    df["code"] = df["LNBTS name"].apply(extract_code)
    df["code"] = pd.to_numeric(df["code"], errors="coerce")
    df = df[df["code"].notna()]
    df["code"] = df["code"].astype(int)
    date_format = (
        "%m.%d.%Y %H:%M:%S"
        if len(df["PERIOD_START_TIME"].iat[0]) > 10
        else "%m.%d.%Y"
    )
    df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
    df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
    if "Cell Avail excl BLU" in df.columns:
        df["lte_cell_avail"] = pd.to_numeric(df["Cell Avail excl BLU"], errors="coerce")
    agg_dict = {"lte_data_trafic": "sum"}
    if "lte_cell_avail" in df.columns:
        agg_dict["lte_cell_avail"] = "mean"
    df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
    return df
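
# Note on the shared PERIOD_START_TIME heuristic (hypothetical values): a daily
# export like "01.31.2024" is exactly 10 characters, so "%m.%d.%Y" is used, while an
# hourly export like "01.31.2024 14:00:00" is longer and selects
# "%m.%d.%Y %H:%M:%S". The first row is assumed representative of the whole column.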

def merge_and_compare(df_2g, df_3g, df_lte, pre_range, post_range, last_period_range):
    physical_db = get_physical_db()
    physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0]
    physical_db["code"] = (
        pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int)
    )
    physical_db = physical_db[["code", "Longitude", "Latitude", "City"]]
    physical_db = physical_db.drop_duplicates(subset="code")

    df = pd.merge(df_2g, df_3g, on=["date", "ID", "code"], how="outer")
    df = pd.merge(df, df_lte, on=["date", "ID", "code"], how="outer")

    for col in [
        "2g_data_trafic",
        "2g_voice_trafic",
        "3g_voice_trafic",
        "3g_data_trafic",
        "lte_data_trafic",
    ]:
        if col not in df:
            df[col] = 0

    # Remember which availability values were genuinely reported, so the global
    # fillna(0) below does not turn "no data" into "0% availability".
    kpi_masks = {}
    for kpi_col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]:
        if kpi_col in df.columns:
            kpi_masks[kpi_col] = df[kpi_col].notna()
    df.fillna(0, inplace=True)
    for kpi_col, mask in kpi_masks.items():
        df.loc[~mask, kpi_col] = np.nan

    df["total_voice_trafic"] = df["2g_voice_trafic"] + df["3g_voice_trafic"]
    df["total_data_trafic"] = (
        df["2g_data_trafic"] + df["3g_data_trafic"] + df["lte_data_trafic"]
    )
    df = pd.merge(df, physical_db, on=["code"], how="left")

    pre_start, pre_end = pd.to_datetime(pre_range[0]), pd.to_datetime(pre_range[1])
    post_start, post_end = pd.to_datetime(post_range[0]), pd.to_datetime(post_range[1])
    last_period_start, last_period_end = pd.to_datetime(
        last_period_range[0]
    ), pd.to_datetime(last_period_range[1])

    last_period = df[
        (df["date"] >= last_period_start) & (df["date"] <= last_period_end)
    ]

    def assign_period(x):
        if pre_start <= x <= pre_end:
            return "pre"
        if post_start <= x <= post_end:
            return "post"
        return "other"

    df["period"] = df["date"].apply(assign_period)
    comparison = df[df["period"].isin(["pre", "post"])]

    sum_pivot = (
        comparison.groupby(["code", "period"])[
            ["total_voice_trafic", "total_data_trafic"]
        ]
        .sum()
        .unstack()
    )
    sum_pivot.columns = [f"{metric}_{period}" for metric, period in sum_pivot.columns]
    sum_pivot = sum_pivot.reset_index()
    sum_pivot["total_voice_trafic_diff"] = (
        sum_pivot["total_voice_trafic_post"] - sum_pivot["total_voice_trafic_pre"]
    )
    sum_pivot["total_data_trafic_diff"] = (
        sum_pivot["total_data_trafic_post"] - sum_pivot["total_data_trafic_pre"]
    )
    for metric in ["total_voice_trafic", "total_data_trafic"]:
        sum_pivot[f"{metric}_diff_pct"] = (
            (sum_pivot.get(f"{metric}_post", 0) - sum_pivot.get(f"{metric}_pre", 0))
            / sum_pivot.get(f"{metric}_pre", 1)
        ) * 100

    sum_order = [
        "code",
        "total_voice_trafic_pre",
        "total_voice_trafic_post",
        "total_voice_trafic_diff",
        "total_voice_trafic_diff_pct",
        "total_data_trafic_pre",
        "total_data_trafic_post",
        "total_data_trafic_diff",
        "total_data_trafic_diff_pct",
    ]
    sum_existing_cols = [col for col in sum_order if col in sum_pivot.columns]
    sum_remaining_cols = [
        col for col in sum_pivot.columns if col not in sum_existing_cols
    ]
    sum_pivot = sum_pivot[sum_existing_cols + sum_remaining_cols]

    avg_pivot = (
        comparison.groupby(["code", "period"])[
            ["total_voice_trafic", "total_data_trafic"]
        ]
        .mean()
        .unstack()
    )
    avg_pivot.columns = [f"{metric}_{period}" for metric, period in avg_pivot.columns]
    avg_pivot = avg_pivot.reset_index()
    avg_pivot["total_voice_trafic_diff"] = (
        avg_pivot["total_voice_trafic_post"] - avg_pivot["total_voice_trafic_pre"]
    )
    avg_pivot["total_data_trafic_diff"] = (
        avg_pivot["total_data_trafic_post"] - avg_pivot["total_data_trafic_pre"]
    )
    for metric in ["total_voice_trafic", "total_data_trafic"]:
        avg_pivot[f"{metric}_diff_pct"] = (
            (avg_pivot.get(f"{metric}_post", 0) - avg_pivot.get(f"{metric}_pre", 0))
            / avg_pivot.get(f"{metric}_pre", 1)
        ) * 100
    avg_pivot = avg_pivot.rename(
        columns={
            "total_voice_trafic_pre": "avg_voice_trafic_pre",
            "total_voice_trafic_post": "avg_voice_trafic_post",
            "total_voice_trafic_diff": "avg_voice_trafic_diff",
            "total_voice_trafic_diff_pct": "avg_voice_trafic_diff_pct",
            "total_data_trafic_pre": "avg_data_trafic_pre",
            "total_data_trafic_post": "avg_data_trafic_post",
            "total_data_trafic_diff": "avg_data_trafic_diff",
            "total_data_trafic_diff_pct": "avg_data_trafic_diff_pct",
        }
    )
    avg_order = [
        "code",
        "avg_voice_trafic_pre",
        "avg_voice_trafic_post",
        "avg_voice_trafic_diff",
        "avg_voice_trafic_diff_pct",
        "avg_data_trafic_pre",
        "avg_data_trafic_post",
        "avg_data_trafic_diff",
        "avg_data_trafic_diff_pct",
    ]
    avg_existing_cols = [col for col in avg_order if col in avg_pivot.columns]
    avg_remaining_cols = [
        col for col in avg_pivot.columns if col not in avg_existing_cols
    ]
    avg_pivot = avg_pivot[avg_existing_cols + avg_remaining_cols]

    return df, last_period, sum_pivot.round(2), avg_pivot.round(2)
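
# Worked example of the pre/post deltas (hypothetical numbers): a site summing
# 80 Erl of voice traffic in the pre window and 100 Erl in the post window gets
# total_voice_trafic_diff = 100 - 80 = 20 and
# total_voice_trafic_diff_pct = (100 - 80) / 80 * 100 = 25.0.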
"cells_lt_sla": 0, "pct_cells_ge_sla": pd.NA, } ) continue cells_ge_sla = (series >= sla_2g).sum() cells_lt_sla = (series < sla_2g).sum() summary_rows.append( { "period": period_label, "cells": int(total_cells), "avg_availability": series.mean(), "median_availability": series.median(), "p05_availability": series.quantile(0.05), "p95_availability": series.quantile(0.95), "min_availability": series.min(), "max_availability": series.max(), "cells_ge_sla": int(cells_ge_sla), "cells_lt_sla": int(cells_lt_sla), "pct_cells_ge_sla": cells_ge_sla / total_cells * 100, } ) summary_df = pd.DataFrame(summary_rows) return summary_df, site_pivot def analyze_3g_availability(df: pd.DataFrame, sla_3g: float): avail_col = "3g_cell_avail" if avail_col not in df.columns or "period" not in df.columns: return None, None df_3g = df[df[avail_col].notna()].copy() df_3g = df_3g[df_3g["period"].isin(["pre", "post"])] if df_3g.empty: return None, None site_pivot = df_3g.groupby(["code", "period"])[avail_col].mean().unstack() site_pivot = site_pivot.rename( columns={"pre": "cell_avail_pre", "post": "cell_avail_post"} ) if "cell_avail_pre" not in site_pivot.columns: site_pivot["cell_avail_pre"] = pd.NA if "cell_avail_post" not in site_pivot.columns: site_pivot["cell_avail_post"] = pd.NA site_pivot["cell_avail_diff"] = ( site_pivot["cell_avail_post"] - site_pivot["cell_avail_pre"] ) site_pivot["pre_ok_vs_sla"] = site_pivot["cell_avail_pre"] >= sla_3g site_pivot["post_ok_vs_sla"] = site_pivot["cell_avail_post"] >= sla_3g site_pivot = site_pivot.reset_index() summary_rows = [] for period_label, col_name in [ ("pre", "cell_avail_pre"), ("post", "cell_avail_post"), ]: series = site_pivot[col_name].dropna() total_cells = series.shape[0] if total_cells == 0: summary_rows.append( { "period": period_label, "cells": 0, "avg_availability": pd.NA, "median_availability": pd.NA, "p05_availability": pd.NA, "p95_availability": pd.NA, "min_availability": pd.NA, "max_availability": pd.NA, "cells_ge_sla": 0, "cells_lt_sla": 0, "pct_cells_ge_sla": pd.NA, } ) continue cells_ge_sla = (series >= sla_3g).sum() cells_lt_sla = (series < sla_3g).sum() summary_rows.append( { "period": period_label, "cells": int(total_cells), "avg_availability": series.mean(), "median_availability": series.median(), "p05_availability": series.quantile(0.05), "p95_availability": series.quantile(0.95), "min_availability": series.min(), "max_availability": series.max(), "cells_ge_sla": int(cells_ge_sla), "cells_lt_sla": int(cells_lt_sla), "pct_cells_ge_sla": cells_ge_sla / total_cells * 100, } ) summary_df = pd.DataFrame(summary_rows) return summary_df, site_pivot def analyze_lte_availability(df: pd.DataFrame, sla_lte: float): avail_col = "lte_cell_avail" if avail_col not in df.columns or "period" not in df.columns: return None, None df_lte = df[df[avail_col].notna()].copy() df_lte = df_lte[df_lte["period"].isin(["pre", "post"])] if df_lte.empty: return None, None site_pivot = df_lte.groupby(["code", "period"])[avail_col].mean().unstack() site_pivot = site_pivot.rename( columns={"pre": "lte_avail_pre", "post": "lte_avail_post"} ) if "lte_avail_pre" not in site_pivot.columns: site_pivot["lte_avail_pre"] = pd.NA if "lte_avail_post" not in site_pivot.columns: site_pivot["lte_avail_post"] = pd.NA site_pivot["lte_avail_diff"] = ( site_pivot["lte_avail_post"] - site_pivot["lte_avail_pre"] ) site_pivot["pre_ok_vs_sla"] = site_pivot["lte_avail_pre"] >= sla_lte site_pivot["post_ok_vs_sla"] = site_pivot["lte_avail_post"] >= sla_lte site_pivot = site_pivot.reset_index() 

def analyze_lte_availability(df: pd.DataFrame, sla_lte: float):
    avail_col = "lte_cell_avail"
    if avail_col not in df.columns or "period" not in df.columns:
        return None, None
    df_lte = df[df[avail_col].notna()].copy()
    df_lte = df_lte[df_lte["period"].isin(["pre", "post"])]
    if df_lte.empty:
        return None, None
    site_pivot = df_lte.groupby(["code", "period"])[avail_col].mean().unstack()
    site_pivot = site_pivot.rename(
        columns={"pre": "lte_avail_pre", "post": "lte_avail_post"}
    )
    if "lte_avail_pre" not in site_pivot.columns:
        site_pivot["lte_avail_pre"] = pd.NA
    if "lte_avail_post" not in site_pivot.columns:
        site_pivot["lte_avail_post"] = pd.NA
    site_pivot["lte_avail_diff"] = (
        site_pivot["lte_avail_post"] - site_pivot["lte_avail_pre"]
    )
    site_pivot["pre_ok_vs_sla"] = site_pivot["lte_avail_pre"] >= sla_lte
    site_pivot["post_ok_vs_sla"] = site_pivot["lte_avail_post"] >= sla_lte
    site_pivot = site_pivot.reset_index()

    summary_rows = []
    for period_label, col_name in [
        ("pre", "lte_avail_pre"),
        ("post", "lte_avail_post"),
    ]:
        series = site_pivot[col_name].dropna()
        total_cells = series.shape[0]
        if total_cells == 0:
            summary_rows.append(
                {
                    "period": period_label,
                    "cells": 0,
                    "avg_availability": pd.NA,
                    "median_availability": pd.NA,
                    "p05_availability": pd.NA,
                    "p95_availability": pd.NA,
                    "min_availability": pd.NA,
                    "max_availability": pd.NA,
                    "cells_ge_sla": 0,
                    "cells_lt_sla": 0,
                    "pct_cells_ge_sla": pd.NA,
                }
            )
            continue
        cells_ge_sla = (series >= sla_lte).sum()
        cells_lt_sla = (series < sla_lte).sum()
        summary_rows.append(
            {
                "period": period_label,
                "cells": int(total_cells),
                "avg_availability": series.mean(),
                "median_availability": series.median(),
                "p05_availability": series.quantile(0.05),
                "p95_availability": series.quantile(0.95),
                "min_availability": series.min(),
                "max_availability": series.max(),
                "cells_ge_sla": int(cells_ge_sla),
                "cells_lt_sla": int(cells_lt_sla),
                "pct_cells_ge_sla": cells_ge_sla / total_cells * 100,
            }
        )
    summary_df = pd.DataFrame(summary_rows)
    return summary_df, site_pivot

def analyze_multirat_availability(
    df: pd.DataFrame, sla_2g: float, sla_3g: float, sla_lte: float
):
    if "period" not in df.columns:
        return None
    rat_cols = []
    if "2g_tch_avail" in df.columns:
        rat_cols.append("2g_tch_avail")
    if "3g_cell_avail" in df.columns:
        rat_cols.append("3g_cell_avail")
    if "lte_cell_avail" in df.columns:
        rat_cols.append("lte_cell_avail")
    if not rat_cols:
        return None

    agg_dict = {col: "mean" for col in rat_cols}
    df_pre = df[df["period"] == "pre"]
    df_post = df[df["period"] == "post"]
    pre = df_pre.groupby("code", as_index=False).agg(agg_dict)
    post = df_post.groupby("code", as_index=False).agg(agg_dict)

    rename_map_pre = {
        "2g_tch_avail": "2g_avail_pre",
        "3g_cell_avail": "3g_avail_pre",
        "lte_cell_avail": "lte_avail_pre",
    }
    rename_map_post = {
        "2g_tch_avail": "2g_avail_post",
        "3g_cell_avail": "3g_avail_post",
        "lte_cell_avail": "lte_avail_post",
    }
    pre = pre.rename(columns=rename_map_pre)
    post = post.rename(columns=rename_map_post)
    multi = pd.merge(pre, post, on="code", how="outer")

    if not df_post.empty and {
        "total_voice_trafic",
        "total_data_trafic",
    }.issubset(df_post.columns):
        post_traffic = (
            df_post.groupby("code", as_index=False)[
                ["total_voice_trafic", "total_data_trafic"]
            ]
            .sum()
            .rename(
                columns={
                    "total_voice_trafic": "post_total_voice_trafic",
                    "total_data_trafic": "post_total_data_trafic",
                }
            )
        )
        multi = pd.merge(multi, post_traffic, on="code", how="left")

    if "City" in df.columns:
        city_df = df[["code", "City"]].drop_duplicates("code")
        multi = pd.merge(multi, city_df, on="code", how="left")

    def _ok_flag(series: pd.Series, sla: float) -> pd.Series:
        if series.name not in multi.columns:
            return pd.Series([pd.NA] * len(multi), index=multi.index)
        ok = multi[series.name] >= sla
        ok = ok.where(multi[series.name].notna(), pd.NA)
        return ok

    if "2g_avail_post" in multi.columns:
        multi["ok_2g_post"] = _ok_flag(multi["2g_avail_post"], sla_2g)
    if "3g_avail_post" in multi.columns:
        multi["ok_3g_post"] = _ok_flag(multi["3g_avail_post"], sla_3g)
    if "lte_avail_post" in multi.columns:
        multi["ok_lte_post"] = _ok_flag(multi["lte_avail_post"], sla_lte)

    def classify_row(row):
        rats_status = []
        for rat, col in [
            ("2G", "ok_2g_post"),
            ("3G", "ok_3g_post"),
            ("LTE", "ok_lte_post"),
        ]:
            if col in row and not pd.isna(row[col]):
                rats_status.append((rat, bool(row[col])))
        if not rats_status:
            return "No RAT data"
        bad_rats = [rat for rat, ok in rats_status if not ok]
        if not bad_rats:
            return "OK all RAT"
        if len(bad_rats) == 1:
            return f"Degraded {bad_rats[0]} only"
        return "Degraded multi-RAT (" + ",".join(bad_rats) + ")"

    multi["post_multirat_status"] = multi.apply(classify_row, axis=1)

    ordered_cols = ["code"]
    if "City" in multi.columns:
        ordered_cols.append("City")
    for col in [
        "2g_avail_pre",
        "2g_avail_post",
        "3g_avail_pre",
        "3g_avail_post",
        "lte_avail_pre",
        "lte_avail_post",
        "post_total_voice_trafic",
        "post_total_data_trafic",
        "ok_2g_post",
        "ok_3g_post",
        "ok_lte_post",
        "post_multirat_status",
    ]:
        if col in multi.columns:
            ordered_cols.append(col)
    remaining_cols = [c for c in multi.columns if c not in ordered_cols]
    multi = multi[ordered_cols + remaining_cols]
    return multi
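
# The resulting post_multirat_status values are: "No RAT data" when a site reports
# no availability KPI at all, "OK all RAT" when every reported RAT meets its SLA,
# "Degraded 3G only" (for example) when exactly one RAT fails, and
# "Degraded multi-RAT (2G,LTE)" when several fail at once.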

def analyze_persistent_availability(
    df: pd.DataFrame,
    multi_rat_df: pd.DataFrame,
    sla_2g: float,
    sla_3g: float,
    sla_lte: float,
    min_consecutive_days: int = 3,
) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame()
    if "date" not in df.columns or "code" not in df.columns:
        return pd.DataFrame()

    work_df = df.copy()
    work_df["date_only"] = work_df["date"].dt.date

    site_stats = {}

    def _update_stats(rat_key_prefix: str, grouped: pd.DataFrame, sla: float) -> None:
        if grouped.empty:
            return
        for code, group in grouped.groupby("code"):
            group = group.sort_values("date_only")
            dates = pd.to_datetime(group["date_only"]).tolist()
            below_flags = (group["value"] < sla).tolist()

            max_streak = 0
            current_streak = 0
            total_below = 0
            last_date = None
            for flag, current_date in zip(below_flags, dates):
                if flag:
                    total_below += 1
                    if (
                        last_date is not None
                        and current_date == last_date + timedelta(days=1)
                        and current_streak > 0
                    ):
                        current_streak += 1
                    else:
                        current_streak = 1
                    if current_streak > max_streak:
                        max_streak = current_streak
                else:
                    current_streak = 0
                last_date = current_date

            stats = site_stats.setdefault(
                code,
                {
                    "code": code,
                    "max_streak_2g": 0,
                    "max_streak_3g": 0,
                    "max_streak_lte": 0,
                    "below_days_2g": 0,
                    "below_days_3g": 0,
                    "below_days_lte": 0,
                },
            )
            stats[f"max_streak_{rat_key_prefix}"] = max_streak
            stats[f"below_days_{rat_key_prefix}"] = total_below

    for rat_col, rat_key, sla in [
        ("2g_tch_avail", "2g", sla_2g),
        ("3g_cell_avail", "3g", sla_3g),
        ("lte_cell_avail", "lte", sla_lte),
    ]:
        if rat_col in work_df.columns:
            g = (
                work_df.dropna(subset=[rat_col])
                .groupby(["code", "date_only"])[rat_col]
                .mean()
                .reset_index()
            )
            g = g.rename(columns={rat_col: "value"})
            _update_stats(rat_key, g, sla)

    if not site_stats:
        return pd.DataFrame()

    rows = []
    for code, s in site_stats.items():
        max_2g = s.get("max_streak_2g", 0)
        max_3g = s.get("max_streak_3g", 0)
        max_lte = s.get("max_streak_lte", 0)
        below_2g = s.get("below_days_2g", 0)
        below_3g = s.get("below_days_3g", 0)
        below_lte = s.get("below_days_lte", 0)
        persistent_2g = max_2g >= min_consecutive_days if max_2g else False
        persistent_3g = max_3g >= min_consecutive_days if max_3g else False
        persistent_lte = max_lte >= min_consecutive_days if max_lte else False
        total_below_any = below_2g + below_3g + below_lte
        persistent_any = persistent_2g or persistent_3g or persistent_lte
        rats_persistent_count = sum(
            [persistent_2g is True, persistent_3g is True, persistent_lte is True]
        )
        rows.append(
            {
                "code": code,
                "persistent_issue_2g": persistent_2g,
                "persistent_issue_3g": persistent_3g,
                "persistent_issue_lte": persistent_lte,
                "max_consecutive_days_2g": max_2g,
                "max_consecutive_days_3g": max_3g,
                "max_consecutive_days_lte": max_lte,
                "total_below_days_2g": below_2g,
                "total_below_days_3g": below_3g,
                "total_below_days_lte": below_lte,
                "total_below_days_any": total_below_any,
                "persistent_issue_any": persistent_any,
                "persistent_rats_count": rats_persistent_count,
            }
        )

    result = pd.DataFrame(rows)
    result = result[result["persistent_issue_any"]]
    if result.empty:
        return result

    if multi_rat_df is not None and not multi_rat_df.empty:
        cols_to_merge = [
            c
            for c in [
                "code",
                "City",
                "post_total_voice_trafic",
                "post_total_data_trafic",
                "post_multirat_status",
            ]
            if c in multi_rat_df.columns
        ]
        if cols_to_merge:
            result = pd.merge(
                result,
                multi_rat_df[cols_to_merge].drop_duplicates("code"),
                on="code",
                how="left",
            )

    if "post_total_data_trafic" not in result.columns:
        result["post_total_data_trafic"] = 0.0
    result["criticity_score"] = (
        result["post_total_data_trafic"].fillna(0) * 1.0
        + result["total_below_days_any"].fillna(0) * 100.0
        + result["persistent_rats_count"].fillna(0) * 1000.0
    )
    result = result.sort_values(
        by=["criticity_score", "total_below_days_any"], ascending=[False, False]
    )
    return result
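
# Worked example of the criticity score (hypothetical site): 12.5 post-period data
# traffic units, 6 below-SLA days in total and 2 persistently degraded RATs give
# 12.5 * 1.0 + 6 * 100.0 + 2 * 1000.0 = 2612.5, so multi-RAT persistence dominates
# the ranking, followed by below-SLA days, then carried traffic.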
name="3G Cell availability SLA (%)", value=98.0, step=0.1 ) sla_lte = pn.widgets.FloatInput( name="LTE Cell availability SLA (%)", value=98.0, step=0.1 ) number_of_top_trafic_sites = pn.widgets.IntInput( name="Number of top traffic sites", value=25 ) min_persistent_days_widget = pn.widgets.IntInput( name="Minimum consecutive days below SLA to flag persistent issue", value=3, ) top_critical_n_widget = pn.widgets.IntInput( name="Number of top critical sites to display", value=25 ) run_button = pn.widgets.Button(name="Run analysis", button_type="primary") status_pane = pn.pane.Alert( "Upload the 3 reports, select the 3 periods and click 'Run analysis'", alert_type="primary", ) summary_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) sum_pre_post_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) summary_2g_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) worst_2g_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) summary_3g_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) worst_3g_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) summary_lte_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) worst_lte_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) multi_rat_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) persistent_table = pn.widgets.Tabulator( height=250, sizing_mode="stretch_width", layout="fit_data_table", ) site_select = pn.widgets.AutocompleteInput( name="Select a site for detailed view (Type to search)", options={}, case_sensitive=False, search_strategy="includes", restrict=True, placeholder="Type site code or city...", ) site_traffic_plot_pane = pn.pane.Plotly( sizing_mode="stretch_both", config=PLOTLY_CONFIG, css_classes=["fullscreen-target-site-traffic"], ) site_traffic_plot = pn.Column( site_traffic_plot_pane, height=400, sizing_mode="stretch_width", css_classes=["plot-fullscreen-wrapper", "site-traffic-wrapper"], ) site_avail_plot_pane = pn.pane.Plotly( sizing_mode="stretch_both", config=PLOTLY_CONFIG, css_classes=["fullscreen-target-site-avail"], ) site_avail_plot = pn.Column( site_avail_plot_pane, height=400, sizing_mode="stretch_width", css_classes=["plot-fullscreen-wrapper", "site-avail-wrapper"], ) site_degraded_table = pn.widgets.Tabulator( height=200, sizing_mode="stretch_width", layout="fit_data_table", ) city_select = pn.widgets.AutocompleteInput( name="Select a City for aggregated view (Type to search)", options=[], case_sensitive=False, search_strategy="includes", restrict=True, placeholder="Type city name...", ) city_traffic_plot_pane = pn.pane.Plotly( sizing_mode="stretch_both", config=PLOTLY_CONFIG, css_classes=["fullscreen-target-city-traffic"], ) city_traffic_plot = pn.Column( city_traffic_plot_pane, height=400, sizing_mode="stretch_width", css_classes=["plot-fullscreen-wrapper", "city-traffic-wrapper"], ) city_avail_plot_pane = pn.pane.Plotly( sizing_mode="stretch_both", config=PLOTLY_CONFIG, css_classes=["fullscreen-target-city-avail"], ) city_avail_plot = pn.Column( city_avail_plot_pane, height=400, sizing_mode="stretch_width", css_classes=["plot-fullscreen-wrapper", "city-avail-wrapper"], ) city_degraded_table = pn.widgets.Tabulator( 
city_select = pn.widgets.AutocompleteInput(
    name="Select a City for aggregated view (Type to search)",
    options=[],
    case_sensitive=False,
    search_strategy="includes",
    restrict=True,
    placeholder="Type city name...",
)
city_traffic_plot_pane = pn.pane.Plotly(
    sizing_mode="stretch_both",
    config=PLOTLY_CONFIG,
    css_classes=["fullscreen-target-city-traffic"],
)
city_traffic_plot = pn.Column(
    city_traffic_plot_pane,
    height=400,
    sizing_mode="stretch_width",
    css_classes=["plot-fullscreen-wrapper", "city-traffic-wrapper"],
)
city_avail_plot_pane = pn.pane.Plotly(
    sizing_mode="stretch_both",
    config=PLOTLY_CONFIG,
    css_classes=["fullscreen-target-city-avail"],
)
city_avail_plot = pn.Column(
    city_avail_plot_pane,
    height=400,
    sizing_mode="stretch_width",
    css_classes=["plot-fullscreen-wrapper", "city-avail-wrapper"],
)
city_degraded_table = pn.widgets.Tabulator(
    height=200,
    sizing_mode="stretch_width",
    layout="fit_data_table",
)

daily_avail_plot_pane = pn.pane.Plotly(
    sizing_mode="stretch_both",
    config=PLOTLY_CONFIG,
    css_classes=["fullscreen-target-daily-avail"],
)
daily_avail_plot = pn.Column(
    daily_avail_plot_pane,
    height=400,
    sizing_mode="stretch_width",
    css_classes=["plot-fullscreen-wrapper", "daily-avail-wrapper"],
)
daily_degraded_table = pn.widgets.Tabulator(
    height=200,
    sizing_mode="stretch_width",
    layout="fit_data_table",
)

top_data_sites_table = pn.widgets.Tabulator(
    height=250,
    sizing_mode="stretch_width",
    layout="fit_data_table",
)
top_voice_sites_table = pn.widgets.Tabulator(
    height=250,
    sizing_mode="stretch_width",
    layout="fit_data_table",
)
top_data_bar_plot_pane = pn.pane.Plotly(
    sizing_mode="stretch_both",
    config=PLOTLY_CONFIG,
    css_classes=["fullscreen-target-top-data"],
)
top_data_bar_plot = pn.Column(
    top_data_bar_plot_pane,
    height=400,
    sizing_mode="stretch_width",
    css_classes=["plot-fullscreen-wrapper", "top-data-bar-wrapper"],
)
top_voice_bar_plot_pane = pn.pane.Plotly(
    sizing_mode="stretch_both",
    config=PLOTLY_CONFIG,
    css_classes=["fullscreen-target-top-voice"],
)
top_voice_bar_plot = pn.Column(
    top_voice_bar_plot_pane,
    height=400,
    sizing_mode="stretch_width",
    css_classes=["plot-fullscreen-wrapper", "top-voice-bar-wrapper"],
)
data_map_plot_pane = pn.pane.Plotly(
    sizing_mode="stretch_both",
    config=PLOTLY_CONFIG,
    css_classes=["fullscreen-target-data-map"],
)
data_map_plot = pn.Column(
    data_map_plot_pane,
    height=500,
    sizing_mode="stretch_width",
    css_classes=["plot-fullscreen-wrapper", "data-map-wrapper"],
)
voice_map_plot_pane = pn.pane.Plotly(
    sizing_mode="stretch_both",
    config=PLOTLY_CONFIG,
    css_classes=["fullscreen-target-voice-map"],
)
voice_map_plot = pn.Column(
    voice_map_plot_pane,
    height=500,
    sizing_mode="stretch_width",
    css_classes=["plot-fullscreen-wrapper", "voice-map-wrapper"],
)

# Fullscreen helper logic has been replaced by client-side JS.
# Fullscreen buttons for each Plotly plot
site_traffic_fullscreen_btn = pn.widgets.Button(
    name="Full screen site traffic", button_type="default"
)
site_avail_fullscreen_btn = pn.widgets.Button(
    name="Full screen site availability", button_type="default"
)
city_traffic_fullscreen_btn = pn.widgets.Button(
    name="Full screen city traffic", button_type="default"
)
city_avail_fullscreen_btn = pn.widgets.Button(
    name="Full screen city availability", button_type="default"
)
daily_avail_fullscreen_btn = pn.widgets.Button(
    name="Full screen daily availability", button_type="default"
)
top_data_fullscreen_btn = pn.widgets.Button(
    name="Full screen top data bar", button_type="default"
)
top_voice_fullscreen_btn = pn.widgets.Button(
    name="Full screen top voice bar", button_type="default"
)
data_map_fullscreen_btn = pn.widgets.Button(
    name="Full screen data map", button_type="default"
)
voice_map_fullscreen_btn = pn.widgets.Button(
    name="Full screen voice map", button_type="default"
)

multi_rat_download = pn.widgets.FileDownload(
    label="Download Multi-RAT table (CSV)",
    filename="multi_rat_availability.csv",
    button_type="default",
)
persistent_download = pn.widgets.FileDownload(
    label="Download persistent issues (CSV)",
    filename="persistent_issues.csv",
    button_type="default",
)
top_data_download = pn.widgets.FileDownload(
    label="Download top data sites (CSV)",
    filename="top_data_sites.csv",
    button_type="default",
)
top_voice_download = pn.widgets.FileDownload(
    label="Download top voice sites (CSV)",
    filename="top_voice_sites.csv",
    button_type="default",
)
export_button = pn.widgets.FileDownload(
    label="Download the Analysis Report",
    filename="Global_Trafic_Analysis_Report.xlsx",
    button_type="primary",
)


# --------------------------------------------------------------------------------------
# Callback
# --------------------------------------------------------------------------------------
def _validate_date_range(rng: tuple[date, date] | list[date], label: str) -> None:
    if not rng or len(rng) != 2:
        raise ValueError(f"Please select 2 dates for {label}.")
    if rng[0] is None or rng[1] is None:
        raise ValueError(f"Please select valid dates for {label}.")
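
# Illustrative behaviour (hypothetical values): _validate_date_range((date(2024, 1, 1),
# date(2024, 1, 31)), "pre-period") passes silently, while None, a single date, or a
# pair containing None raises ValueError naming the offending period.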

def run_analysis(event=None):  # event param required by on_click
    try:
        status_pane.object = "Running analysis..."
        status_pane.alert_type = "primary"

        global current_full_df, current_last_period_df
        global current_analysis_df, current_analysis_last_period_df
        global current_multi_rat_df, current_persistent_df
        global current_site_2g_avail, current_site_3g_avail, current_site_lte_avail
        global current_summary_2g_avail, current_summary_3g_avail, current_summary_lte_avail
        global current_monthly_voice_df, current_monthly_data_df
        global current_sum_pre_post_df, current_avg_pre_post_df
        global current_availability_summary_all_df
        global current_export_multi_rat_df, current_export_persistent_df
        global current_export_bytes

        # Basic validations
        if not (file_2g.value and file_3g.value and file_lte.value):
            raise ValueError("Please upload all 3 traffic reports (2G, 3G, LTE).")
        _validate_date_range(pre_range.value, "pre-period")
        _validate_date_range(post_range.value, "post-period")
        _validate_date_range(last_range.value, "last period")

        # Simple check on pre/post consistency (same logic as the Streamlit version,
        # but lighter): reject identical windows and a pre window that strictly
        # contains the post window.
        pre_start, pre_end = pre_range.value
        post_start, post_end = post_range.value
        if pre_start == post_start and pre_end == post_end:
            raise ValueError("Pre and post periods are the same.")
        if pre_start < post_start and pre_end > post_end:
            raise ValueError("Pre and post periods are overlapping.")

        df_2g = read_fileinput_to_df(file_2g)
        df_3g = read_fileinput_to_df(file_3g)
        df_lte = read_fileinput_to_df(file_lte)
        if df_2g is None or df_3g is None or df_lte is None:
            raise ValueError("Failed to read one or more input files.")

        summary = pd.DataFrame(
            {
                "Dataset": ["2G", "3G", "LTE"],
                "Rows": [len(df_2g), len(df_3g), len(df_lte)],
                "Columns": [df_2g.shape[1], df_3g.shape[1], df_lte.shape[1]],
            }
        )
        summary_table.value = summary

        df_2g_clean = preprocess_2g(df_2g)
        df_3g_clean = preprocess_3g(df_3g)
        df_lte_clean = preprocess_lte(df_lte)

        full_df, last_period, sum_pre_post_analysis, avg_pre_post_analysis = (
            merge_and_compare(
                df_2g_clean,
                df_3g_clean,
                df_lte_clean,
                pre_range.value,
                post_range.value,
                last_range.value,
            )
        )
        monthly_voice_df, monthly_data_df = monthly_data_analysis(full_df)
        analysis_df = full_df

        # Persist global state for later drill-down / export
        current_full_df = full_df
        current_last_period_df = last_period
        current_analysis_df = analysis_df
        current_analysis_last_period_df = last_period
        current_monthly_voice_df = monthly_voice_df
        current_monthly_data_df = monthly_data_df
        current_sum_pre_post_df = sum_pre_post_analysis
        current_avg_pre_post_df = avg_pre_post_analysis

        sum_pre_post_table.value = sum_pre_post_analysis

        summary_2g_avail, site_2g_avail = analyze_2g_availability(
            analysis_df, float(sla_2g.value)
        )
        if summary_2g_avail is not None:
            summary_2g_table.value = summary_2g_avail.round(2)
            worst_sites_2g = site_2g_avail.sort_values("tch_avail_post").head(25)
            worst_2g_table.value = worst_sites_2g.round(2)
        else:
            summary_2g_table.value = pd.DataFrame()
            worst_2g_table.value = pd.DataFrame()
        current_summary_2g_avail = summary_2g_avail
        current_site_2g_avail = site_2g_avail if summary_2g_avail is not None else None

        summary_3g_avail, site_3g_avail = analyze_3g_availability(
            analysis_df, float(sla_3g.value)
        )
        if summary_3g_avail is not None:
            summary_3g_table.value = summary_3g_avail.round(2)
            worst_sites_3g = site_3g_avail.sort_values("cell_avail_post").head(25)
            worst_3g_table.value = worst_sites_3g.round(2)
        else:
            summary_3g_table.value = pd.DataFrame()
            worst_3g_table.value = pd.DataFrame()
        current_summary_3g_avail = summary_3g_avail
        current_site_3g_avail = site_3g_avail if summary_3g_avail is not None else None
        summary_lte_avail, site_lte_avail = analyze_lte_availability(
            analysis_df, float(sla_lte.value)
        )
        if summary_lte_avail is not None:
            summary_lte_table.value = summary_lte_avail.round(2)
            worst_sites_lte = site_lte_avail.sort_values("lte_avail_post").head(25)
            worst_lte_table.value = worst_sites_lte.round(2)
        else:
            summary_lte_table.value = pd.DataFrame()
            worst_lte_table.value = pd.DataFrame()
        current_summary_lte_avail = summary_lte_avail
        current_site_lte_avail = (
            site_lte_avail if summary_lte_avail is not None else None
        )

        # Build availability summary across RATs for export
        availability_frames = []
        if summary_2g_avail is not None:
            tmp = summary_2g_avail.copy()
            tmp["RAT"] = "2G"
            availability_frames.append(tmp)
        if summary_3g_avail is not None:
            tmp = summary_3g_avail.copy()
            tmp["RAT"] = "3G"
            availability_frames.append(tmp)
        if summary_lte_avail is not None:
            tmp = summary_lte_avail.copy()
            tmp["RAT"] = "LTE"
            availability_frames.append(tmp)
        current_availability_summary_all_df = (
            pd.concat(availability_frames, ignore_index=True)
            if availability_frames
            else pd.DataFrame()
        )

        multi_rat_df = analyze_multirat_availability(
            analysis_df,
            float(sla_2g.value),
            float(sla_3g.value),
            float(sla_lte.value),
        )
        if multi_rat_df is not None:
            multi_rat_table.value = multi_rat_df.round(2)
        else:
            multi_rat_table.value = pd.DataFrame()
        current_multi_rat_df = multi_rat_df if multi_rat_df is not None else None

        # Persistent availability (UI uses configurable threshold, export keeps 3 days)
        persistent_df = pd.DataFrame()
        if multi_rat_df is not None:
            persistent_df = analyze_persistent_availability(
                analysis_df,
                multi_rat_df,
                float(sla_2g.value),
                float(sla_3g.value),
                float(sla_lte.value),
                int(min_persistent_days_widget.value),
            )
        current_persistent_df = (
            persistent_df
            if persistent_df is not None and not persistent_df.empty
            else None
        )

        # Export-specific multi-RAT & persistent (based on full_df as in Streamlit app)
        export_multi_rat_base = analyze_multirat_availability(
            full_df,
            float(sla_2g.value),
            float(sla_3g.value),
            float(sla_lte.value),
        )
        current_export_multi_rat_df = (
            export_multi_rat_base
            if export_multi_rat_base is not None
            else pd.DataFrame()
        )
        export_persistent_tmp = pd.DataFrame()
        if export_multi_rat_base is not None:
            export_persistent_tmp = analyze_persistent_availability(
                full_df,
                export_multi_rat_base,
                float(sla_2g.value),
                float(sla_3g.value),
                float(sla_lte.value),
                3,
            )
        current_export_persistent_df = (
            export_persistent_tmp
            if export_persistent_tmp is not None and not export_persistent_tmp.empty
            else pd.DataFrame()
        )

        # Precompute export bytes so the download button is instant
        current_export_bytes = _build_export_bytes()

        # Update all drill-down & map views
        _update_site_controls()
        _update_city_controls()
        _update_daily_availability_view()
        _update_top_sites_and_maps()
        _update_persistent_table_view()

        status_pane.alert_type = "success"
        status_pane.object = "Analysis completed."
    except Exception as exc:  # noqa: BLE001
        status_pane.alert_type = "danger"
        status_pane.object = f"Error: {exc}"


run_button.on_click(run_analysis)


def _update_site_controls() -> None:
    """Populate site selection widget based on current_analysis_df and refresh view."""
    if current_analysis_df is None or current_analysis_df.empty:
        site_select.options = {}
        site_select.value = None
        site_traffic_plot_pane.object = None
        site_avail_plot_pane.object = None
        site_degraded_table.value = pd.DataFrame()
        return
    sites_df = (
        current_analysis_df[["code", "City"]]
        .drop_duplicates()
        .sort_values(by=["City", "code"])
    )
    options: dict[str, int] = {}
    for _, row in sites_df.iterrows():
        label = (
            f"{row['City']}_{row['code']}"
            if pd.notna(row["City"])
            else str(row["code"])
        )
        options[label] = int(row["code"])
    site_select.options = options
    if options and site_select.value not in options.values():
        # When options is a dict, the widget's value is the mapped value (code)
        site_select.value = next(iter(options.values()))
    _update_site_view()
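
# Illustrative options mapping (hypothetical site): a row with City="Northtown" and
# code=12345 becomes the entry {"Northtown_12345": 12345}, so users search by the
# readable label while the rest of the app keeps working with the numeric code.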
{site_label}", template="plotly_white", plot_bgcolor="white", paper_bgcolor="white", ) site_avail_plot_pane.object = fig_avail # Days with availability below SLA per RAT site_detail_df["date_only"] = site_detail_df["date"].dt.date degraded_rows_site: list[dict] = [] for rat_col, rat_name, sla_value in [ ("2g_tch_avail", "2G", float(sla_2g.value)), ("3g_cell_avail", "3G", float(sla_3g.value)), ("lte_cell_avail", "LTE", float(sla_lte.value)), ]: if rat_col in site_detail_df.columns: daily_site = ( site_detail_df.groupby("date_only")[rat_col].mean().dropna() ) mask = daily_site < sla_value for d, val in daily_site[mask].items(): degraded_rows_site.append( { "RAT": rat_name, "date": d, "avg_availability": val, "SLA": sla_value, } ) if degraded_rows_site: degraded_site_df = pd.DataFrame(degraded_rows_site) site_degraded_table.value = degraded_site_df.round(2) else: site_degraded_table.value = pd.DataFrame() else: site_avail_plot_pane.object = None site_degraded_table.value = pd.DataFrame() def _update_city_controls() -> None: """Populate city selection widget based on current_analysis_df and refresh view.""" if current_analysis_df is None or current_analysis_df.empty: city_select.options = [] city_select.value = None city_traffic_plot_pane.object = None city_avail_plot_pane.object = None city_degraded_table.value = pd.DataFrame() return if ( "City" not in current_analysis_df.columns or not current_analysis_df["City"].notna().any() ): city_select.options = [] city_select.value = None city_traffic_plot_pane.object = None city_avail_plot_pane.object = pd.DataFrame() city_degraded_table.value = pd.DataFrame() return cities_df = ( current_analysis_df[["City"]].dropna().drop_duplicates().sort_values(by="City") ) options = cities_df["City"].tolist() city_select.options = options if options and city_select.value not in options: city_select.value = options[0] _update_city_view() def _update_city_view(event=None) -> None: # noqa: D401, ARG001 """Update city drill-down plots and degraded days table based on city_select.""" if current_analysis_df is None or current_analysis_df.empty: city_traffic_plot_pane.object = None city_avail_plot_pane.object = None city_degraded_table.value = pd.DataFrame() return selected_city = city_select.value if not selected_city: city_traffic_plot_pane.object = None city_avail_plot_pane.object = None city_degraded_table.value = pd.DataFrame() return city_detail_df = current_analysis_df[ current_analysis_df["City"] == selected_city ].copy() if city_detail_df.empty: city_traffic_plot_pane.object = None city_avail_plot_pane.object = None city_degraded_table.value = pd.DataFrame() return city_detail_df = city_detail_df.sort_values("date") # Traffic aggregated at city level traffic_cols_city = [ col for col in ["total_voice_trafic", "total_data_trafic"] if col in city_detail_df.columns ] if traffic_cols_city: city_traffic = ( city_detail_df.groupby("date")[traffic_cols_city].sum().reset_index() ) traffic_long_city = city_traffic.melt( id_vars="date", value_vars=traffic_cols_city, var_name="metric", value_name="value", ) fig_traffic_city = px.line( traffic_long_city, x="date", y="value", color="metric", color_discrete_sequence=px.colors.qualitative.Plotly, ) fig_traffic_city.update_layout( title=f"Total Traffic Evolution - City: {selected_city}", template="plotly_white", plot_bgcolor="white", paper_bgcolor="white", ) city_traffic_plot_pane.object = fig_traffic_city else: city_traffic_plot_pane.object = None # Availability aggregated at city level avail_cols_city: list[str] = [] 

def _update_city_view(event=None) -> None:  # noqa: D401, ARG001
    """Update city drill-down plots and degraded days table based on city_select."""
    if current_analysis_df is None or current_analysis_df.empty:
        city_traffic_plot_pane.object = None
        city_avail_plot_pane.object = None
        city_degraded_table.value = pd.DataFrame()
        return
    selected_city = city_select.value
    if not selected_city:
        city_traffic_plot_pane.object = None
        city_avail_plot_pane.object = None
        city_degraded_table.value = pd.DataFrame()
        return
    city_detail_df = current_analysis_df[
        current_analysis_df["City"] == selected_city
    ].copy()
    if city_detail_df.empty:
        city_traffic_plot_pane.object = None
        city_avail_plot_pane.object = None
        city_degraded_table.value = pd.DataFrame()
        return
    city_detail_df = city_detail_df.sort_values("date")

    # Traffic aggregated at city level
    traffic_cols_city = [
        col
        for col in ["total_voice_trafic", "total_data_trafic"]
        if col in city_detail_df.columns
    ]
    if traffic_cols_city:
        city_traffic = (
            city_detail_df.groupby("date")[traffic_cols_city].sum().reset_index()
        )
        traffic_long_city = city_traffic.melt(
            id_vars="date",
            value_vars=traffic_cols_city,
            var_name="metric",
            value_name="value",
        )
        fig_traffic_city = px.line(
            traffic_long_city,
            x="date",
            y="value",
            color="metric",
            color_discrete_sequence=px.colors.qualitative.Plotly,
        )
        fig_traffic_city.update_layout(
            title=f"Total Traffic Evolution - City: {selected_city}",
            template="plotly_white",
            plot_bgcolor="white",
            paper_bgcolor="white",
        )
        city_traffic_plot_pane.object = fig_traffic_city
    else:
        city_traffic_plot_pane.object = None

    # Availability aggregated at city level
    avail_cols_city: list[str] = []
    rename_map_city: dict[str, str] = {}
    if "2g_tch_avail" in city_detail_df.columns:
        avail_cols_city.append("2g_tch_avail")
        rename_map_city["2g_tch_avail"] = "2G"
    if "3g_cell_avail" in city_detail_df.columns:
        avail_cols_city.append("3g_cell_avail")
        rename_map_city["3g_cell_avail"] = "3G"
    if "lte_cell_avail" in city_detail_df.columns:
        avail_cols_city.append("lte_cell_avail")
        rename_map_city["lte_cell_avail"] = "LTE"
    if avail_cols_city:
        avail_city_df = city_detail_df[["date"] + avail_cols_city].copy()
        avail_city_df = avail_city_df.rename(columns=rename_map_city)
        value_cols_city = [c for c in avail_city_df.columns if c != "date"]
        avail_long_city = avail_city_df.melt(
            id_vars="date",
            value_vars=value_cols_city,
            var_name="RAT",
            value_name="availability",
        )
        fig_avail_city = px.line(
            avail_long_city,
            x="date",
            y="availability",
            color="RAT",
            color_discrete_sequence=px.colors.qualitative.Plotly,
        )
        fig_avail_city.update_layout(
            title=f"Availability vs SLA - City: {selected_city}",
            template="plotly_white",
            plot_bgcolor="white",
            paper_bgcolor="white",
        )
        city_avail_plot_pane.object = fig_avail_city

        city_detail_df["date_only"] = city_detail_df["date"].dt.date
        degraded_rows_city: list[dict] = []
        for rat_col, rat_name, sla_value in [
            ("2g_tch_avail", "2G", float(sla_2g.value)),
            ("3g_cell_avail", "3G", float(sla_3g.value)),
            ("lte_cell_avail", "LTE", float(sla_lte.value)),
        ]:
            if rat_col in city_detail_df.columns:
                daily_city = (
                    city_detail_df.groupby("date_only")[rat_col].mean().dropna()
                )
                mask_city = daily_city < sla_value
                for d, val in daily_city[mask_city].items():
                    degraded_rows_city.append(
                        {
                            "RAT": rat_name,
                            "date": d,
                            "avg_availability": val,
                            "SLA": sla_value,
                        }
                    )
        if degraded_rows_city:
            degraded_city_df = pd.DataFrame(degraded_rows_city)
            city_degraded_table.value = degraded_city_df.round(2)
        else:
            city_degraded_table.value = pd.DataFrame()
    else:
        city_avail_plot_pane.object = None
        city_degraded_table.value = pd.DataFrame()

def _update_daily_availability_view() -> None:
    """Daily average availability per RAT over the full analysis_df."""
    if current_analysis_df is None or current_analysis_df.empty:
        daily_avail_plot_pane.object = None
        daily_degraded_table.value = pd.DataFrame()
        return
    temp_df = current_analysis_df.copy()
    if not any(
        col in temp_df.columns
        for col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]
    ):
        daily_avail_plot_pane.object = None
        daily_degraded_table.value = pd.DataFrame()
        return
    temp_df["date_only"] = temp_df["date"].dt.date
    agg_dict: dict[str, str] = {}
    if "2g_tch_avail" in temp_df.columns:
        agg_dict["2g_tch_avail"] = "mean"
    if "3g_cell_avail" in temp_df.columns:
        agg_dict["3g_cell_avail"] = "mean"
    if "lte_cell_avail" in temp_df.columns:
        agg_dict["lte_cell_avail"] = "mean"
    daily_avail = (
        temp_df.groupby("date_only", as_index=False).agg(agg_dict)
        if agg_dict
        else pd.DataFrame()
    )
    if daily_avail.empty:
        daily_avail_plot_pane.object = None
        daily_degraded_table.value = pd.DataFrame()
        return
    rename_map: dict[str, str] = {}
    if "2g_tch_avail" in daily_avail.columns:
        rename_map["2g_tch_avail"] = "2G"
    if "3g_cell_avail" in daily_avail.columns:
        rename_map["3g_cell_avail"] = "3G"
    if "lte_cell_avail" in daily_avail.columns:
        rename_map["lte_cell_avail"] = "LTE"
    daily_avail = daily_avail.rename(columns=rename_map)
    value_cols = [c for c in daily_avail.columns if c != "date_only"]
    if not value_cols:
        daily_avail_plot_pane.object = None
        daily_degraded_table.value = pd.DataFrame()
        return
    daily_melt = daily_avail.melt(
        id_vars="date_only",
        value_vars=value_cols,
        var_name="RAT",
        value_name="availability",
    )
    fig = px.line(
        daily_melt,
        x="date_only",
        y="availability",
        color="RAT",
        markers=True,
        color_discrete_sequence=px.colors.qualitative.Plotly,
    )
    fig.update_layout(
        template="plotly_white",
        plot_bgcolor="white",
        paper_bgcolor="white",
    )
    daily_avail_plot_pane.object = fig

    degraded_rows: list[dict] = []
    for rat_name, sla_value in [
        ("2G", float(sla_2g.value)),
        ("3G", float(sla_3g.value)),
        ("LTE", float(sla_lte.value)),
    ]:
        if rat_name in daily_avail.columns:
            series = daily_avail[rat_name]
            mask = series < sla_value
            for d, val in zip(daily_avail.loc[mask, "date_only"], series[mask]):
                degraded_rows.append(
                    {
                        "RAT": rat_name,
                        "date": d,
                        "avg_availability": val,
                        "SLA": sla_value,
                    }
                )
    if degraded_rows:
        degraded_df = pd.DataFrame(degraded_rows)
        daily_degraded_table.value = degraded_df.round(2)
    else:
        daily_degraded_table.value = pd.DataFrame()
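
# Sketch of the wide-to-long reshape used above (hypothetical rows): melting
# date_only=2024-01-01 with 2G=99.1 and 3G=97.5 produces two rows,
# (2024-01-01, "2G", 99.1) and (2024-01-01, "3G", 97.5), which is the long format
# px.line needs to draw one trace per RAT.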
size="bubble_size", color_continuous_scale=custom_blue_red, size_max=max_size, zoom=10, height=600, title="Data traffic distribution", hover_data={"code": True, "total_data_trafic": True}, hover_name="code", text=[str(x) for x in df_data["code"]], ) fig_map_data.update_layout( mapbox_style="open-street-map", coloraxis_colorbar=dict(title="Total Data Traffic (MB)"), coloraxis=dict(cmin=traffic_data_min, cmax=traffic_data_max), font=dict(size=10, color="black"), ) data_map_plot_pane.object = fig_map_data else: data_map_plot_pane.object = None # Voice traffic map df_voice = ( df.groupby(["code", "City", "Latitude", "Longitude"])["total_voice_trafic"] .sum() .reset_index() ) if not df_voice.empty: traffic_voice_min = df_voice["total_voice_trafic"].min() traffic_voice_max = df_voice["total_voice_trafic"].max() if traffic_voice_max > traffic_voice_min: df_voice["bubble_size"] = df_voice["total_voice_trafic"].apply( lambda x: min_size + (max_size - min_size) * (x - traffic_voice_min) / (traffic_voice_max - traffic_voice_min) ) else: df_voice["bubble_size"] = min_size custom_blue_red = [ [0.0, "#4292c6"], [0.2, "#2171b5"], [0.4, "#084594"], [0.6, "#cb181d"], [0.8, "#a50f15"], [1.0, "#67000d"], ] fig_map_voice = px.scatter_map( df_voice, lat="Latitude", lon="Longitude", color="total_voice_trafic", size="bubble_size", color_continuous_scale=custom_blue_red, size_max=max_size, zoom=10, height=600, title="Voice traffic distribution", hover_data={"code": True, "total_voice_trafic": True}, hover_name="code", text=[str(x) for x in df_voice["code"]], ) fig_map_voice.update_layout( mapbox_style="open-street-map", coloraxis_colorbar=dict(title="Total Voice Traffic (MB)"), coloraxis=dict(cmin=traffic_voice_min, cmax=traffic_voice_max), font=dict(size=10, color="black"), ) voice_map_plot_pane.object = fig_map_voice else: voice_map_plot_pane.object = None else: data_map_plot_pane.object = None voice_map_plot_pane.object = None def _update_persistent_table_view(event=None) -> None: # noqa: D401, ARG001 """Update persistent issues table based on current_persistent_df and top_critical_n.""" if current_persistent_df is None or current_persistent_df.empty: persistent_table.value = pd.DataFrame() return n = int(top_critical_n_widget.value or 25) persistent_table.value = current_persistent_df.head(n).round(2) def _recompute_persistent_from_widget(event=None) -> None: # noqa: ARG001 """Recompute persistent issues when the minimum consecutive days widget changes.""" global current_persistent_df if ( current_analysis_df is None or current_analysis_df.empty or current_multi_rat_df is None or current_multi_rat_df.empty ): current_persistent_df = None persistent_table.value = pd.DataFrame() return persistent_df = analyze_persistent_availability( current_analysis_df, current_multi_rat_df, float(sla_2g.value), float(sla_3g.value), float(sla_lte.value), int(min_persistent_days_widget.value), ) current_persistent_df = ( persistent_df if persistent_df is not None and not persistent_df.empty else None ) _update_persistent_table_view() def _build_export_bytes() -> bytes: """Build Excel report bytes mirroring the Streamlit export structure.""" if current_full_df is None: return b"" dfs: list[pd.DataFrame] = [ current_full_df, ( current_sum_pre_post_df if current_sum_pre_post_df is not None else pd.DataFrame() ), ( current_avg_pre_post_df if current_avg_pre_post_df is not None else pd.DataFrame() ), ( current_monthly_voice_df if current_monthly_voice_df is not None else pd.DataFrame() ), ( current_monthly_data_df if 

def _build_export_bytes() -> bytes:
    """Build Excel report bytes mirroring the Streamlit export structure."""
    if current_full_df is None:
        return b""
    dfs: list[pd.DataFrame] = [
        current_full_df,
        (
            current_sum_pre_post_df
            if current_sum_pre_post_df is not None
            else pd.DataFrame()
        ),
        (
            current_avg_pre_post_df
            if current_avg_pre_post_df is not None
            else pd.DataFrame()
        ),
        (
            current_monthly_voice_df
            if current_monthly_voice_df is not None
            else pd.DataFrame()
        ),
        (
            current_monthly_data_df
            if current_monthly_data_df is not None
            else pd.DataFrame()
        ),
        (
            current_availability_summary_all_df
            if current_availability_summary_all_df is not None
            else pd.DataFrame()
        ),
        current_site_2g_avail if current_site_2g_avail is not None else pd.DataFrame(),
        current_site_3g_avail if current_site_3g_avail is not None else pd.DataFrame(),
        (
            current_site_lte_avail
            if current_site_lte_avail is not None
            else pd.DataFrame()
        ),
        (
            current_export_multi_rat_df
            if current_export_multi_rat_df is not None
            else pd.DataFrame()
        ),
        (
            current_export_persistent_df
            if current_export_persistent_df is not None
            else pd.DataFrame()
        ),
    ]
    sheet_names = [
        "Global_Trafic_Analysis",
        "Sum_pre_post_analysis",
        "Avg_pre_post_analysis",
        "Monthly_voice_analysis",
        "Monthly_data_analysis",
        "Availability_Summary_All_RAT",
        "TwoG_Availability_By_Site",
        "ThreeG_Availability_By_Site",
        "LTE_Availability_By_Site",
        "MultiRAT_Availability_By_Site",
        "Top_Critical_Sites",
    ]
    return write_dfs_to_excel(dfs, sheet_names, index=True)


def _export_callback() -> io.BytesIO:
    # Use cached bytes from the last completed analysis to make the download instant
    data = current_export_bytes or b""
    if not data:
        return io.BytesIO()
    # FileDownload expects a file path or file-like object, not raw bytes
    return io.BytesIO(data)


def _df_to_csv_bytes(df: pd.DataFrame | None) -> io.BytesIO:
    if df is None or getattr(df, "empty", True):  # handles None and empty DataFrame
        return io.BytesIO()
    return io.BytesIO(df.to_csv(index=False).encode("utf-8"))


def _download_multi_rat_table() -> io.BytesIO:
    value = getattr(multi_rat_table, "value", None)
    return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)


def _download_persistent_table() -> io.BytesIO:
    value = getattr(persistent_table, "value", None)
    return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)


def _download_top_data_sites() -> io.BytesIO:
    value = getattr(top_data_sites_table, "value", None)
    return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)


def _download_top_voice_sites() -> io.BytesIO:
    value = getattr(top_voice_sites_table, "value", None)
    return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)
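
# Illustrative behaviour (hypothetical frame): _df_to_csv_bytes(pd.DataFrame({"a": [1]}))
# returns a buffer containing "a\n1\n", while None or an empty frame returns an empty
# buffer, so the FileDownload callbacks never need to special-case missing tables.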
# Client-side fullscreen JS logic (with Shadow DOM support).
# We target the specific CSS class assigned to each plot pane; Panel renders
# components inside shadow roots, so a plain document.querySelector cannot see
# them and findDeep recursively descends into every shadowRoot instead.
_JS_FULLSCREEN = """
function findDeep(root, cls) {
    if (!root) return null;
    if (root.classList && root.classList.contains(cls)) return root;
    if (root.shadowRoot) {
        var found = findDeep(root.shadowRoot, cls);
        if (found) return found;
    }
    var children = root.children;
    if (children) {
        for (var i = 0; i < children.length; i++) {
            var found = findDeep(children[i], cls);
            if (found) return found;
        }
    }
    return null;
}
var el = findDeep(document.body, target_class);
if (el) {
    if (el.requestFullscreen) {
        el.requestFullscreen();
    } else if (el.webkitRequestFullscreen) {
        el.webkitRequestFullscreen();
    } else if (el.msRequestFullscreen) {
        el.msRequestFullscreen();
    }
} else {
    // Debug info
    alert("Cannot enter fullscreen: element '" + target_class + "' not found, even after a deep (Shadow DOM) search.");
}
"""

# Reactive bindings for drill-down controls & export
site_select.param.watch(_update_site_view, "value")
city_select.param.watch(_update_city_view, "value")
top_critical_n_widget.param.watch(_update_persistent_table_view, "value")
number_of_top_trafic_sites.param.watch(_update_top_sites_and_maps, "value")
min_persistent_days_widget.param.watch(_recompute_persistent_from_widget, "value")
export_button.callback = _export_callback
multi_rat_download.callback = _download_multi_rat_table
persistent_download.callback = _download_persistent_table
top_data_download.callback = _download_top_data_sites
top_voice_download.callback = _download_top_voice_sites

site_traffic_fullscreen_btn.js_on_click(
    args={"target_class": "site-traffic-wrapper"},
    code=_JS_FULLSCREEN,
)
site_avail_fullscreen_btn.js_on_click(
    args={"target_class": "site-avail-wrapper"},
    code=_JS_FULLSCREEN,
)
city_traffic_fullscreen_btn.js_on_click(
    args={"target_class": "city-traffic-wrapper"},
    code=_JS_FULLSCREEN,
)
city_avail_fullscreen_btn.js_on_click(
    args={"target_class": "city-avail-wrapper"},
    code=_JS_FULLSCREEN,
)
daily_avail_fullscreen_btn.js_on_click(
    args={"target_class": "daily-avail-wrapper"},
    code=_JS_FULLSCREEN,
)
top_data_fullscreen_btn.js_on_click(
    args={"target_class": "top-data-bar-wrapper"},
    code=_JS_FULLSCREEN,
)
top_voice_fullscreen_btn.js_on_click(
    args={"target_class": "top-voice-bar-wrapper"},
    code=_JS_FULLSCREEN,
)
data_map_fullscreen_btn.js_on_click(
    args={"target_class": "data-map-wrapper"},
    code=_JS_FULLSCREEN,
)
voice_map_fullscreen_btn.js_on_click(
    args={"target_class": "voice-map-wrapper"},
    code=_JS_FULLSCREEN,
)
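# Pattern sketch (hypothetical names, not executed): to make another plot
# fullscreen-capable, give its wrapper the CSS classes that _JS_FULLSCREEN and
# the raw_css rules target, then bind a button to the same JS snippet:
#
#     new_plot_wrapper = pn.Column(
#         new_plot, css_classes=["plot-fullscreen-wrapper", "new-plot-wrapper"]
#     )
#     new_fullscreen_btn = pn.widgets.Button(name="Fullscreen")
#     new_fullscreen_btn.js_on_click(
#         args={"target_class": "new-plot-wrapper"}, code=_JS_FULLSCREEN
#     )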
# --------------------------------------------------------------------------------------
# Material Template layout
# --------------------------------------------------------------------------------------
template = pn.template.MaterialTemplate(
    title="📊 Global Traffic Analysis - Panel (2G / 3G / LTE)",
)
# The modal CSS override (which ensured the template modal was large enough for
# fullscreen charts) was removed when we switched to native fullscreen.

sidebar_content = pn.Column(
    """This Panel app is a migration of the existing Streamlit-based global
traffic analysis. Upload the 3 traffic reports (2G / 3G / LTE), configure the
analysis periods and SLAs, then run the analysis.

In this first step, the app only validates the pipeline and shows a lightweight
summary of the inputs. Full KPIs and visualizations will be added progressively.""",
    "---",
    file_2g,
    file_3g,
    file_lte,
    "---",
    pre_range,
    post_range,
    last_range,
    "---",
    sla_2g,
    sla_3g,
    sla_lte,
    "---",
    number_of_top_trafic_sites,
    min_persistent_days_widget,
    top_critical_n_widget,
    "---",
    run_button,
)

main_content = pn.Column(
    status_pane,
    pn.pane.Markdown("## Input datasets summary"),
    summary_table,
    pn.layout.Divider(),
    pn.pane.Markdown("## Summary Analysis Pre / Post"),
    sum_pre_post_table,
    pn.layout.Divider(),
    pn.pane.Markdown("## Availability vs SLA (per RAT)"),
    pn.Tabs(
        (
            "2G",
            pn.Column(
                summary_2g_table, pn.pane.Markdown("Worst 25 sites"), worst_2g_table
            ),
        ),
        (
            "3G",
            pn.Column(
                summary_3g_table, pn.pane.Markdown("Worst 25 sites"), worst_3g_table
            ),
        ),
        (
            "LTE",
            pn.Column(
                summary_lte_table, pn.pane.Markdown("Worst 25 sites"), worst_lte_table
            ),
        ),
    ),
    pn.layout.Divider(),
    pn.pane.Markdown("## Multi-RAT Availability (post-period)"),
    multi_rat_table,
    multi_rat_download,
    pn.layout.Divider(),
    pn.pane.Markdown("## Persistent availability issues (critical sites)"),
    persistent_table,
    persistent_download,
    pn.layout.Divider(),
    pn.pane.Markdown("## Site drill-down: traffic and availability over time"),
    site_select,
    site_traffic_plot,
    site_traffic_fullscreen_btn,
    site_avail_plot,
    site_avail_fullscreen_btn,
    site_degraded_table,
    pn.layout.Divider(),
    pn.pane.Markdown("## City drill-down: traffic and availability over time"),
    city_select,
    city_traffic_plot,
    city_traffic_fullscreen_btn,
    city_avail_plot,
    city_avail_fullscreen_btn,
    city_degraded_table,
    pn.layout.Divider(),
    pn.pane.Markdown("## Daily average availability per RAT"),
    daily_avail_plot,
    daily_avail_fullscreen_btn,
    daily_degraded_table,
    pn.layout.Divider(),
    pn.pane.Markdown("## Top traffic sites and geographic maps (last period)"),
    pn.Row(
        pn.Column(
            pn.pane.Markdown("### Top sites by data traffic"),
            top_data_sites_table,
            top_data_download,
            top_data_bar_plot,
            top_data_fullscreen_btn,
        ),
        pn.Column(
            pn.pane.Markdown("### Top sites by voice traffic"),
            top_voice_sites_table,
            top_voice_download,
            top_voice_bar_plot,
            top_voice_fullscreen_btn,
        ),
    ),
    pn.Row(
        pn.Column(
            pn.pane.Markdown("### Data traffic map"),
            data_map_plot,
            data_map_fullscreen_btn,
        ),
        pn.Column(
            pn.pane.Markdown("### Voice traffic map"),
            voice_map_plot,
            voice_map_fullscreen_btn,
        ),
    ),
    pn.layout.Divider(),
    pn.pane.Markdown("## Export"),
    export_button,
)


def get_page_components():
    return sidebar_content, main_content


if __name__ == "__main__":
    template.sidebar.append(sidebar_content)
    template.main.append(main_content)
    template.servable()
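# Composition sketch (hypothetical entry point, not executed): a multipage app
# could reuse this page through get_page_components() instead of serving this
# module directly:
#
#     sidebar, main = get_page_components()
#     page = pn.template.MaterialTemplate(title="Global Traffic Analysis")
#     page.sidebar.append(sidebar)
#     page.main.append(main)
#     page.servable()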