# db_query/apps/kpi_analysis/trafic_analysis.py
# Commit 092401b (DavMelchi): Add city-level traffic and availability
# drill-down with daily aggregation and SLA comparison across all RATs.
import io
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
import numpy as np
import pandas as pd
import plotly.express as px
import streamlit as st
from utils.convert_to_excel import convert_dfs, save_dataframe
from utils.utils_vars import get_physical_db
def read_uploaded_file(uploaded_file):
    """Read an uploaded traffic report, handling both ZIP and CSV formats.

    Args:
        uploaded_file: Uploaded file object from Streamlit. Must expose
            ``name`` and, for ZIP archives, ``getvalue()``.

    Returns:
        pd.DataFrame: Parsed report (latin-1 encoded, ';'-separated).

    Raises:
        ValueError: If the file is neither a ZIP containing at least one
            CSV nor a CSV file.
    """
    # Compare the extension case-insensitively so uploads named e.g.
    # "REPORT.CSV" or "Report.Zip" are accepted as well.
    lowered_name = uploaded_file.name.lower()
    if lowered_name.endswith(".zip"):
        with zipfile.ZipFile(io.BytesIO(uploaded_file.getvalue())) as archive:
            # Use the first CSV file found in the archive.
            csv_files = [f for f in archive.namelist() if f.lower().endswith(".csv")]
            if not csv_files:
                raise ValueError("No CSV file found in the ZIP archive")
            with archive.open(csv_files[0]) as handle:
                return pd.read_csv(handle, encoding="latin1", sep=";", low_memory=False)
    if lowered_name.endswith(".csv"):
        return pd.read_csv(uploaded_file, encoding="latin1", sep=";", low_memory=False)
    raise ValueError("Unsupported file format. Please upload a ZIP or CSV file.")
class TraficAnalysis:
    """Holder for traffic-analysis state shared across the module."""

    # Cached "last period" slice of the merged traffic DataFrame.
    # NOTE(review): annotated pd.DataFrame but initialised to None —
    # presumably assigned later at runtime; confirm against callers.
    last_period_df: pd.DataFrame = None
############### PROCESSING ###############
def extract_code(name):
    """Extract the leading numeric site code from a site/sector name.

    Spaces are normalised to underscores before parsing. Returns None for
    non-string input, names shorter than 10 characters, or names whose
    first "_"-separated token is not an integer.
    """
    if not isinstance(name, str):
        return None
    normalized = name.replace(" ", "_")
    # Short names cannot carry a valid site code (empty strings included).
    if len(normalized) < 10:
        return None
    leading_token = normalized.partition("_")[0]
    try:
        return int(leading_token)
    except ValueError:
        return None
def preprocess_2g(df: pd.DataFrame) -> pd.DataFrame:
    """Clean a raw 2G report and aggregate it per (date, ID, code).

    Expects columns "BCF name", "PERIOD_START_TIME", "TRAFFIC_PS DL",
    "PS_UL_Load", "2G_Carried Traffic" and optionally
    "TCH availability ratio".

    Returns:
        pd.DataFrame: one row per (date, ID, code) with summed traffic
        and, when the KPI is present, mean TCH availability.
    """
    # Keep only rows whose site name is long enough to carry a site code.
    df = df[df["BCF name"].str.len() >= 10].copy()
    # DL + UL packet-switched load, scaled by 1000 and rounded.
    # NOTE(review): assumes the source unit is MB (so /1000 yields GB) —
    # confirm against the report specification.
    df["2g_data_trafic"] = ((df["TRAFFIC_PS DL"] + df["PS_UL_Load"]) / 1000).round(1)
    df.rename(columns={"2G_Carried Traffic": "2g_voice_trafic"}, inplace=True)
    df["code"] = df["BCF name"].apply(extract_code)
    df["code"] = pd.to_numeric(df["code"], errors="coerce")
    df = df[df["code"].notna()]
    df["code"] = df["code"].astype(int)
    # Guard: .iat[0] on an empty frame raises IndexError when no rows
    # survive the filters above; fall back to the date-only format.
    sample = df["PERIOD_START_TIME"].iat[0] if len(df) else ""
    # Timestamps longer than "MM.DD.YYYY" carry a time-of-day component.
    date_format = "%m.%d.%Y %H:%M:%S" if len(sample) > 10 else "%m.%d.%Y"
    df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
    df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
    if "TCH availability ratio" in df.columns:
        df["2g_tch_avail"] = pd.to_numeric(
            df["TCH availability ratio"], errors="coerce"
        )
    agg_dict = {
        "2g_data_trafic": "sum",
        "2g_voice_trafic": "sum",
    }
    # Availability is averaged (not summed) across cells of a site.
    if "2g_tch_avail" in df.columns:
        agg_dict["2g_tch_avail"] = "mean"
    df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
    return df
def preprocess_3g(df: pd.DataFrame) -> pd.DataFrame:
    """Clean a raw 3G report and aggregate it per (date, ID, code).

    Expects columns "WBTS name", "PERIOD_START_TIME",
    "Total CS traffic - Erl", "Total_Data_Traffic" and optionally a
    cell-availability KPI (matched by name, case-insensitively).

    Returns:
        pd.DataFrame: one row per (date, ID, code) with summed traffic
        and, when the KPI is present, mean cell availability.
    """
    # Keep only rows whose site name is long enough to carry a site code.
    df = df[df["WBTS name"].str.len() >= 10].copy()
    df["code"] = df["WBTS name"].apply(extract_code)
    df["code"] = pd.to_numeric(df["code"], errors="coerce")
    df = df[df["code"].notna()]
    df["code"] = df["code"].astype(int)
    # Guard: .iat[0] on an empty frame raises IndexError when no rows
    # survive the filters above; fall back to the date-only format.
    sample = df["PERIOD_START_TIME"].iat[0] if len(df) else ""
    # Timestamps longer than "MM.DD.YYYY" carry a time-of-day component.
    date_format = "%m.%d.%Y %H:%M:%S" if len(sample) > 10 else "%m.%d.%Y"
    df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
    df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
    df.rename(
        columns={
            "Total CS traffic - Erl": "3g_voice_trafic",
            "Total_Data_Traffic": "3g_data_trafic",
        },
        inplace=True,
    )
    # The availability column name varies between exports; find the first
    # column whose name contains "cell availability" (case-insensitive).
    kpi_col = None
    for col in df.columns:
        if "cell availability" in str(col).lower():
            kpi_col = col
            break
    if kpi_col is not None:
        df["3g_cell_avail"] = pd.to_numeric(df[kpi_col], errors="coerce")
    agg_dict = {
        "3g_voice_trafic": "sum",
        "3g_data_trafic": "sum",
    }
    # Availability is averaged (not summed) across cells of a site.
    if "3g_cell_avail" in df.columns:
        agg_dict["3g_cell_avail"] = "mean"
    df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
    return df
def preprocess_lte(df: pd.DataFrame) -> pd.DataFrame:
    """Clean a raw LTE report and aggregate it per (date, ID, code).

    Expects columns "LNBTS name", "PERIOD_START_TIME",
    "4G/LTE DL Traffic Volume (GBytes)",
    "4G/LTE UL Traffic Volume (GBytes)" and optionally
    "Cell Avail excl BLU".

    Returns:
        pd.DataFrame: one row per (date, ID, code) with summed data
        traffic and, when the KPI is present, mean cell availability.
    """
    # Keep only rows whose site name is long enough to carry a site code.
    df = df[df["LNBTS name"].str.len() >= 10].copy()
    # Total data traffic is DL + UL volume (GBytes).
    df["lte_data_trafic"] = (
        df["4G/LTE DL Traffic Volume (GBytes)"]
        + df["4G/LTE UL Traffic Volume (GBytes)"]
    )
    df["code"] = df["LNBTS name"].apply(extract_code)
    df["code"] = pd.to_numeric(df["code"], errors="coerce")
    df = df[df["code"].notna()]
    df["code"] = df["code"].astype(int)
    # Guard: .iat[0] on an empty frame raises IndexError when no rows
    # survive the filters above; fall back to the date-only format.
    sample = df["PERIOD_START_TIME"].iat[0] if len(df) else ""
    # Timestamps longer than "MM.DD.YYYY" carry a time-of-day component.
    date_format = "%m.%d.%Y %H:%M:%S" if len(sample) > 10 else "%m.%d.%Y"
    df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
    df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
    if "Cell Avail excl BLU" in df.columns:
        df["lte_cell_avail"] = pd.to_numeric(df["Cell Avail excl BLU"], errors="coerce")
    agg_dict = {"lte_data_trafic": "sum"}
    # Availability is averaged (not summed) across cells of a site.
    if "lte_cell_avail" in df.columns:
        agg_dict["lte_cell_avail"] = "mean"
    df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
    return df
