# Source: idx-2 / utils.py
# Last commit: "update utils-1.5" by omniverse1 (ed302b6, verified)
import yfinance as yf
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import pytz
import time
import random
import threading
from dataclasses import dataclass
# Import configurations
from config import IDX_STOCKS, IDX_MARKET_CONFIG
# --- Market Status Manager ---
@dataclass
class MarketStatus:
    """Snapshot of one market's trading status.

    Every time-related field is a pre-formatted string rather than a
    datetime/timedelta; the values are filled in by
    MarketStatusManager._get_current_market_status().
    """
    # True while the market-local time is inside the open/close window on a trading day.
    is_open: bool
    # Human-readable summary, e.g. "<name> is currently open".
    status_text: str
    # Date of the next trading day, formatted as YYYY-MM-DD.
    next_trading_day: str
    # Market-local timestamp at which this snapshot was computed.
    last_updated: str
    # Formatted duration until the next market open.
    time_until_open: str
    # Formatted duration until today's close, or a textual notice when not applicable.
    time_until_close: str
    # Market-local clock time (HH:MM:SS plus zone abbreviation).
    # NOTE(review): the name says "et", but the value uses the configured market timezone.
    current_time_et: str
    # Copied from the market configuration entry ('name', 'type', 'symbol').
    market_name: str
    market_type: str
    market_symbol: str
