"""Market-status tracking, technical indicators and plotting helpers for IDX stocks."""

import random
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import pytz
import yfinance as yf
from plotly.subplots import make_subplots

# Import configurations
from config import IDX_STOCKS, IDX_MARKET_CONFIG


# --- Market Status Manager ---

@dataclass
class MarketStatus:
    """Data class to hold market status information."""

    is_open: bool           # True while the market is inside its trading window
    status_text: str        # human-readable open/closed description
    next_trading_day: str   # next trading date, formatted '%Y-%m-%d'
    last_updated: str       # market-local timestamp at which the status was computed
    time_until_open: str    # formatted duration until the next open
    time_until_close: str   # formatted duration until today's close, or an explanatory text
    current_time_et: str    # despite the name, holds the market-local time ('%H:%M:%S %Z')
    market_name: str        # config['name']
    market_type: str        # config['type']
    market_symbol: str      # config['symbol']


# Global configuration for market status updates
MARKET_STATUS_UPDATE_INTERVAL_MINUTES = 10  # Update every 10 minutes


class MarketStatusManager:
    """Manages market status with periodic updates for the IDX market.

    A daemon thread recomputes the status of the 'IDX_STOCKS' market every
    ``update_interval`` seconds; ``get_status`` serves the cached value and
    computes it on demand for keys that are not cached yet.
    """

    def __init__(self):
        self.update_interval = MARKET_STATUS_UPDATE_INTERVAL_MINUTES * 60  # Convert to seconds
        self._statuses = {}
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._update_thread = None
        self._start_update_thread()

    @staticmethod
    def _parse_clock(value: str) -> Tuple[int, int]:
        """Split an 'HH:MM' string into integer ``(hour, minute)``."""
        parts = value.split(':')
        return int(parts[0]), int(parts[1])

    def _get_current_market_status(self, market_key: str = 'IDX_STOCKS') -> MarketStatus:
        """Get current market status with detailed information for specific market.

        Args:
            market_key: Key into ``IDX_MARKET_CONFIG``. The entry is read for
                'timezone', 'open_time', 'close_time', 'days', 'name', 'type'
                and 'symbol'.

        Returns:
            A freshly computed ``MarketStatus``.

        Raises:
            KeyError: If ``market_key`` or a required config field is missing.
        """
        config = IDX_MARKET_CONFIG[market_key]

        # Timezone-aware "now" expressed directly in the market's own timezone.
        market_tz = pytz.timezone(config['timezone'])
        market_time = datetime.now(market_tz)

        open_hour, open_minute = self._parse_clock(config['open_time'])
        close_hour, close_minute = self._parse_clock(config['close_time'])
        market_open_time = market_time.replace(
            hour=open_hour, minute=open_minute, second=0, microsecond=0
        )
        market_close_time = market_time.replace(
            hour=close_hour, minute=close_minute, second=0, microsecond=0
        )

        is_trading_day = market_time.weekday() in config['days']

        if not is_trading_day:
            is_open = False
            status_text = f"{config['name']} is closed (Weekend)"
        elif market_open_time <= market_time <= market_close_time:
            is_open = True
            status_text = f"{config['name']} is currently open"
        else:
            is_open = False
            if market_time < market_open_time:
                status_text = f"{config['name']} is closed (Before opening)"
            else:
                status_text = f"{config['name']} is closed (After closing)"

        next_trading_day = self._get_next_trading_day(market_time, config['days'])
        time_until_open = self._get_time_until_open(market_time, market_open_time, config)
        time_until_close = self._get_time_until_close(market_time, market_close_time, config)

        return MarketStatus(
            is_open=is_open,
            status_text=status_text,
            next_trading_day=next_trading_day.strftime('%Y-%m-%d'),
            last_updated=market_time.strftime('%Y-%m-%d %H:%M:%S %Z'),
            time_until_open=time_until_open,
            time_until_close=time_until_close,
            current_time_et=market_time.strftime('%H:%M:%S %Z'),
            market_name=config['name'],
            market_type=config['type'],
            market_symbol=config['symbol'],
        )

    def _get_next_trading_day(self, current_time: datetime, trading_days: list) -> datetime:
        """Get the next trading day for a specific market.

        Args:
            current_time: Reference moment.
            trading_days: Weekday numbers (``datetime.weekday()`` values) on
                which the market trades.

        Returns:
            Midnight of the first day strictly after ``current_time`` whose
            weekday is in ``trading_days``.

        Raises:
            ValueError: If no day of the coming week is a trading day (for
                example an empty ``trading_days``); searching further could
                never succeed.
        """
        midnight = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
        # Seven consecutive days cover every weekday, so a longer search is pointless.
        for offset in range(1, 8):
            candidate = midnight + timedelta(days=offset)
            if candidate.weekday() in trading_days:
                return candidate
        raise ValueError(f"No valid trading day found in trading_days={trading_days!r}")

    def _get_next_open(self, current_time: datetime, config: dict) -> datetime:
        """Return the opening moment of the next trading day after ``current_time``."""
        open_hour, open_minute = self._parse_clock(config['open_time'])
        next_day = self._get_next_trading_day(current_time, config['days'])
        return next_day.replace(hour=open_hour, minute=open_minute, second=0, microsecond=0)

    def _get_time_until_open(self, current_time: datetime, market_open_time: datetime,
                             config: dict) -> str:
        """Calculate time until market opens.

        On a trading day before the open this is the time left until today's
        open; otherwise (non-trading day, or at/after today's open) it is the
        time until the open of the next trading day.
        """
        is_trading_day = current_time.weekday() in config['days']
        if is_trading_day and current_time < market_open_time:
            time_diff = market_open_time - current_time
        else:
            time_diff = self._get_next_open(current_time, config) - current_time
        return self._format_time_delta(time_diff)

    def _get_time_until_close(self, current_time: datetime, market_close_time: datetime,
                              config: dict) -> str:
        """Calculate time until market closes.

        Returns an explanatory text instead of a duration on non-trading days
        and once today's close has passed.
        """
        if current_time.weekday() not in config['days']:
            return "N/A (Weekend)"

        if current_time < market_close_time:
            return self._format_time_delta(market_close_time - current_time)
        return "Market closed for today"

    def _format_time_delta(self, time_diff: timedelta) -> str:
        """Format timedelta into human-readable string.

        Produces 'Xd Yh Zm', 'Yh Zm' or 'Zm' depending on magnitude, and
        'N/A' for negative durations. Seconds are truncated.
        """
        total_seconds = int(time_diff.total_seconds())
        if total_seconds < 0:
            return "N/A"

        days, remainder = divmod(total_seconds, 86400)
        hours, remainder = divmod(remainder, 3600)
        minutes = remainder // 60

        if days > 0:
            return f"{days}d {hours}h {minutes}m"
        if hours > 0:
            return f"{hours}h {minutes}m"
        return f"{minutes}m"

    def _update_loop(self):
        """Background thread loop for updating market status.

        Runs until the stop event is set. After a failed refresh it retries
        in 60 seconds instead of waiting a full update interval.
        """
        while not self._stop_event.is_set():
            try:
                market_key = 'IDX_STOCKS'
                new_statuses = {market_key: self._get_current_market_status(market_key)}
                with self._lock:
                    self._statuses = new_statuses
                delay = self.update_interval
            except Exception as e:
                # Thread boundary: report and keep the loop alive.
                print(f"Error updating market status: {str(e)}")
                delay = 60
            # Wait on the event rather than time.sleep() so stop() wakes the
            # thread immediately instead of after the full interval.
            self._stop_event.wait(delay)

    def _start_update_thread(self):
        """Start the background update thread unless one is already running."""
        if self._update_thread is None or not self._update_thread.is_alive():
            self._stop_event.clear()
            self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
            self._update_thread.start()

    def get_status(self, market_key: str = 'IDX_STOCKS') -> MarketStatus:
        """Get current market status (thread-safe).

        Returns the cached status for ``market_key``, computing and caching
        it first when it is not available yet.
        """
        with self._lock:
            if market_key not in self._statuses:
                self._statuses[market_key] = self._get_current_market_status(market_key)
            return self._statuses[market_key]

    def stop(self):
        """Stop the update thread, waiting up to 5 seconds for it to exit."""
        self._stop_event.set()
        if self._update_thread and self._update_thread.is_alive():
            self._update_thread.join(timeout=5)