############################## ANALYSIS ################
def merge_and_compare(df_2g, df_3g, df_lte, pre_range, post_range, last_period_range):
    """Merge the three per-RAT frames, tag pre/post periods, build pivots.

    Args:
        df_2g, df_3g, df_lte: outputs of the ``preprocess_*`` functions,
            keyed on (date, ID, code).
        pre_range, post_range, last_period_range: 2-item (start, end)
            sequences of dates; bounds are inclusive on both ends.

    Returns:
        tuple: (full merged df with "period" column, last-period slice,
        per-site SUM pivot, per-site AVG pivot); pivots rounded to 2
        decimals.
    """
    # Load the physical site database and derive the numeric site code
    # from the leading token of Code_Sector.
    physical_db = get_physical_db()
    physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0]
    physical_db["code"] = (
        pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int)
    )
    physical_db = physical_db[["code", "Longitude", "Latitude", "City"]]
    physical_db = physical_db.drop_duplicates(subset="code")
    # Outer-join the three RATs on the shared (date, ID, code) key so a
    # site missing one RAT is still kept.
    df = pd.merge(df_2g, df_3g, on=["date", "ID", "code"], how="outer")
    df = pd.merge(df, df_lte, on=["date", "ID", "code"], how="outer")
    # Ensure all traffic columns exist even if a RAT frame lacked them.
    for col in [
        "2g_data_trafic",
        "2g_voice_trafic",
        "3g_voice_trafic",
        "3g_data_trafic",
        "lte_data_trafic",
    ]:
        if col not in df:
            df[col] = 0
    # Remember where availability KPIs were genuinely measured so the
    # global fillna(0) below does not turn "no measurement" into "0%
    # available"; the NaNs are restored right after.
    kpi_masks = {}
    for kpi_col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]:
        if kpi_col in df.columns:
            kpi_masks[kpi_col] = df[kpi_col].notna()
    df.fillna(0, inplace=True)
    for kpi_col, mask in kpi_masks.items():
        df.loc[~mask, kpi_col] = np.nan
    df["total_voice_trafic"] = df["2g_voice_trafic"] + df["3g_voice_trafic"]
    df["total_data_trafic"] = (
        df["2g_data_trafic"] + df["3g_data_trafic"] + df["lte_data_trafic"]
    )
    # Attach coordinates and City per site.
    df = pd.merge(df, physical_db, on=["code"], how="left")
    # Assign period based on date range (inclusive bounds).
    pre_start, pre_end = pd.to_datetime(pre_range[0]), pd.to_datetime(pre_range[1])
    post_start, post_end = pd.to_datetime(post_range[0]), pd.to_datetime(post_range[1])
    last_period_start, last_period_end = pd.to_datetime(
        last_period_range[0]
    ), pd.to_datetime(last_period_range[1])
    last_period = df[
        (df["date"] >= last_period_start) & (df["date"] <= last_period_end)
    ]

    def assign_period(date):
        # Rows labelled "other" are excluded from the pre/post pivots.
        if pre_start <= date <= pre_end:
            return "pre"
        elif post_start <= date <= post_end:
            return "post"
        else:
            return "other"

    df["period"] = df["date"].apply(assign_period)
    comparison = df[df["period"].isin(["pre", "post"])]
    # SUM pivot: total traffic per site and period, one column per
    # (metric, period) pair after flattening the MultiIndex.
    sum_pivot = (
        comparison.groupby(["code", "period"])[
            ["total_voice_trafic", "total_data_trafic"]
        ]
        .sum()
        .unstack()
    )
    sum_pivot.columns = [f"{metric}_{period}" for metric, period in sum_pivot.columns]
    sum_pivot = sum_pivot.reset_index()
    # Absolute post-pre differences.
    sum_pivot["total_voice_trafic_diff"] = (
        sum_pivot["total_voice_trafic_post"] - sum_pivot["total_voice_trafic_pre"]
    )
    sum_pivot["total_data_trafic_diff"] = (
        sum_pivot["total_data_trafic_post"] - sum_pivot["total_data_trafic_pre"]
    )
    # NOTE(review): .get(col, default) returns the column when present, so
    # the defaults only apply when a whole period is missing; a pre sum of
    # 0 still yields an infinite percentage — confirm this is intended.
    for metric in ["total_voice_trafic", "total_data_trafic"]:
        sum_pivot[f"{metric}_diff_pct"] = (
            (sum_pivot.get(f"{metric}_post", 0) - sum_pivot.get(f"{metric}_pre", 0))
            / sum_pivot.get(f"{metric}_pre", 1)
        ) * 100
    # Reorder sum_pivot columns: voice before data, pre before post.
    sum_order = [
        "code",
        "total_voice_trafic_pre",
        "total_voice_trafic_post",
        "total_voice_trafic_diff",
        "total_voice_trafic_diff_pct",
        "total_data_trafic_pre",
        "total_data_trafic_post",
        "total_data_trafic_diff",
        "total_data_trafic_diff_pct",
    ]
    sum_existing_cols = [col for col in sum_order if col in sum_pivot.columns]
    sum_remaining_cols = [
        col for col in sum_pivot.columns if col not in sum_existing_cols
    ]
    sum_pivot = sum_pivot[sum_existing_cols + sum_remaining_cols]
    # AVG pivot: same structure but with mean daily traffic per period.
    avg_pivot = (
        comparison.groupby(["code", "period"])[
            ["total_voice_trafic", "total_data_trafic"]
        ]
        .mean()
        .unstack()
    )
    avg_pivot.columns = [f"{metric}_{period}" for metric, period in avg_pivot.columns]
    avg_pivot = avg_pivot.reset_index()
    # Absolute post-pre differences.
    avg_pivot["total_voice_trafic_diff"] = (
        avg_pivot["total_voice_trafic_post"] - avg_pivot["total_voice_trafic_pre"]
    )
    avg_pivot["total_data_trafic_diff"] = (
        avg_pivot["total_data_trafic_post"] - avg_pivot["total_data_trafic_pre"]
    )
    for metric in ["total_voice_trafic", "total_data_trafic"]:
        avg_pivot[f"{metric}_diff_pct"] = (
            (avg_pivot.get(f"{metric}_post", 0) - avg_pivot.get(f"{metric}_pre", 0))
            / avg_pivot.get(f"{metric}_pre", 1)
        ) * 100
    # Rename avg_pivot columns so they are distinguishable from the sums.
    avg_pivot = avg_pivot.rename(
        columns={
            "total_voice_trafic_pre": "avg_voice_trafic_pre",
            "total_voice_trafic_post": "avg_voice_trafic_post",
            "total_voice_trafic_diff": "avg_voice_trafic_diff",
            "total_voice_trafic_diff_pct": "avg_voice_trafic_diff_pct",
            "total_data_trafic_pre": "avg_data_trafic_pre",
            "total_data_trafic_post": "avg_data_trafic_post",
            "total_data_trafic_diff": "avg_data_trafic_diff",
            "total_data_trafic_diff_pct": "avg_data_trafic_diff_pct",
        }
    )
    # Reorder avg_pivot columns: voice before data, pre before post.
    avg_order = [
        "code",
        "avg_voice_trafic_pre",
        "avg_voice_trafic_post",
        "avg_voice_trafic_diff",
        "avg_voice_trafic_diff_pct",
        "avg_data_trafic_pre",
        "avg_data_trafic_post",
        "avg_data_trafic_diff",
        "avg_data_trafic_diff_pct",
    ]
    avg_existing_cols = [col for col in avg_order if col in avg_pivot.columns]
    avg_remaining_cols = [
        col for col in avg_pivot.columns if col not in avg_existing_cols
    ]
    avg_pivot = avg_pivot[avg_existing_cols + avg_remaining_cols]
    return df, last_period, sum_pivot.round(2), avg_pivot.round(2)