# Interval, in minutes, between background refreshes of the cached market status.
# MarketStatusManager converts this value to seconds when it is constructed.
MARKET_STATUS_UPDATE_INTERVAL_MINUTES = 10
class MarketStatusManager:
    """Compute and cache market status, refreshing it from a background thread.

    Constructing the manager immediately starts a daemon thread that
    recomputes the 'IDX_STOCKS' status every ``update_interval`` seconds.
    Readers obtain the cached snapshot through get_status(); access to the
    cache is guarded by a lock.
    """

    def __init__(self):
        """Initialise the cache and start the background refresh thread."""
        self.update_interval = MARKET_STATUS_UPDATE_INTERVAL_MINUTES * 60  # seconds
        self._statuses = {}
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._update_thread = None
        self._start_update_thread()

    @staticmethod
    def _parse_clock(value: str) -> Tuple[int, int]:
        """Split an 'HH:MM' string into integer (hour, minute)."""
        parts = value.split(':')
        return int(parts[0]), int(parts[1])

    # The return annotation is quoted so it is resolved lazily.
    def _get_current_market_status(self, market_key: str = 'IDX_STOCKS') -> "MarketStatus":
        """Build a fresh MarketStatus for ``market_key``.

        Args:
            market_key: Key into IDX_MARKET_CONFIG. The entry must provide
                'timezone', 'open_time', 'close_time', 'days', 'name',
                'type' and 'symbol'.

        Returns:
            A MarketStatus whose time fields are expressed in the market's
            configured timezone.

        Raises:
            KeyError: If ``market_key`` or a required config key is missing.
        """
        config = IDX_MARKET_CONFIG[market_key]
        market_tz = pytz.timezone(config['timezone'])
        market_time = datetime.now(market_tz)

        open_hour, open_minute = self._parse_clock(config['open_time'])
        close_hour, close_minute = self._parse_clock(config['close_time'])
        market_open_time = market_time.replace(hour=open_hour, minute=open_minute, second=0, microsecond=0)
        market_close_time = market_time.replace(hour=close_hour, minute=close_minute, second=0, microsecond=0)

        is_trading_day = market_time.weekday() in config['days']
        if not is_trading_day:
            is_open = False
            status_text = f"{config['name']} is closed (Weekend)"
        elif market_open_time <= market_time <= market_close_time:
            is_open = True
            status_text = f"{config['name']} is currently open"
        else:
            is_open = False
            if market_time < market_open_time:
                status_text = f"{config['name']} is closed (Before opening)"
            else:
                status_text = f"{config['name']} is closed (After closing)"

        next_trading_day = self._get_next_trading_day(market_time, config['days'])
        time_until_open = self._get_time_until_open(market_time, market_open_time, config)
        time_until_close = self._get_time_until_close(market_time, market_close_time, config)

        return MarketStatus(
            is_open=is_open,
            status_text=status_text,
            next_trading_day=next_trading_day.strftime('%Y-%m-%d'),
            last_updated=market_time.strftime('%Y-%m-%d %H:%M:%S %Z'),
            time_until_open=time_until_open,
            time_until_close=time_until_close,
            current_time_et=market_time.strftime('%H:%M:%S %Z'),
            market_name=config['name'],
            market_type=config['type'],
            market_symbol=config['symbol']
        )

    def _get_next_trading_day(self, current_time: datetime, trading_days: list) -> datetime:
        """Return midnight of the first trading day strictly after ``current_time``.

        Args:
            current_time: Reference moment.
            trading_days: Weekday numbers (Monday is 0) on which the market trades.

        Raises:
            ValueError: If ``trading_days`` contains no weekday in 0-6; without
                this check the search below would never terminate.
        """
        if not any(day in trading_days for day in range(7)):
            raise ValueError("trading_days must contain at least one weekday number (0-6)")
        next_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        while next_day.weekday() not in trading_days:
            next_day += timedelta(days=1)
        return next_day

    def _get_time_until_open(self, current_time: datetime, market_open_time: datetime, config: dict) -> str:
        """Return the formatted time remaining until the next market open.

        On a trading day before the open, this is the time until today's
        open. In every other case (non-trading day, or today's open has
        already passed) it is the time until the open of the next trading day.
        """
        opens_later_today = (
            current_time.weekday() in config['days']
            and current_time < market_open_time
        )
        if opens_later_today:
            time_diff = market_open_time - current_time
        else:
            open_hour, open_minute = self._parse_clock(config['open_time'])
            next_open = self._get_next_trading_day(current_time, config['days']).replace(
                hour=open_hour, minute=open_minute, second=0, microsecond=0
            )
            time_diff = next_open - current_time
        return self._format_time_delta(time_diff)

    def _get_time_until_close(self, current_time: datetime, market_close_time: datetime, config: dict) -> str:
        """Return the formatted time until today's close, or a notice string.

        Returns "N/A (Weekend)" on a non-trading day and
        "Market closed for today" once the close time has been reached.
        """
        if current_time.weekday() not in config['days']:
            return "N/A (Weekend)"
        if current_time < market_close_time:
            return self._format_time_delta(market_close_time - current_time)
        return "Market closed for today"

    def _format_time_delta(self, time_diff: timedelta) -> str:
        """Format a timedelta as 'Xd Yh Zm', 'Yh Zm' or 'Zm'.

        Seconds are truncated. Negative durations yield "N/A".
        """
        total_seconds = int(time_diff.total_seconds())
        if total_seconds < 0:
            return "N/A"
        days = total_seconds // 86400
        hours = (total_seconds % 86400) // 3600
        minutes = (total_seconds % 3600) // 60
        if days > 0:
            return f"{days}d {hours}h {minutes}m"
        if hours > 0:
            return f"{hours}h {minutes}m"
        return f"{minutes}m"

    def _update_loop(self):
        """Refresh the cached status until the stop event is set.

        After a successful refresh the loop pauses for ``update_interval``
        seconds; after a failure it pauses for 60 seconds. The pause waits on
        the stop event instead of sleeping, so stop() ends the loop promptly
        rather than only after the current pause has elapsed.
        """
        while not self._stop_event.is_set():
            try:
                market_key = 'IDX_STOCKS'
                new_statuses = {market_key: self._get_current_market_status(market_key)}
                with self._lock:
                    self._statuses = new_statuses
                pause_seconds = self.update_interval
            except Exception as e:
                print(f"Error updating market status: {str(e)}")
                pause_seconds = 60
            self._stop_event.wait(pause_seconds)

    def _start_update_thread(self):
        """Start the daemon refresh thread unless one is already running."""
        if self._update_thread is None or not self._update_thread.is_alive():
            self._stop_event.clear()
            self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
            self._update_thread.start()

    def get_status(self, market_key: str = 'IDX_STOCKS') -> "MarketStatus":
        """Return the cached status for ``market_key``, computing it on a cache miss.

        The lookup and any on-demand computation happen while holding the
        cache lock.
        """
        with self._lock:
            if market_key not in self._statuses:
                self._statuses[market_key] = self._get_current_market_status(market_key)
            return self._statuses[market_key]

    def stop(self):
        """Signal the refresh thread to exit and wait up to 5 seconds for it."""
        self._stop_event.set()
        if self._update_thread and self._update_thread.is_alive():
            self._update_thread.join(timeout=5)