# --- Lazy initialization ---
# The manager is created on first use rather than at import time, so merely
# importing this module does not start the background thread. This replaces
# the former module-level `market_status_manager = MarketStatusManager()`.
_market_status_manager_instance = None


def get_market_manager():
    """Returns the singleton MarketStatusManager instance, initializing it if necessary."""
    global _market_status_manager_instance
    if _market_status_manager_instance is None:
        _market_status_manager_instance = MarketStatusManager()
    return _market_status_manager_instance


# --- Utility Functions ---

def retry_yfinance_request(func, max_retries=3, initial_delay=1):
    """Call ``func`` with retries and exponential backoff.

    An attempt fails when ``func`` raises, returns ``None`` or returns an
    empty ``DataFrame``. Failed attempts are retried after
    ``initial_delay * 2**attempt`` seconds plus up to one second of random
    jitter, capped at 8 seconds.

    Args:
        func: Zero-argument callable performing the request.
        max_retries: Maximum number of attempts.
        initial_delay: Base delay in seconds for the backoff.

    Returns:
        The first usable result, or ``None`` when every attempt failed.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            result = func()
        except Exception as e:
            if is_last_attempt:
                print(f"Final attempt failed after {max_retries} retries: {str(e)}")
                return None
            prefix, detail = "Error", f": {str(e)}"
        else:
            is_empty = result is None or (isinstance(result, pd.DataFrame) and result.empty)
            if not is_empty:
                return result
            if is_last_attempt:
                print(f"Function returned empty/None after {max_retries} attempts")
                return None
            # An empty answer is retried like an error; returning it right
            # away would defeat the purpose of the retry loop.
            prefix, detail = "Empty result", ""

        delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 1))
        print(f"{prefix} (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s{detail}")
        time.sleep(delay)

    return None


def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series:
    """Compute the Relative Strength Index using simple rolling means.

    Gaps in ``prices`` are forward- then backward-filled first. The result is
    NaN until a full ``period`` window is available and wherever a window
    contains neither gains nor losses (0/0); a window with gains but no
    losses yields 100.
    """
    prices = prices.ffill().bfill()
    delta = prices.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))


def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26,
                   signal: int = 9) -> Tuple[pd.Series, pd.Series]:
    """Compute MACD and its signal line.

    Returns:
        ``(macd, signal_line)`` where ``macd`` is the fast EMA minus the slow
        EMA of the gap-filled prices and ``signal_line`` is the EMA of
        ``macd`` with span ``signal``.
    """
    prices = prices.ffill().bfill()
    exp1 = prices.ewm(span=fast, adjust=False).mean()
    exp2 = prices.ewm(span=slow, adjust=False).mean()
    macd = exp1 - exp2
    signal_line = macd.ewm(span=signal, adjust=False).mean()
    return macd, signal_line


def calculate_bollinger_bands(prices: pd.Series, period: int = 20,
                              std_dev: int = 2) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Compute Bollinger Bands.

    Returns:
        ``(upper_band, middle_band, lower_band)``: the rolling mean of the
        gap-filled prices and that mean plus/minus ``std_dev`` rolling
        standard deviations.
    """
    prices = prices.ffill().bfill()
    middle_band = prices.rolling(window=period).mean()
    std = prices.rolling(window=period).std()
    upper_band = middle_band + (std * std_dev)
    lower_band = middle_band - (std * std_dev)
    return upper_band, middle_band, lower_band