def analyze_2g_availability(df: pd.DataFrame, sla_2g: float):
    """Compare 2G TCH availability against the SLA target, pre vs post.

    Args:
        df: merged frame with a "period" column and a "2g_tch_avail" KPI.
        sla_2g: SLA threshold in percent.

    Returns:
        (summary_df, site_pivot): per-period distribution statistics and a
        per-site pre/post pivot, or (None, None) when the KPI/period data
        is missing or empty.
    """
    kpi = "2g_tch_avail"
    if kpi not in df.columns or "period" not in df.columns:
        return None, None
    in_scope = df[kpi].notna() & df["period"].isin(["pre", "post"])
    scoped = df[in_scope].copy()
    if scoped.empty:
        return None, None
    # Mean availability per site, one column per period.
    site_pivot = scoped.groupby(["code", "period"])[kpi].mean().unstack()
    site_pivot = site_pivot.rename(
        columns={"pre": "tch_avail_pre", "post": "tch_avail_post"}
    )
    # Guarantee both period columns exist even if one period had no data.
    for required in ("tch_avail_pre", "tch_avail_post"):
        if required not in site_pivot.columns:
            site_pivot[required] = pd.NA
    site_pivot["tch_avail_diff"] = (
        site_pivot["tch_avail_post"] - site_pivot["tch_avail_pre"]
    )
    site_pivot["pre_ok_vs_sla"] = site_pivot["tch_avail_pre"] >= sla_2g
    site_pivot["post_ok_vs_sla"] = site_pivot["tch_avail_post"] >= sla_2g
    site_pivot = site_pivot.reset_index()

    def _period_summary(label, column):
        # One summary row per period; NA statistics when nothing survives.
        values = site_pivot[column].dropna()
        if values.empty:
            return {
                "period": label,
                "cells": 0,
                "avg_availability": pd.NA,
                "median_availability": pd.NA,
                "p05_availability": pd.NA,
                "p95_availability": pd.NA,
                "min_availability": pd.NA,
                "max_availability": pd.NA,
                "cells_ge_sla": 0,
                "cells_lt_sla": 0,
                "pct_cells_ge_sla": pd.NA,
            }
        meeting = int((values >= sla_2g).sum())
        failing = int((values < sla_2g).sum())
        return {
            "period": label,
            "cells": int(values.shape[0]),
            "avg_availability": values.mean(),
            "median_availability": values.median(),
            "p05_availability": values.quantile(0.05),
            "p95_availability": values.quantile(0.95),
            "min_availability": values.min(),
            "max_availability": values.max(),
            "cells_ge_sla": meeting,
            "cells_lt_sla": failing,
            "pct_cells_ge_sla": meeting / values.shape[0] * 100,
        }

    summary_df = pd.DataFrame(
        [
            _period_summary("pre", "tch_avail_pre"),
            _period_summary("post", "tch_avail_post"),
        ]
    )
    return summary_df, site_pivot
def analyze_3g_availability(df: pd.DataFrame, sla_3g: float):
    """Compare 3G cell availability against the SLA target, pre vs post.

    Args:
        df: merged frame with a "period" column and a "3g_cell_avail" KPI.
        sla_3g: SLA threshold in percent.

    Returns:
        (summary_df, site_pivot): per-period distribution statistics and a
        per-site pre/post pivot, or (None, None) when the KPI/period data
        is missing or empty.
    """
    kpi = "3g_cell_avail"
    if kpi not in df.columns or "period" not in df.columns:
        return None, None
    in_scope = df[kpi].notna() & df["period"].isin(["pre", "post"])
    scoped = df[in_scope].copy()
    if scoped.empty:
        return None, None
    # Mean availability per site, one column per period.
    site_pivot = scoped.groupby(["code", "period"])[kpi].mean().unstack()
    site_pivot = site_pivot.rename(
        columns={"pre": "cell_avail_pre", "post": "cell_avail_post"}
    )
    # Guarantee both period columns exist even if one period had no data.
    for required in ("cell_avail_pre", "cell_avail_post"):
        if required not in site_pivot.columns:
            site_pivot[required] = pd.NA
    site_pivot["cell_avail_diff"] = (
        site_pivot["cell_avail_post"] - site_pivot["cell_avail_pre"]
    )
    site_pivot["pre_ok_vs_sla"] = site_pivot["cell_avail_pre"] >= sla_3g
    site_pivot["post_ok_vs_sla"] = site_pivot["cell_avail_post"] >= sla_3g
    site_pivot = site_pivot.reset_index()

    def _period_summary(label, column):
        # One summary row per period; NA statistics when nothing survives.
        values = site_pivot[column].dropna()
        if values.empty:
            return {
                "period": label,
                "cells": 0,
                "avg_availability": pd.NA,
                "median_availability": pd.NA,
                "p05_availability": pd.NA,
                "p95_availability": pd.NA,
                "min_availability": pd.NA,
                "max_availability": pd.NA,
                "cells_ge_sla": 0,
                "cells_lt_sla": 0,
                "pct_cells_ge_sla": pd.NA,
            }
        meeting = int((values >= sla_3g).sum())
        failing = int((values < sla_3g).sum())
        return {
            "period": label,
            "cells": int(values.shape[0]),
            "avg_availability": values.mean(),
            "median_availability": values.median(),
            "p05_availability": values.quantile(0.05),
            "p95_availability": values.quantile(0.95),
            "min_availability": values.min(),
            "max_availability": values.max(),
            "cells_ge_sla": meeting,
            "cells_lt_sla": failing,
            "pct_cells_ge_sla": meeting / values.shape[0] * 100,
        }

    summary_df = pd.DataFrame(
        [
            _period_summary("pre", "cell_avail_pre"),
            _period_summary("post", "cell_avail_post"),
        ]
    )
    return summary_df, site_pivot
def analyze_lte_availability(df: pd.DataFrame, sla_lte: float):
    """Compare LTE cell availability against the SLA target, pre vs post.

    Args:
        df: merged frame with a "period" column and a "lte_cell_avail" KPI.
        sla_lte: SLA threshold in percent.

    Returns:
        (summary_df, site_pivot): per-period distribution statistics and a
        per-site pre/post pivot, or (None, None) when the KPI/period data
        is missing or empty.
    """
    kpi = "lte_cell_avail"
    if kpi not in df.columns or "period" not in df.columns:
        return None, None
    in_scope = df[kpi].notna() & df["period"].isin(["pre", "post"])
    scoped = df[in_scope].copy()
    if scoped.empty:
        return None, None
    # Mean availability per site, one column per period.
    site_pivot = scoped.groupby(["code", "period"])[kpi].mean().unstack()
    site_pivot = site_pivot.rename(
        columns={"pre": "lte_avail_pre", "post": "lte_avail_post"}
    )
    # Guarantee both period columns exist even if one period had no data.
    for required in ("lte_avail_pre", "lte_avail_post"):
        if required not in site_pivot.columns:
            site_pivot[required] = pd.NA
    site_pivot["lte_avail_diff"] = (
        site_pivot["lte_avail_post"] - site_pivot["lte_avail_pre"]
    )
    site_pivot["pre_ok_vs_sla"] = site_pivot["lte_avail_pre"] >= sla_lte
    site_pivot["post_ok_vs_sla"] = site_pivot["lte_avail_post"] >= sla_lte
    site_pivot = site_pivot.reset_index()

    def _period_summary(label, column):
        # One summary row per period; NA statistics when nothing survives.
        values = site_pivot[column].dropna()
        if values.empty:
            return {
                "period": label,
                "cells": 0,
                "avg_availability": pd.NA,
                "median_availability": pd.NA,
                "p05_availability": pd.NA,
                "p95_availability": pd.NA,
                "min_availability": pd.NA,
                "max_availability": pd.NA,
                "cells_ge_sla": 0,
                "cells_lt_sla": 0,
                "pct_cells_ge_sla": pd.NA,
            }
        meeting = int((values >= sla_lte).sum())
        failing = int((values < sla_lte).sum())
        return {
            "period": label,
            "cells": int(values.shape[0]),
            "avg_availability": values.mean(),
            "median_availability": values.median(),
            "p05_availability": values.quantile(0.05),
            "p95_availability": values.quantile(0.95),
            "min_availability": values.min(),
            "max_availability": values.max(),
            "cells_ge_sla": meeting,
            "cells_lt_sla": failing,
            "pct_cells_ge_sla": meeting / values.shape[0] * 100,
        }

    summary_df = pd.DataFrame(
        [
            _period_summary("pre", "lte_avail_pre"),
            _period_summary("post", "lte_avail_post"),
        ]
    )
    return summary_df, site_pivot