# --- Lazy singleton initialisation ---
_market_status_manager_instance = None


def get_market_manager():
    """Return the module-level MarketStatusManager, creating it on first call.

    Deferring construction means the manager's background thread is not
    started merely by importing this module.
    """
    global _market_status_manager_instance
    manager = _market_status_manager_instance
    if manager is not None:
        return manager
    manager = MarketStatusManager()
    _market_status_manager_instance = manager
    return manager
# The former eager instance (market_status_manager = MarketStatusManager()) was removed.
# --- End lazy singleton initialisation ---
# --- Utility Functions (Modified) ---
def retry_yfinance_request(func, max_retries=3, initial_delay=1):
    """Call ``func`` until it yields a usable result, backing off between tries.

    A result counts as unusable when it is ``None`` or an empty
    ``pd.DataFrame``. Unusable results and raised exceptions are both retried,
    so a transient empty response no longer short-circuits the retry loop.

    Args:
        func: Zero-argument callable performing the request.
        max_retries: Maximum number of calls to ``func``.
        initial_delay: Base delay in seconds. The delay doubles on every
            attempt, gets random jitter of up to ``initial_delay`` seconds
            added (0-1 s at the default), and is capped at 8 seconds.

    Returns:
        The first usable result, or ``None`` when every attempt failed or
        returned an unusable result (also when ``max_retries`` is 0).
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            result = func()
        except Exception as e:
            if is_last_attempt:
                print(f"Final attempt failed after {max_retries} retries: {str(e)}")
                return None
            reason = f"Error (attempt {attempt + 1}/{max_retries})"
            detail = f": {str(e)}"
        else:
            is_empty = result is None or (isinstance(result, pd.DataFrame) and result.empty)
            if not is_empty:
                return result
            if is_last_attempt:
                print(f"Function returned empty/None after {max_retries} attempts")
                return None
            reason = f"Empty result (attempt {attempt + 1}/{max_retries})"
            detail = ""

        # Exponential backoff with jitter, capped at 8 s and never negative.
        backoff = initial_delay * (2 ** attempt) + random.uniform(0, initial_delay)
        delay = max(0.0, min(8.0, backoff))
        print(f"{reason}, retrying in {delay:.2f}s{detail}")
        time.sleep(delay)
    return None
def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series:
    """Compute the Relative Strength Index using simple rolling means.

    Gaps in ``prices`` are forward- then backward-filled first. Average gain
    and average loss are plain rolling means over ``period`` observations,
    so the first ``period - 1`` values of the result are NaN.

    Args:
        prices: Price series.
        period: Rolling window length.

    Returns:
        RSI series aligned with ``prices``.
    """
    filled = prices.ffill().bfill()
    change = filled.diff()

    # Non-matching entries (including the leading NaN) become 0 so they
    # still count towards the rolling window.
    upward = change.where(change > 0, 0)
    downward = -change.where(change < 0, 0)

    avg_gain = upward.rolling(window=period).mean()
    avg_loss = downward.rolling(window=period).mean()

    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series]:
    """Compute the MACD line and its signal line.

    Gaps in ``prices`` are forward- then backward-filled. All averages are
    exponential moving averages with ``adjust=False``.

    Args:
        prices: Price series.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal: Span of the EMA applied to the MACD line.

    Returns:
        ``(macd, signal_line)`` where ``macd`` is fast EMA minus slow EMA.
    """
    cleaned = prices.ffill().bfill()
    fast_ema, slow_ema = (
        cleaned.ewm(span=span, adjust=False).mean() for span in (fast, slow)
    )
    macd_line = fast_ema - slow_ema
    return macd_line, macd_line.ewm(span=signal, adjust=False).mean()
def calculate_bollinger_bands(prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Compute Bollinger Bands around a rolling mean.

    Gaps in ``prices`` are forward- then backward-filled. The first
    ``period - 1`` entries of every band are NaN because the rolling window
    is not yet full.

    Args:
        prices: Price series.
        period: Rolling window length.
        std_dev: Number of rolling standard deviations between the middle
            band and each outer band.

    Returns:
        ``(upper_band, middle_band, lower_band)``.
    """
    series = prices.ffill().bfill()
    window = series.rolling(window=period)
    center = window.mean()
    offset = window.std() * std_dev
    return center + offset, center, center - offset