def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add moving averages, RSI, MACD, Bollinger Bands, returns and volatility.

    The indicator columns are written onto ``df`` itself, computed from its
    'Close' column; remaining NaNs are then forward- and backward-filled.

    Returns:
        The filled frame, or ``df`` as it stands (possibly with only some of
        the new columns) if a calculation raised; the error is printed.
    """
    try:
        close = df['Close']

        # Calculate moving averages
        df['SMA_20'] = close.rolling(window=20, min_periods=1).mean()
        df['SMA_50'] = close.rolling(window=50, min_periods=1).mean()
        df['SMA_200'] = close.rolling(window=200, min_periods=1).mean()

        # Calculate RSI
        df['RSI'] = calculate_rsi(close)

        # Calculate MACD
        df['MACD'], df['MACD_Signal'] = calculate_macd(close)

        # Calculate Bollinger Bands
        df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(close)

        # Calculate returns and volatility
        df['Returns'] = df['Close'].pct_change()
        df['Volatility'] = df['Returns'].rolling(window=20).std()

        # Fill any remaining NaN values. ffill()/bfill() replace the
        # deprecated fillna(method=...) form.
        df = df.ffill().bfill()

        return df
    except Exception as e:
        print(f"Error calculating technical indicators: {str(e)}")
        return df


# --- Data access and plotting helpers ---
# (The former get_idx_stocks function has been removed.)

def fetch_stock_data(symbol: str, period_days: int) -> Optional[pd.DataFrame]:
    """Download daily history for ``symbol`` and attach technical indicators.

    ``period_days + 30`` calendar days are requested and the result is
    trimmed to the last ``period_days`` rows afterwards.

    Returns:
        The indicator-enriched frame with a datetime index, or ``None`` when
        nothing could be fetched or an error occurred (the error is printed).
    """
    try:
        end_date = datetime.now()
        start_date = end_date - timedelta(days=period_days + 30)

        ticker = yf.Ticker(symbol)

        def fetch_history():
            return ticker.history(start=start_date, end=end_date, interval="1d")

        data = retry_yfinance_request(fetch_history)

        if data is None or data.empty:
            return None

        # Apply technical indicators and cleanup
        data = calculate_technical_indicators(data)

        # Keep only the number of rows requested
        data = data.tail(period_days)

        if data.empty:
            return None

        data.index = pd.to_datetime(data.index)

        return data

    except Exception as e:
        print(f"Error fetching data for {symbol}: {e}")
        return None


def prepare_timesfm_data(stock_data, use_volume=False):
    """Extract the closing prices as a flat, gap-filled NumPy array.

    Args:
        stock_data: Frame with a 'Close' column.
        use_volume: Accepted for interface compatibility; not used.

    Returns:
        ``(timeseries_data, None)``. The second element takes the place of a
        scaler, which is no longer used.
    """
    data = stock_data['Close'].copy()
    # ffill()/bfill() replace the deprecated fillna(method=...) form.
    data = data.ffill().bfill()

    # The model needs a 1D numpy array of raw data
    timeseries_data = data.values.flatten()

    return timeseries_data, None


def create_forecast_plot(historical_data, forecast_dates, forecast_prices, symbol):
    """Build a two-row Plotly figure: price with forecast on top, volume below.

    Args:
        historical_data: Frame with 'Close' and 'Volume' columns; its index
            supplies the x values.
        forecast_dates: X values for the forecast.
        forecast_prices: Forecast values; any array-like of numbers.
        symbol: Ticker used in the titles.

    Returns:
        The assembled ``plotly.graph_objects.Figure``.
    """
    # Coerce to an array so plain lists support the band arithmetic below.
    forecast_prices = np.asarray(forecast_prices, dtype=float)

    fig = make_subplots(
        rows=2, cols=1,
        shared_xaxes=True,
        vertical_spacing=0.05,
        subplot_titles=(f'{symbol} Stock Price Forecast', 'Volume'),
        row_width=[0.2, 0.7]
    )

    # Historical closing prices
    fig.add_trace(
        go.Scatter(
            x=historical_data.index,
            y=historical_data['Close'],
            mode='lines',
            name='Historical Price',
            line=dict(color='blue', width=2)
        ),
        row=1, col=1
    )

    # Forecast prices
    fig.add_trace(
        go.Scatter(
            x=forecast_dates,
            y=forecast_prices,
            mode='lines',
            name='Forecast',
            line=dict(color='red', width=2, dash='dash')
        ),
        row=1, col=1
    )

    # Confidence interval (simple approach): a constant band of 10% of the
    # historical closing-price standard deviation around the forecast.
    std_dev = np.std(historical_data['Close'].values) * 0.1
    upper_bound = forecast_prices + std_dev
    lower_bound = forecast_prices - std_dev

    fig.add_trace(
        go.Scatter(
            x=forecast_dates,
            y=upper_bound,
            mode='lines',
            line=dict(width=0),
            showlegend=False,
            hoverinfo='skip'
        ),
        row=1, col=1
    )

    # 'tonexty' fills up to the previously added trace, i.e. the upper bound.
    fig.add_trace(
        go.Scatter(
            x=forecast_dates,
            y=lower_bound,
            mode='lines',
            line=dict(width=0),
            fill='tonexty',
            fillcolor='rgba(255,0,0,0.2)',
            name='Confidence Interval',
            hoverinfo='skip'
        ),
        row=1, col=1
    )

    # Volume
    fig.add_trace(
        go.Bar(
            x=historical_data.index,
            y=historical_data['Volume'],
            name='Volume',
            marker_color='lightblue',
            yaxis='y2'
        ),
        row=2, col=1
    )

    # Update layout
    fig.update_layout(
        title=f'{symbol} Stock Price Prediction',
        xaxis_title='Date',
        yaxis_title='Price (IDR)',
        yaxis2_title='Volume',
        hovermode='x unified',
        showlegend=True,
        height=600
    )

    fig.update_yaxes(title_text="Price (IDR)", row=1, col=1)
    fig.update_yaxes(title_text="Volume", row=2, col=1)
    fig.update_xaxes(title_text="Date", row=2, col=1)

    return fig


def get_stock_info(symbol):
    """Fetch the yfinance info mapping for ``symbol`` with a formatted market cap.

    A numeric, non-zero 'marketCap' is replaced by a string such as
    'Rp 1.23T' (suffix T, B or M for 1e12, 1e9 and 1e6; plain grouped digits
    below that).

    Returns:
        A new dict with the info fields, or ``{}`` when the lookup failed
        (errors are printed).
    """
    try:
        ticker = yf.Ticker(symbol)
        info = retry_yfinance_request(lambda: ticker.info)

        if info is None:
            return {}

        # Work on a copy so the mapping handed out by yfinance is not altered.
        info = dict(info)

        # Format market cap; values that are not numeric are left untouched
        # instead of failing in the comparisons below.
        market_cap = info.get('marketCap', 0)
        if isinstance(market_cap, (int, float)) and market_cap:
            if market_cap >= 1e12:
                market_cap_str = f"Rp {market_cap/1e12:.2f}T"
            elif market_cap >= 1e9:
                market_cap_str = f"Rp {market_cap/1e9:.2f}B"
            elif market_cap >= 1e6:
                market_cap_str = f"Rp {market_cap/1e6:.2f}M"
            else:
                market_cap_str = f"Rp {market_cap:,.0f}"
            info['marketCap'] = market_cap_str

        return info

    except Exception as e:
        print(f"Error getting stock info: {e}")
        return {}