def analyze_multirat_availability(
    df: pd.DataFrame, sla_2g: float, sla_3g: float, sla_lte: float
):
    """Build a per-site table of pre/post availability across all RATs.

    For each site (code) the mean availability per RAT and period is
    computed, post-period traffic totals and City are attached, and each
    site is classified by how many RATs fail their SLA in the post period.

    Returns:
        pd.DataFrame or None: one row per site, or None when no period
        column or no availability KPI is present.
    """
    if "period" not in df.columns:
        return None
    # Collect whichever availability KPIs this dataset actually carries.
    rat_cols = []
    if "2g_tch_avail" in df.columns:
        rat_cols.append("2g_tch_avail")
    if "3g_cell_avail" in df.columns:
        rat_cols.append("3g_cell_avail")
    if "lte_cell_avail" in df.columns:
        rat_cols.append("lte_cell_avail")
    if not rat_cols:
        return None
    agg_dict = {col: "mean" for col in rat_cols}
    df_pre = df[df["period"] == "pre"]
    df_post = df[df["period"] == "post"]
    pre = df_pre.groupby("code", as_index=False).agg(agg_dict)
    post = df_post.groupby("code", as_index=False).agg(agg_dict)
    rename_map_pre = {
        "2g_tch_avail": "2g_avail_pre",
        "3g_cell_avail": "3g_avail_pre",
        "lte_cell_avail": "lte_avail_pre",
    }
    rename_map_post = {
        "2g_tch_avail": "2g_avail_post",
        "3g_cell_avail": "3g_avail_post",
        "lte_cell_avail": "lte_avail_post",
    }
    pre = pre.rename(columns=rename_map_pre)
    post = post.rename(columns=rename_map_post)
    # Outer join keeps sites that only have data in one of the periods.
    multi = pd.merge(pre, post, on="code", how="outer")
    # Add post-period total traffic per site
    if not df_post.empty and {
        "total_voice_trafic",
        "total_data_trafic",
    }.issubset(df_post.columns):
        post_traffic = (
            df_post.groupby("code", as_index=False)[
                ["total_voice_trafic", "total_data_trafic"]
            ]
            .sum()
            .rename(
                columns={
                    "total_voice_trafic": "post_total_voice_trafic",
                    "total_data_trafic": "post_total_data_trafic",
                }
            )
        )
        multi = pd.merge(multi, post_traffic, on="code", how="left")
    if "City" in df.columns:
        city_df = df[["code", "City"]].drop_duplicates("code")
        multi = pd.merge(multi, city_df, on="code", how="left")

    # Compute OK/Not OK flags vs SLA on post-period.
    # NOTE(review): the Series argument is only used for its .name, which
    # is looked up again in `multi`; the flag stays NA where the
    # availability itself is NA (so missing KPI != SLA breach).
    def _ok_flag(series: pd.Series, sla: float) -> pd.Series:
        if series.name not in multi.columns:
            return pd.Series([pd.NA] * len(multi), index=multi.index)
        ok = multi[series.name] >= sla
        ok = ok.where(multi[series.name].notna(), pd.NA)
        return ok

    if "2g_avail_post" in multi.columns:
        multi["ok_2g_post"] = _ok_flag(multi["2g_avail_post"], sla_2g)
    if "3g_avail_post" in multi.columns:
        multi["ok_3g_post"] = _ok_flag(multi["3g_avail_post"], sla_3g)
    if "lte_avail_post" in multi.columns:
        multi["ok_lte_post"] = _ok_flag(multi["lte_avail_post"], sla_lte)

    def classify_row(row):
        # Collect (RAT, ok?) pairs for RATs that actually have a flag.
        rats_status = []
        for rat, col in [
            ("2G", "ok_2g_post"),
            ("3G", "ok_3g_post"),
            ("LTE", "ok_lte_post"),
        ]:
            if col in row and not pd.isna(row[col]):
                rats_status.append((rat, bool(row[col])))
        if not rats_status:
            return "No RAT data"
        bad_rats = [rat for rat, ok in rats_status if not ok]
        if not bad_rats:
            return "OK all RAT"
        if len(bad_rats) == 1:
            return f"Degraded {bad_rats[0]} only"
        return "Degraded multi-RAT (" + ",".join(bad_rats) + ")"

    multi["post_multirat_status"] = multi.apply(classify_row, axis=1)
    # Order columns for readability: code/City first, then availability,
    # traffic, flags and the status label; keep anything else at the end.
    ordered_cols = ["code"]
    if "City" in multi.columns:
        ordered_cols.append("City")
    for col in [
        "2g_avail_pre",
        "2g_avail_post",
        "3g_avail_pre",
        "3g_avail_post",
        "lte_avail_pre",
        "lte_avail_post",
        "post_total_voice_trafic",
        "post_total_data_trafic",
        "ok_2g_post",
        "ok_3g_post",
        "ok_lte_post",
        "post_multirat_status",
    ]:
        if col in multi.columns:
            ordered_cols.append(col)
    remaining_cols = [c for c in multi.columns if c not in ordered_cols]
    multi = multi[ordered_cols + remaining_cols]
    return multi
def analyze_persistent_availability(
    df: pd.DataFrame,
    multi_rat_df: pd.DataFrame,
    sla_2g: float,
    sla_3g: float,
    sla_lte: float,
    min_consecutive_days: int = 3,
) -> pd.DataFrame:
    """Flag sites whose availability stays below SLA for consecutive days.

    For each RAT the daily mean availability per site is computed; a site
    is "persistent" on a RAT when it stays below that RAT's SLA for at
    least ``min_consecutive_days`` consecutive calendar days. Flagged
    sites are enriched from ``multi_rat_df`` and ranked by a criticity
    score.

    Returns:
        pd.DataFrame: flagged sites sorted by decreasing criticity, or an
        empty frame when there is no usable input or nothing is flagged.
    """
    if df is None or df.empty:
        return pd.DataFrame()
    if "date" not in df.columns or "code" not in df.columns:
        return pd.DataFrame()
    work_df = df.copy()
    # Collapse timestamps to calendar days for the streak computation.
    work_df["date_only"] = work_df["date"].dt.date
    site_stats = {}

    def _update_stats(rat_key_prefix: str, grouped: pd.DataFrame, sla: float) -> None:
        # Record, per site, the longest run of consecutive below-SLA days
        # and the total count of below-SLA days for this RAT.
        if grouped.empty:
            return
        for code, group in grouped.groupby("code"):
            group = group.sort_values("date_only")
            dates = pd.to_datetime(group["date_only"]).tolist()
            below_flags = (group["value"] < sla).tolist()
            max_streak = 0
            current_streak = 0
            total_below = 0
            last_date = None
            for flag, current_date in zip(below_flags, dates):
                if flag:
                    total_below += 1
                    # A streak continues only if the previous day was both
                    # present and itself below SLA; any gap resets it.
                    if (
                        last_date is not None
                        and current_date == last_date + timedelta(days=1)
                        and current_streak > 0
                    ):
                        current_streak += 1
                    else:
                        current_streak = 1
                    if current_streak > max_streak:
                        max_streak = current_streak
                else:
                    current_streak = 0
                last_date = current_date
            stats = site_stats.setdefault(
                code,
                {
                    "code": code,
                    "max_streak_2g": 0,
                    "max_streak_3g": 0,
                    "max_streak_lte": 0,
                    "below_days_2g": 0,
                    "below_days_3g": 0,
                    "below_days_lte": 0,
                },
            )
            stats[f"max_streak_{rat_key_prefix}"] = max_streak
            stats[f"below_days_{rat_key_prefix}"] = total_below

    # Daily mean availability per site for each RAT that has the KPI.
    for rat_col, rat_key, sla in [
        ("2g_tch_avail", "2g", sla_2g),
        ("3g_cell_avail", "3g", sla_3g),
        ("lte_cell_avail", "lte", sla_lte),
    ]:
        if rat_col in work_df.columns:
            g = (
                work_df.dropna(subset=[rat_col])
                .groupby(["code", "date_only"])[rat_col]
                .mean()
                .reset_index()
            )
            g = g.rename(columns={rat_col: "value"})
            _update_stats(rat_key, g, sla)
    if not site_stats:
        return pd.DataFrame()
    rows = []
    for code, s in site_stats.items():
        max_2g = s.get("max_streak_2g", 0)
        max_3g = s.get("max_streak_3g", 0)
        max_lte = s.get("max_streak_lte", 0)
        below_2g = s.get("below_days_2g", 0)
        below_3g = s.get("below_days_3g", 0)
        below_lte = s.get("below_days_lte", 0)
        persistent_2g = max_2g >= min_consecutive_days if max_2g else False
        persistent_3g = max_3g >= min_consecutive_days if max_3g else False
        persistent_lte = max_lte >= min_consecutive_days if max_lte else False
        total_below_any = below_2g + below_3g + below_lte
        persistent_any = persistent_2g or persistent_3g or persistent_lte
        rats_persistent_count = sum(
            [persistent_2g is True, persistent_3g is True, persistent_lte is True]
        )
        rows.append(
            {
                "code": code,
                "persistent_issue_2g": persistent_2g,
                "persistent_issue_3g": persistent_3g,
                "persistent_issue_lte": persistent_lte,
                "max_consecutive_days_2g": max_2g,
                "max_consecutive_days_3g": max_3g,
                "max_consecutive_days_lte": max_lte,
                "total_below_days_2g": below_2g,
                "total_below_days_3g": below_3g,
                "total_below_days_lte": below_lte,
                "total_below_days_any": total_below_any,
                "persistent_issue_any": persistent_any,
                "persistent_rats_count": rats_persistent_count,
            }
        )
    result = pd.DataFrame(rows)
    # Keep only sites flagged on at least one RAT.
    result = result[result["persistent_issue_any"] == True]
    if result.empty:
        return result
    # Enrich with City / post-period traffic / status from the multi-RAT
    # table when those columns are available.
    if multi_rat_df is not None and not multi_rat_df.empty:
        cols_to_merge = [
            c
            for c in [
                "code",
                "City",
                "post_total_voice_trafic",
                "post_total_data_trafic",
                "post_multirat_status",
            ]
            if c in multi_rat_df.columns
        ]
        if cols_to_merge:
            result = pd.merge(
                result,
                multi_rat_df[cols_to_merge].drop_duplicates("code"),
                on="code",
                how="left",
            )
    if "post_total_data_trafic" not in result.columns:
        result["post_total_data_trafic"] = 0.0
    # Criticity: weighted mix of post traffic, below-SLA day count and
    # number of RATs with a persistent issue.
    # NOTE(review): the 1/100/1000 weights look heuristic — confirm they
    # match the intended prioritisation.
    result["criticity_score"] = (
        result["post_total_data_trafic"].fillna(0) * 1.0
        + result["total_below_days_any"].fillna(0) * 100.0
        + result["persistent_rats_count"].fillna(0) * 1000.0
    )
    result = result.sort_values(
        by=["criticity_score", "total_below_days_any"], ascending=[False, False]
    )
    return result