def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add moving averages, RSI, MACD, Bollinger Bands, returns and volatility.

    The indicator columns are written onto ``df`` itself; the returned frame
    is a gap-filled copy of it. The remaining NaN values (for example
    indicator warm-up rows) are forward- then backward-filled.

    Args:
        df: Price data with a 'Close' column.

    Returns:
        The frame with indicator columns added. If any step raises, the error
        is printed and ``df`` is returned in whatever state it had reached.
    """
    try:
        close = df['Close']

        # Moving averages; min_periods=1 yields values from the first row on.
        df['SMA_20'] = close.rolling(window=20, min_periods=1).mean()
        df['SMA_50'] = close.rolling(window=50, min_periods=1).mean()
        df['SMA_200'] = close.rolling(window=200, min_periods=1).mean()

        df['RSI'] = calculate_rsi(close)
        df['MACD'], df['MACD_Signal'] = calculate_macd(close)
        df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(close)

        # Daily returns and their 20-row rolling standard deviation.
        df['Returns'] = close.pct_change()
        df['Volatility'] = df['Returns'].rolling(window=20).std()

        # fillna(method=...) is deprecated in pandas; ffill()/bfill() are the
        # supported equivalents.
        df = df.ffill().bfill()
        return df
    except Exception as e:
        print(f"Error calculating technical indicators: {str(e)}")
        return df
# --- Existing Utility Functions (Modified) ---
# The get_idx_stocks function has been removed.
def fetch_stock_data(symbol, period_days):
    """Download daily history for ``symbol`` and attach technical indicators.

    Args:
        symbol: Ticker symbol passed to yfinance.
        period_days: Number of most recent rows to return.

    Returns:
        A DataFrame holding the last ``period_days`` rows with indicator
        columns, or ``None`` when no data is available or an error occurs.
    """
    try:
        window_end = datetime.now()
        # Request 30 extra calendar days beyond the requested period.
        # NOTE(review): the window is in calendar days while rows are
        # presumably trading days, so fewer than period_days rows may come
        # back for long periods - confirm whether that is intended.
        window_start = window_end - timedelta(days=period_days + 30)
        ticker = yf.Ticker(symbol)

        history = retry_yfinance_request(
            lambda: ticker.history(start=window_start, end=window_end, interval="1d")
        )
        if history is None or history.empty:
            return None

        enriched = calculate_technical_indicators(history)

        # Keep only the most recent period_days rows.
        recent = enriched.tail(period_days)
        if recent.empty:
            return None

        recent.index = pd.to_datetime(recent.index)
        return recent
    except Exception as e:
        print(f"Error fetching data for {symbol}: {e}")
        return None
def prepare_timesfm_data(stock_data, use_volume=False):
    """Extract the closing prices as a gap-free 1-D numpy array.

    Args:
        stock_data: DataFrame with a 'Close' column. It is not modified.
        use_volume: Accepted for interface compatibility; currently unused.

    Returns:
        ``(timeseries_data, None)``. The second element is the slot formerly
        used for a scaler and is always ``None``, because the raw values are
        passed on unscaled.
    """
    # fillna(method=...) is deprecated in pandas; ffill()/bfill() are the
    # supported equivalents and already return new objects.
    closes = stock_data['Close'].ffill().bfill()
    timeseries_data = closes.values.flatten()
    return timeseries_data, None
def create_forecast_plot(historical_data, forecast_dates, forecast_prices, symbol):
    """Build a two-row Plotly figure: price with forecast on top, volume below.

    Args:
        historical_data: DataFrame with 'Close' and 'Volume' columns; its
            index is used as the x-axis.
        forecast_dates: X values for the forecast.
        forecast_prices: Forecast values. Any array-like is accepted,
            including a plain list.
        symbol: Ticker symbol used in the titles.

    Returns:
        The assembled Plotly figure.
    """
    fig = make_subplots(
        rows=2, cols=1,
        shared_xaxes=True,
        vertical_spacing=0.05,
        subplot_titles=(f'{symbol} Stock Price Forecast', 'Volume'),
        row_width=[0.2, 0.7]
    )

    # Historical closing prices
    fig.add_trace(
        go.Scatter(
            x=historical_data.index,
            y=historical_data['Close'],
            mode='lines',
            name='Historical Price',
            line=dict(color='blue', width=2)
        ),
        row=1, col=1
    )

    # Forecast prices
    fig.add_trace(
        go.Scatter(
            x=forecast_dates,
            y=forecast_prices,
            mode='lines',
            name='Forecast',
            line=dict(color='red', width=2, dash='dash')
        ),
        row=1, col=1
    )

    # Simple band: +/- 10% of the historical closing-price standard deviation.
    # Coerce to an ndarray first so a plain list forecast does not raise
    # TypeError on the arithmetic below.
    forecast_values = np.asarray(forecast_prices, dtype=float)
    std_dev = np.std(historical_data['Close'].values) * 0.1
    upper_bound = forecast_values + std_dev
    lower_bound = forecast_values - std_dev

    fig.add_trace(
        go.Scatter(
            x=forecast_dates,
            y=upper_bound,
            mode='lines',
            line=dict(width=0),
            showlegend=False,
            hoverinfo='skip'
        ),
        row=1, col=1
    )
    # fill='tonexty' shades the area between this trace and the previous
    # one (the upper bound).
    fig.add_trace(
        go.Scatter(
            x=forecast_dates,
            y=lower_bound,
            mode='lines',
            line=dict(width=0),
            fill='tonexty',
            fillcolor='rgba(255,0,0,0.2)',
            name='Confidence Interval',
            hoverinfo='skip'
        ),
        row=1, col=1
    )

    # Volume
    fig.add_trace(
        go.Bar(
            x=historical_data.index,
            y=historical_data['Volume'],
            name='Volume',
            marker_color='lightblue',
            yaxis='y2'
        ),
        row=2, col=1
    )

    # Update layout
    fig.update_layout(
        title=f'{symbol} Stock Price Prediction',
        xaxis_title='Date',
        yaxis_title='Price (IDR)',
        yaxis2_title='Volume',
        hovermode='x unified',
        showlegend=True,
        height=600
    )
    fig.update_yaxes(title_text="Price (IDR)", row=1, col=1)
    fig.update_yaxes(title_text="Volume", row=2, col=1)
    fig.update_xaxes(title_text="Date", row=2, col=1)
    return fig
def get_stock_info(symbol):
    """Fetch the yfinance info mapping for ``symbol`` with a formatted market cap.

    A numeric, non-zero 'marketCap' is replaced by a string such as
    "Rp 1.23T", "Rp 4.56B", "Rp 7.89M" or "Rp 123,456". Any other value
    (missing, zero, or already non-numeric) is left untouched, so an
    unexpected type no longer causes the whole result to be discarded.

    Args:
        symbol: Ticker symbol passed to yfinance.

    Returns:
        A dict of stock information, or ``{}`` when nothing could be fetched
        or an error occurred.
    """
    try:
        ticker = yf.Ticker(symbol)
        info = retry_yfinance_request(lambda: ticker.info)
        if info is None:
            return {}

        # Work on a shallow copy so the fetched mapping is not mutated.
        info = dict(info)

        market_cap = info.get('marketCap', 0)
        is_number = isinstance(market_cap, (int, float)) and not isinstance(market_cap, bool)
        if is_number and market_cap:
            if market_cap >= 1e12:
                market_cap_str = f"Rp {market_cap/1e12:.2f}T"
            elif market_cap >= 1e9:
                market_cap_str = f"Rp {market_cap/1e9:.2f}B"
            elif market_cap >= 1e6:
                market_cap_str = f"Rp {market_cap/1e6:.2f}M"
            else:
                market_cap_str = f"Rp {market_cap:,.0f}"
            info['marketCap'] = market_cap_str
        return info
    except Exception as e:
        print(f"Error getting stock info: {e}")
        return {}