def monthly_data_analysis(df: pd.DataFrame) -> "tuple[pd.DataFrame, pd.DataFrame]":
    """Pivot total voice and data traffic per site and per month.

    Args:
        df: merged frame with "date", "code", "total_voice_trafic" and
            "total_data_trafic" columns.

    Returns:
        tuple: (voice_pivot, data_pivot) — code x "YYYY-MM" tables of
        summed traffic with columns sorted chronologically.
    """
    # Work on a copy so the caller's frame is not mutated with the helper
    # "month_year" column (the original modified df in place).
    df = df.copy()
    df["date"] = pd.to_datetime(df["date"])
    # 'YYYY-MM' key groups by month while keeping the year.
    df["month_year"] = df["date"].dt.to_period("M").astype(str)

    def _monthly_pivot(value_column: str) -> pd.DataFrame:
        # Rows = site code, columns = month, values = summed traffic.
        table = df.pivot_table(
            index="code",
            columns="month_year",
            values=value_column,
            aggfunc="sum",
            fill_value=0,
        )
        # "YYYY-MM" strings sort chronologically.
        return table.reindex(sorted(table.columns), axis=1)

    return _monthly_pivot("total_voice_trafic"), _monthly_pivot("total_data_trafic")
############################## UI #########################
st.title("📊 Global Trafic Analysis - 2G / 3G / LTE")
doc_col, image_col = st.columns(2)
with doc_col:
st.write(
"""
The report analyzes 2G / 3G / LTE traffic :
- 2G Traffic Report in CSV format (required columns : BCF name, PERIOD_START_TIME, TRAFFIC_PS DL, PS_UL_Load)
- 3G Traffic Report in CSV format (required columns : WBTS name, PERIOD_START_TIME, Total CS traffic - Erl, Total_Data_Traffic)
- LTE Traffic Report in CSV format (required columns : LNBTS name, PERIOD_START_TIME, 4G/LTE DL Traffic Volume (GBytes), 4G/LTE UL Traffic Volume (GBytes))
"""
)
# with image_col:
# st.image("./assets/trafic_analysis.png", width=250)
upload_2g_col, upload_3g_col, upload_lte_col = st.columns(3)
with upload_2g_col:
two_g_file = st.file_uploader("Upload 2G Traffic Report", type=["csv", "zip"])
with upload_3g_col:
three_g_file = st.file_uploader("Upload 3G Traffic Report", type=["csv", "zip"])
with upload_lte_col:
lte_file = st.file_uploader("Upload LTE Traffic Report", type=["csv", "zip"])
pre_range_col, post_range_col = st.columns(2)
with pre_range_col:
pre_range = st.date_input("Pre-period (from - to)", [])
with post_range_col:
post_range = st.date_input("Post-period (from - to)", [])
last_period_range_col, number_of_top_trafic_sites_col = st.columns(2)
with last_period_range_col:
last_period_range = st.date_input("Last period (from - to)", [])
with number_of_top_trafic_sites_col:
number_of_top_trafic_sites = st.number_input(
"Number of top traffic sites", value=25
)
sla_2g_col, sla_3g_col, sla_lte_col = st.columns(3)
with sla_2g_col:
sla_2g = st.number_input("2G TCH availability SLA (%)", value=98.0)
with sla_3g_col:
sla_3g = st.number_input("3G Cell availability SLA (%)", value=98.0)
with sla_lte_col:
sla_lte = st.number_input("LTE Cell availability SLA (%)", value=98.0)
if len(pre_range) != 2 or len(post_range) != 2:
st.warning("⚠️ Please select 2 dates for each period (pre and post).")
st.stop()
if not all([two_g_file, three_g_file, lte_file]):
st.info("Please upload all 3 reports and select the comparison periods.")
st.stop()
# Warning if pre and post periode are the same
if pre_range == post_range:
st.warning("⚠️ Pre and post periode are the same.")
st.stop()
# Warning if pre and post are overlapping
if pre_range[0] < post_range[0] and pre_range[1] > post_range[1]:
st.warning(" Pre and post periode are overlapping.")
st.stop()
run_analysis = st.button(" Run Analysis")
if run_analysis:
df_2g = read_uploaded_file(two_g_file)
df_3g = read_uploaded_file(three_g_file)
df_lte = read_uploaded_file(lte_file)
df_2g_clean = preprocess_2g(df_2g)
df_3g_clean = preprocess_3g(df_3g)
df_lte_clean = preprocess_lte(df_lte)
full_df, last_period, sum_pre_post_analysis, avg_pre_post_analysis = (
merge_and_compare(
df_2g_clean,
df_3g_clean,
df_lte_clean,
pre_range,
post_range,
last_period_range,
)
)
monthly_voice_df, monthly_data_df = monthly_data_analysis(full_df)
st.session_state["full_df"] = full_df
st.session_state["last_period"] = last_period
st.session_state["sum_pre_post_analysis"] = sum_pre_post_analysis
st.session_state["avg_pre_post_analysis"] = avg_pre_post_analysis
st.session_state["monthly_voice_df"] = monthly_voice_df
st.session_state["monthly_data_df"] = monthly_data_df
if "full_df" in st.session_state:
full_df = st.session_state["full_df"]
last_period = st.session_state["last_period"]
sum_pre_post_analysis = st.session_state["sum_pre_post_analysis"]
avg_pre_post_analysis = st.session_state["avg_pre_post_analysis"]
monthly_voice_df = st.session_state["monthly_voice_df"]
monthly_data_df = st.session_state["monthly_data_df"]
full_df["week"] = full_df["date"].dt.isocalendar().week
full_df["year"] = full_df["date"].dt.isocalendar().year
analysis_df = full_df
analysis_last_period = last_period
if "City" in full_df.columns:
available_cities = full_df["City"].dropna().unique()
if len(available_cities) > 0:
selected_cities = st.multiselect(
"Filter analysis by City (optional)",
sorted(available_cities),
)
if selected_cities:
analysis_df = full_df[full_df["City"].isin(selected_cities)].copy()
analysis_last_period = last_period[
last_period["City"].isin(selected_cities)
].copy()
# Display Summary
st.success(" Analysis completed")
st.subheader(" Summary Analysis Pre / Post")
st.dataframe(sum_pre_post_analysis)
summary_2g_avail, site_2g_avail = analyze_2g_availability(analysis_df, sla_2g)
if summary_2g_avail is not None:
st.subheader("2G - TCH Availability vs SLA")
st.write(f"SLA target 2G TCH availability: {sla_2g}%")
st.dataframe(summary_2g_avail.round(2))
st.subheader("2G - TCH Availability by site (worst 25 by post-period)")
worst_sites_2g = site_2g_avail.sort_values("tch_avail_post").head(25)
st.dataframe(worst_sites_2g.round(2))
else:
st.info(
"2G TCH availability KPI not found in input report or no data for selected periods."
)
summary_3g_avail, site_3g_avail = analyze_3g_availability(analysis_df, sla_3g)
if summary_3g_avail is not None:
st.subheader("3G - Cell Availability vs SLA")
st.write(f"SLA target 3G Cell availability: {sla_3g}%")
st.dataframe(summary_3g_avail.round(2))
st.subheader("3G - Cell Availability by site (worst 25 by post-period)")
worst_sites_3g = site_3g_avail.sort_values("cell_avail_post").head(25)
st.dataframe(worst_sites_3g.round(2))
else:
st.info(
"3G Cell Availability KPI not found in input report or no data for selected periods."
)
summary_lte_avail, site_lte_avail = analyze_lte_availability(analysis_df, sla_lte)
if summary_lte_avail is not None:
st.subheader("LTE - Cell Availability vs SLA")
st.write(f"SLA target LTE Cell availability: {sla_lte}%")
st.dataframe(summary_lte_avail.round(2))
st.subheader("LTE - Cell Availability by site (worst 25 by post-period)")
worst_sites_lte = site_lte_avail.sort_values("lte_avail_post").head(25)
st.dataframe(worst_sites_lte.round(2))
else:
st.info(
"LTE Cell Availability KPI not found in input report or no data for selected periods."
)
# Multi-RAT availability view
multi_rat_df = analyze_multirat_availability(analysis_df, sla_2g, sla_3g, sla_lte)
if multi_rat_df is not None:
st.subheader("Multi-RAT Availability by site (post-period)")
st.dataframe(multi_rat_df.round(2))
worst_2g = None
if (
"2g_avail_post" in multi_rat_df.columns
and "ok_2g_post" in multi_rat_df.columns
and "post_total_data_trafic" in multi_rat_df.columns
):
tmp = multi_rat_df[
(multi_rat_df["ok_2g_post"] == False)
& multi_rat_df["post_total_data_trafic"].notna()
].copy()
if not tmp.empty:
worst_2g = tmp.sort_values(
"post_total_data_trafic", ascending=False
).head(number_of_top_trafic_sites)
worst_3g = None
if (
"3g_avail_post" in multi_rat_df.columns
and "ok_3g_post" in multi_rat_df.columns
and "post_total_data_trafic" in multi_rat_df.columns
):
tmp = multi_rat_df[
(multi_rat_df["ok_3g_post"] == False)
& multi_rat_df["post_total_data_trafic"].notna()
].copy()
if not tmp.empty:
worst_3g = tmp.sort_values(
"post_total_data_trafic", ascending=False
).head(number_of_top_trafic_sites)
worst_lte = None
if (
"lte_avail_post" in multi_rat_df.columns
and "ok_lte_post" in multi_rat_df.columns
and "post_total_data_trafic" in multi_rat_df.columns
):
tmp = multi_rat_df[
(multi_rat_df["ok_lte_post"] == False)
& multi_rat_df["post_total_data_trafic"].notna()
].copy()
if not tmp.empty:
worst_lte = tmp.sort_values(
"post_total_data_trafic", ascending=False
).head(number_of_top_trafic_sites)
st.subheader(
f"Worst high-traffic & low-availability sites by RAT (top {number_of_top_trafic_sites}, post-period)"
)
tab_2g, tab_3g, tab_lte = st.tabs(["2G", "3G", "LTE"])
with tab_2g:
if worst_2g is not None and not worst_2g.empty:
st.dataframe(worst_2g.round(2))
else:
st.info(
"No 2G sites with low availability and significant traffic in post-period."
)
with tab_3g:
if worst_3g is not None and not worst_3g.empty:
st.dataframe(worst_3g.round(2))
else:
st.info(
"No 3G sites with low availability and significant traffic in post-period."
)
with tab_lte:
if worst_lte is not None and not worst_lte.empty:
st.dataframe(worst_lte.round(2))
else:
st.info(
"No LTE sites with low availability and significant traffic in post-period."
)
# Persistent (multi-day) availability degradation and critical-site ranking.
st.subheader("Persistent availability issues and critical sites")
min_persistent_days = st.number_input(
"Minimum consecutive days below SLA to flag persistent issue",
min_value=2,
max_value=30,
value=3,
step=1,
)
# Streak detection is delegated to analyze_persistent_availability (defined
# elsewhere in this module); int() normalizes the widget's return value.
persistent_df = analyze_persistent_availability(
analysis_df, multi_rat_df, sla_2g, sla_3g, sla_lte, int(min_persistent_days)
)
if persistent_df is not None and not persistent_df.empty:
top_critical_n = st.number_input(
"Number of top critical sites to display",
min_value=5,
max_value=200,
value=25,
step=5,
)
st.dataframe(persistent_df.head(top_critical_n).round(2))
else:
st.info(
"No persistent availability issues detected with current parameters."
)
# ----- Site drill-down: per-site traffic and availability time series -----
if not analysis_df.empty:
st.subheader("Site drill-down: traffic and availability over time")
# Build human-readable "City_code" labels for the selectbox (plain code
# when City is missing) and map each label back to its site code.
sites_df = (
analysis_df[["code", "City"]]
.drop_duplicates()
.sort_values(by=["City", "code"])
)
site_options = sites_df.apply(
lambda row: (
f"{row['City']}_{row['code']}"
if pd.notna(row["City"])
else str(row["code"])
),
axis=1,
)
site_map = dict(zip(site_options, sites_df["code"]))
selected_site_label = st.selectbox(
"Select a site for detailed view", options=site_options
)
selected_code = site_map.get(selected_site_label)
site_detail_df = analysis_df[analysis_df["code"] == selected_code].copy()
if not site_detail_df.empty:
site_detail_df = site_detail_df.sort_values("date")
# Traffic chart: one line per traffic metric present in the data.
traffic_cols = [
col
for col in ["total_voice_trafic", "total_data_trafic"]
if col in site_detail_df.columns
]
if traffic_cols:
traffic_long = site_detail_df[["date"] + traffic_cols].melt(
id_vars="date",
value_vars=traffic_cols,
var_name="metric",
value_name="value",
)
fig_traffic = px.line(
traffic_long,
x="date",
y="value",
color="metric",
)
st.plotly_chart(fig_traffic)
# Availability chart: one line per RAT whose KPI column exists,
# renamed to the short RAT label for the legend.
avail_cols = []
rename_map = {}
if "2g_tch_avail" in site_detail_df.columns:
avail_cols.append("2g_tch_avail")
rename_map["2g_tch_avail"] = "2G"
if "3g_cell_avail" in site_detail_df.columns:
avail_cols.append("3g_cell_avail")
rename_map["3g_cell_avail"] = "3G"
if "lte_cell_avail" in site_detail_df.columns:
avail_cols.append("lte_cell_avail")
rename_map["lte_cell_avail"] = "LTE"
if avail_cols:
avail_df = site_detail_df[["date"] + avail_cols].copy()
avail_df = avail_df.rename(columns=rename_map)
value_cols = [c for c in avail_df.columns if c != "date"]
avail_long = avail_df.melt(
id_vars="date",
value_vars=value_cols,
var_name="RAT",
value_name="availability",
)
fig_avail = px.line(
avail_long,
x="date",
y="availability",
color="RAT",
)
st.plotly_chart(fig_avail)
# Per-day SLA check for the selected site: average each RAT's
# availability per calendar day and list days below its SLA.
site_detail_df["date_only"] = site_detail_df["date"].dt.date
degraded_rows_site = []
for rat_col, rat_name, sla_value in [
("2g_tch_avail", "2G", sla_2g),
("3g_cell_avail", "3G", sla_3g),
("lte_cell_avail", "LTE", sla_lte),
]:
if rat_col in site_detail_df.columns:
daily_site = (
site_detail_df.groupby("date_only")[rat_col].mean().dropna()
)
mask = daily_site < sla_value
for d, val in daily_site[mask].items():
degraded_rows_site.append(
{
"RAT": rat_name,
"date": d,
"avg_availability": val,
"SLA": sla_value,
}
)
if degraded_rows_site:
degraded_site_df = pd.DataFrame(degraded_rows_site)
st.dataframe(degraded_site_df.round(2))
# ----- City drill-down: same views as the site drill-down, aggregated -----
# (traffic summed per date across the city's sites; availability averaged)
if "City" in analysis_df.columns and analysis_df["City"].notna().any():
st.subheader("City drill-down: traffic and availability over time")
cities_df = (
analysis_df[["City"]].dropna().drop_duplicates().sort_values(by="City")
)
selected_city = st.selectbox(
"Select a City for aggregated view",
options=cities_df["City"].tolist(),
)
city_detail_df = analysis_df[analysis_df["City"] == selected_city].copy()
if not city_detail_df.empty:
city_detail_df = city_detail_df.sort_values("date")
traffic_cols_city = [
col
for col in ["total_voice_trafic", "total_data_trafic"]
if col in city_detail_df.columns
]
if traffic_cols_city:
# City traffic = sum over all sites for each timestamp.
city_traffic = (
city_detail_df.groupby("date")[traffic_cols_city]
.sum()
.reset_index()
)
traffic_long_city = city_traffic.melt(
id_vars="date",
value_vars=traffic_cols_city,
var_name="metric",
value_name="value",
)
fig_traffic_city = px.line(
traffic_long_city,
x="date",
y="value",
color="metric",
)
# Explicit chart key prevents Streamlit duplicate-element errors
# when the selected city changes.
st.plotly_chart(
fig_traffic_city,
key=f"traffic_city_{selected_city}",
)
avail_cols_city = []
rename_map_city = {}
if "2g_tch_avail" in city_detail_df.columns:
avail_cols_city.append("2g_tch_avail")
rename_map_city["2g_tch_avail"] = "2G"
if "3g_cell_avail" in city_detail_df.columns:
avail_cols_city.append("3g_cell_avail")
rename_map_city["3g_cell_avail"] = "3G"
if "lte_cell_avail" in city_detail_df.columns:
avail_cols_city.append("lte_cell_avail")
rename_map_city["lte_cell_avail"] = "LTE"
if avail_cols_city:
# NOTE(review): availability lines are plotted per raw timestamp
# (not aggregated per date) — multiple sites per date draw
# several points; confirm this is the intended rendering.
avail_city_df = city_detail_df[["date"] + avail_cols_city].copy()
avail_city_df = avail_city_df.rename(columns=rename_map_city)
value_cols_city = [c for c in avail_city_df.columns if c != "date"]
avail_long_city = avail_city_df.melt(
id_vars="date",
value_vars=value_cols_city,
var_name="RAT",
value_name="availability",
)
fig_avail_city = px.line(
avail_long_city,
x="date",
y="availability",
color="RAT",
)
st.plotly_chart(
fig_avail_city,
key=f"avail_city_{selected_city}",
)
# Per-day SLA check for the selected city (mean availability per day).
city_detail_df["date_only"] = city_detail_df["date"].dt.date
degraded_rows_city = []
for rat_col, rat_name, sla_value in [
("2g_tch_avail", "2G", sla_2g),
("3g_cell_avail", "3G", sla_3g),
("lte_cell_avail", "LTE", sla_lte),
]:
if rat_col in city_detail_df.columns:
daily_city = (
city_detail_df.groupby("date_only")[rat_col]
.mean()
.dropna()
)
mask_city = daily_city < sla_value
for d, val in daily_city[mask_city].items():
degraded_rows_city.append(
{
"RAT": rat_name,
"date": d,
"avg_availability": val,
"SLA": sla_value,
}
)
if degraded_rows_city:
degraded_city_df = pd.DataFrame(degraded_rows_city)
st.dataframe(degraded_city_df.round(2))
# Temporal availability analysis - daily averages per RAT
# Network-wide view: average each RAT's availability across all sites per
# calendar day, plot the trend, and list days below the per-RAT SLA.
if any(
col in analysis_df.columns
for col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]
):
temp_df = analysis_df.copy()
temp_df["date_only"] = temp_df["date"].dt.date
# Only aggregate the KPI columns that actually exist in the dataset.
agg_dict = {}
if "2g_tch_avail" in temp_df.columns:
agg_dict["2g_tch_avail"] = "mean"
if "3g_cell_avail" in temp_df.columns:
agg_dict["3g_cell_avail"] = "mean"
if "lte_cell_avail" in temp_df.columns:
agg_dict["lte_cell_avail"] = "mean"
daily_avail = (
temp_df.groupby("date_only", as_index=False).agg(agg_dict)
if agg_dict
else pd.DataFrame()
)
if not daily_avail.empty:
rename_map = {}
if "2g_tch_avail" in daily_avail.columns:
rename_map["2g_tch_avail"] = "2G"
if "3g_cell_avail" in daily_avail.columns:
rename_map["3g_cell_avail"] = "3G"
if "lte_cell_avail" in daily_avail.columns:
rename_map["lte_cell_avail"] = "LTE"
daily_avail = daily_avail.rename(columns=rename_map)
value_cols = [c for c in daily_avail.columns if c != "date_only"]
if value_cols:
daily_melt = daily_avail.melt(
id_vars="date_only",
value_vars=value_cols,
var_name="RAT",
value_name="availability",
)
st.subheader("Daily average availability per RAT")
fig = px.line(
daily_melt,
x="date_only",
y="availability",
color="RAT",
markers=True,
)
st.plotly_chart(fig)
# Collect (RAT, day) pairs whose daily average dipped below SLA.
degraded_rows = []
for rat_name, sla_value in [
("2G", sla_2g),
("3G", sla_3g),
("LTE", sla_lte),
]:
if rat_name in daily_avail.columns:
series = daily_avail[rat_name]
mask = series < sla_value
for d, val in zip(
daily_avail.loc[mask, "date_only"], series[mask]
):
degraded_rows.append(
{
"RAT": rat_name,
"date": d,
"avg_availability": val,
"SLA": sla_value,
}
)
if degraded_rows:
degraded_df = pd.DataFrame(degraded_rows)
st.subheader("Days with average availability below SLA")
st.dataframe(degraded_df.round(2))
# Cache the last-period slice on the class so later sections can reuse it.
TraficAnalysis.last_period_df = analysis_last_period
#######################################################################################"""
#######################################################################################
# ----- Last-period rankings: top-N sites by data traffic, then by voice -----
if TraficAnalysis.last_period_df is not None:
df = TraficAnalysis.last_period_df
# Get top trafics sites based on total data trafic during last period
top_sites = (
df.groupby(["code", "City"])["total_data_trafic"]
.sum()
.sort_values(ascending=False)
)
top_sites = top_sites.head(number_of_top_trafic_sites)
st.subheader(f"Top {number_of_top_trafic_sites} sites by data traffic")
chart_col, data_col = st.columns(2)
with data_col:
# Ascending order so the largest value sits at the top of the table view.
st.dataframe(top_sites.sort_values(ascending=True))
# chart
# Horizontal bar chart labelled "City_code" per site.
fig = px.bar(
top_sites.reset_index(),
y=top_sites.reset_index()[["City", "code"]].agg(
lambda x: "_".join(map(str, x)), axis=1
),
x="total_data_trafic",
title=f"Top {number_of_top_trafic_sites} sites by data traffic",
orientation="h",
text="total_data_trafic",
text_auto=True,
)
# fig.update_layout(height=600)
with chart_col:
st.plotly_chart(fig)
# Top sites by voice trafic during last period
top_sites_voice = (
df.groupby(["code", "City"])["total_voice_trafic"]
.sum()
.sort_values(ascending=False)
)
top_sites_voice = top_sites_voice.head(number_of_top_trafic_sites)
st.subheader(f"Top {number_of_top_trafic_sites} sites by voice traffic")
chart_col, data_col = st.columns(2)
with data_col:
st.dataframe(top_sites_voice.sort_values(ascending=True))
# chart
fig = px.bar(
top_sites_voice.reset_index(),
y=top_sites_voice.reset_index()[["City", "code"]].agg(
lambda x: "_".join(map(str, x)), axis=1
),
x="total_voice_trafic",
title=f"Top {number_of_top_trafic_sites} sites by voice traffic",
orientation="h",
text="total_voice_trafic",
text_auto=True,
)
# fig.update_layout(height=600)
with chart_col:
st.plotly_chart(fig)
#####################################################
# Bubble-size range (pixels) shared by both traffic maps below.
min_size = 5
max_size = 40
# Map of the total data traffic during the last period, one bubble per site.
df_data = (
    df.groupby(["code", "City", "Latitude", "Longitude"])["total_data_trafic"]
    .sum()
    .reset_index()
)
st.subheader("Map of data trafic during last period")
# Linear size scaling between min_size and max_size.
traffic_data_min = df_data["total_data_trafic"].min()
traffic_data_max = df_data["total_data_trafic"].max()
traffic_data_span = traffic_data_max - traffic_data_min
if traffic_data_span > 0:
    # Vectorized linear interpolation (same formula as the previous per-row
    # lambda, computed in one pandas expression).
    df_data["bubble_size"] = min_size + (max_size - min_size) * (
        df_data["total_data_trafic"] - traffic_data_min
    ) / traffic_data_span
else:
    # Single site or uniform traffic: the original per-row formula divided
    # by zero here; fall back to a constant mid-range bubble size instead.
    df_data["bubble_size"] = (min_size + max_size) / 2
# Custom blue→red continuous scale: starts from a clearly visible blue.
custom_blue_red = [
    [0.0, "#4292c6"],  # light blue
    [0.2, "#2171b5"],
    [0.4, "#084594"],  # dark blue
    [0.6, "#cb181d"],  # strong red
    [0.8, "#a50f15"],  # darker red
    [1.0, "#67000d"],  # very dark red
]
fig = px.scatter_map(
    df_data,
    lat="Latitude",
    lon="Longitude",
    color="total_data_trafic",
    size="bubble_size",
    color_continuous_scale=custom_blue_red,
    size_max=max_size,
    zoom=10,
    height=600,
    title="Data traffic distribution",
    hover_data={"code": True, "total_data_trafic": True},
    hover_name="code",
    text=[str(x) for x in df_data["code"]],
)
fig.update_layout(
    # NOTE(review): px.scatter_map renders via the MapLibre "map" layout;
    # mapbox_style is presumably ignored here — confirm, and switch to
    # map_style="open-street-map" if the basemap does not render.
    mapbox_style="open-street-map",
    coloraxis_colorbar=dict(title="Total Data Traffic (MB)"),
    coloraxis=dict(cmin=traffic_data_min, cmax=traffic_data_max),
    font=dict(size=10, color="black"),
)
st.plotly_chart(fig)
########################################################################################
# Map of the total voice traffic during the last period, one bubble per site.
df_voice = (
    df.groupby(["code", "City", "Latitude", "Longitude"])["total_voice_trafic"]
    .sum()
    .reset_index()
)
st.subheader("Map of voice trafic during last period")
# Linear size scaling, reusing the min_size/max_size range defined above.
traffic_voice_min = df_voice["total_voice_trafic"].min()
traffic_voice_max = df_voice["total_voice_trafic"].max()
traffic_voice_span = traffic_voice_max - traffic_voice_min
if traffic_voice_span > 0:
    df_voice["bubble_size"] = min_size + (max_size - min_size) * (
        df_voice["total_voice_trafic"] - traffic_voice_min
    ) / traffic_voice_span
else:
    # Single site or uniform traffic: the original per-row formula divided
    # by zero here; use a constant mid-range bubble size instead.
    df_voice["bubble_size"] = (min_size + max_size) / 2
fig = px.scatter_map(
    df_voice,
    lat="Latitude",
    lon="Longitude",
    color="total_voice_trafic",
    size="bubble_size",
    color_continuous_scale=custom_blue_red,
    size_max=max_size,
    zoom=10,
    height=600,
    title="Voice traffic distribution",
    hover_data={"code": True, "total_voice_trafic": True},
    hover_name="code",
    text=[str(x) for x in df_voice["code"]],
)
fig.update_layout(
    # NOTE(review): see the data-traffic map — mapbox_style is presumably
    # ignored by px.scatter_map; confirm whether map_style is needed.
    mapbox_style="open-street-map",
    coloraxis_colorbar=dict(title="Total Voice Traffic (MB)"),
    coloraxis=dict(cmin=traffic_voice_min, cmax=traffic_voice_max),
    font=dict(size=10, color="black"),
)
st.plotly_chart(fig)
# Prepare availability DataFrames for export (fallback to empty if KPI missing)
# The "in locals()" guards mean each summary is exported only when the
# corresponding KPI section above actually ran and produced it.
summary_frames = []
if "summary_2g_avail" in locals() and summary_2g_avail is not None:
tmp = summary_2g_avail.copy()
tmp["RAT"] = "2G"
summary_frames.append(tmp)
if "summary_3g_avail" in locals() and summary_3g_avail is not None:
tmp = summary_3g_avail.copy()
tmp["RAT"] = "3G"
summary_frames.append(tmp)
if "summary_lte_avail" in locals() and summary_lte_avail is not None:
tmp = summary_lte_avail.copy()
tmp["RAT"] = "LTE"
summary_frames.append(tmp)
if summary_frames:
availability_summary_all = pd.concat(summary_frames, ignore_index=True)
else:
availability_summary_all = pd.DataFrame()
# Per-site availability sheets; empty frames keep the sheet list stable.
export_site_2g = (
site_2g_avail
if "site_2g_avail" in locals() and site_2g_avail is not None
else pd.DataFrame()
)
export_site_3g = (
site_3g_avail
if "site_3g_avail" in locals() and site_3g_avail is not None
else pd.DataFrame()
)
export_site_lte = (
site_lte_avail
if "site_lte_avail" in locals() and site_lte_avail is not None
else pd.DataFrame()
)
# Recompute the multi-RAT view on the full dataset for the export.
export_multi_rat_base = analyze_multirat_availability(
full_df, sla_2g, sla_3g, sla_lte
)
if export_multi_rat_base is not None:
export_multi_rat = export_multi_rat_base
else:
export_multi_rat = pd.DataFrame()
export_persistent = pd.DataFrame()
if export_multi_rat_base is not None:
# NOTE(review): unlike the UI call above, no min-days argument is passed
# here — presumably the function's default threshold applies; confirm
# against its signature.
export_persistent_tmp = analyze_persistent_availability(
full_df, export_multi_rat_base, sla_2g, sla_3g, sla_lte
)
if export_persistent_tmp is not None:
export_persistent = export_persistent_tmp
# Bundle every sheet into a single Excel payload; the two lists are
# positional pairs of (DataFrame, sheet name).
final_dfs = convert_dfs(
[
full_df,
sum_pre_post_analysis,
avg_pre_post_analysis,
monthly_voice_df,
monthly_data_df,
availability_summary_all,
export_site_2g,
export_site_3g,
export_site_lte,
export_multi_rat,
export_persistent,
],
[
"Global_Trafic_Analysis",
"Sum_pre_post_analysis",
"Avg_pre_post_analysis",
"Monthly_voice_analysis",
"Monthly_data_analysis",
"Availability_Summary_All_RAT",
"TwoG_Availability_By_Site",
"ThreeG_Availability_By_Site",
"LTE_Availability_By_Site",
"MultiRAT_Availability_By_Site",
"Top_Critical_Sites",
],
)
# 📥 Download button for the consolidated Excel report.
# The timestamp is formatted explicitly: str(datetime.now()) contains ":"
# and spaces, which are invalid in Windows filenames.
report_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
st.download_button(
    on_click="ignore",
    type="primary",
    label="Download the Analysis Report",
    data=final_dfs,
    file_name=f"Global_Trafic_Analysis_Report_{report_stamp}.xlsx",
    mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)