Spaces:
Sleeping
Sleeping
| import yfinance as yf | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Tuple | |
| import pytz | |
| import time | |
| import random | |
| import threading | |
| from dataclasses import dataclass | |
| # Import configurations | |
| from config import IDX_STOCKS, IDX_MARKET_CONFIG | |
| # --- Market Status Manager --- | |
| class MarketStatus: | |
| """Data class to hold market status information""" | |
| is_open: bool | |
| status_text: str | |
| next_trading_day: str | |
| last_updated: str | |
| time_until_open: str | |
| time_until_close: str | |
| current_time_et: str | |
| market_name: str | |
| market_type: str | |
| market_symbol: str | |
| # Global configuration for market status updates | |
| MARKET_STATUS_UPDATE_INTERVAL_MINUTES = 10 # Update every 10 minutes | |
| class MarketStatusManager: | |
| """Manages market status with periodic updates for the IDX market""" | |
| def __init__(self): | |
| self.update_interval = MARKET_STATUS_UPDATE_INTERVAL_MINUTES * 60 # Convert to seconds | |
| self._statuses = {} | |
| self._lock = threading.Lock() | |
| self._stop_event = threading.Event() | |
| self._update_thread = None | |
| self._start_update_thread() | |
| def _get_current_market_status(self, market_key: str = 'IDX_STOCKS') -> MarketStatus: | |
| """Get current market status with detailed information for specific market""" | |
| config = IDX_MARKET_CONFIG[market_key] | |
| now = datetime.now(pytz.timezone('Asia/Jakarta')) # Use timezone-aware datetime for IDX | |
| market_tz = pytz.timezone(config['timezone']) | |
| market_time = now.astimezone(market_tz) | |
| open_hour, open_minute = map(int, config['open_time'].split(':')) | |
| close_hour, close_minute = map(int, config['close_time'].split(':')) | |
| market_open_time = market_time.replace(hour=open_hour, minute=open_minute, second=0, microsecond=0) | |
| market_close_time = market_time.replace(hour=close_hour, minute=close_minute, second=0, microsecond=0) | |
| is_trading_day = market_time.weekday() in config['days'] | |
| if not is_trading_day: | |
| is_open = False | |
| status_text = f"{config['name']} is closed (Weekend)" | |
| elif market_open_time <= market_time <= market_close_time: | |
| is_open = True | |
| status_text = f"{config['name']} is currently open" | |
| else: | |
| is_open = False | |
| if market_time < market_open_time: | |
| status_text = f"{config['name']} is closed (Before opening)" | |
| else: | |
| status_text = f"{config['name']} is closed (After closing)" | |
| next_trading_day = self._get_next_trading_day(market_time, config['days']) | |
| time_until_open = self._get_time_until_open(market_time, market_open_time, config) | |
| time_until_close = self._get_time_until_close(market_time, market_close_time, config) | |
| return MarketStatus( | |
| is_open=is_open, | |
| status_text=status_text, | |
| next_trading_day=next_trading_day.strftime('%Y-%m-%d'), | |
| last_updated=market_time.strftime('%Y-%m-%d %H:%M:%S %Z'), | |
| time_until_open=time_until_open, | |
| time_until_close=time_until_close, | |
| current_time_et=market_time.strftime('%H:%M:%S %Z'), | |
| market_name=config['name'], | |
| market_type=config['type'], | |
| market_symbol=config['symbol'] | |
| ) | |
| def _get_next_trading_day(self, current_time: datetime, trading_days: list) -> datetime: | |
| """Get the next trading day for a specific market""" | |
| next_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) | |
| while next_day.weekday() not in trading_days: | |
| next_day += timedelta(days=1) | |
| return next_day | |
| def _get_time_until_open(self, current_time: datetime, market_open_time: datetime, config: dict) -> str: | |
| """Calculate time until market opens""" | |
| if current_time.weekday() not in config['days']: | |
| days_until_next = 1 | |
| while (current_time + timedelta(days=days_until_next)).weekday() not in config['days']: | |
| days_until_next += 1 | |
| next_trading_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=days_until_next) | |
| next_open = next_trading_day.replace( | |
| hour=int(config['open_time'].split(':')[0]), | |
| minute=int(config['open_time'].split(':')[1]), | |
| second=0, microsecond=0 | |
| ) | |
| time_diff = next_open - current_time | |
| else: | |
| if current_time < market_open_time: | |
| time_diff = market_open_time - current_time | |
| else: | |
| days_until_next = 1 | |
| while (current_time + timedelta(days=days_until_next)).weekday() not in config['days']: | |
| days_until_next += 1 | |
| next_trading_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=days_until_next) | |
| next_open = next_trading_day.replace( | |
| hour=int(config['open_time'].split(':')[0]), | |
| minute=int(config['open_time'].split(':')[1]), | |
| second=0, microsecond=0 | |
| ) | |
| time_diff = next_open - current_time | |
| return self._format_time_delta(time_diff) | |
| def _get_time_until_close(self, current_time: datetime, market_close_time: datetime, config: dict) -> str: | |
| """Calculate time until market closes""" | |
| if current_time.weekday() not in config['days']: | |
| return "N/A (Weekend)" | |
| if current_time < market_close_time: | |
| time_diff = market_close_time - current_time | |
| return self._format_time_delta(time_diff) | |
| else: | |
| return "Market closed for today" | |
| def _format_time_delta(self, time_diff: timedelta) -> str: | |
| """Format timedelta into human-readable string""" | |
| total_seconds = int(time_diff.total_seconds()) | |
| if total_seconds < 0: | |
| return "N/A" | |
| days = total_seconds // 86400 | |
| hours = (total_seconds % 86400) // 3600 | |
| minutes = (total_seconds % 3600) // 60 | |
| if days > 0: | |
| return f"{days}d {hours}h {minutes}m" | |
| elif hours > 0: | |
| return f"{hours}h {minutes}m" | |
| else: | |
| return f"{minutes}m" | |
| def _update_loop(self): | |
| """Background thread loop for updating market status""" | |
| while not self._stop_event.is_set(): | |
| try: | |
| new_statuses = {} | |
| market_key = 'IDX_STOCKS' | |
| new_statuses[market_key] = self._get_current_market_status(market_key) | |
| with self._lock: | |
| self._statuses = new_statuses | |
| time.sleep(self.update_interval) | |
| except Exception as e: | |
| print(f"Error updating market status: {str(e)}") | |
| time.sleep(60) | |
| def _start_update_thread(self): | |
| """Start the background update thread""" | |
| if self._update_thread is None or not self._update_thread.is_alive(): | |
| self._stop_event.clear() | |
| self._update_thread = threading.Thread(target=self._update_loop, daemon=True) | |
| self._update_thread.start() | |
| def get_status(self, market_key: str = 'IDX_STOCKS') -> MarketStatus: | |
| """Get current market status (thread-safe)""" | |
| with self._lock: | |
| if market_key not in self._statuses: | |
| self._statuses[market_key] = self._get_current_market_status(market_key) | |
| return self._statuses[market_key] | |
| def stop(self): | |
| """Stop the update thread""" | |
| self._stop_event.set() | |
| if self._update_thread and self._update_thread.is_alive(): | |
| self._update_thread.join(timeout=5) | |
| # --- LAZY INITIALIZATION FIX --- | |
| _market_status_manager_instance = None | |
| def get_market_manager(): | |
| """Returns the singleton MarketStatusManager instance, initializing it if necessary.""" | |
| global _market_status_manager_instance | |
| if _market_status_manager_instance is None: | |
| _market_status_manager_instance = MarketStatusManager() | |
| return _market_status_manager_instance | |
| # Hapus baris lama: market_status_manager = MarketStatusManager() | |
| # --- LAZY INITIALIZATION FIX --- | |
| # --- Utility Functions (Modified) --- | |
| def retry_yfinance_request(func, max_retries=3, initial_delay=1): | |
| # ... (SAMA) ... | |
| for attempt in range(max_retries): | |
| try: | |
| result = func() | |
| if result is None or (isinstance(result, pd.DataFrame) and result.empty): | |
| if attempt == max_retries - 1: | |
| print(f"Function returned empty/None after {max_retries} attempts") | |
| return None | |
| return result | |
| except Exception as e: | |
| if attempt == max_retries - 1: | |
| print(f"Final attempt failed after {max_retries} retries: {str(e)}") | |
| return None | |
| delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 1)) | |
| print(f"Error (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}") | |
| time.sleep(delay) | |
| return None | |
| def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series: | |
| # ... (SAMA) ... | |
| prices = prices.ffill().bfill() | |
| delta = prices.diff() | |
| gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() | |
| loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() | |
| rs = gain / loss | |
| return 100 - (100 / (1 + rs)) | |
| def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series]: | |
| # ... (SAMA) ... | |
| prices = prices.ffill().bfill() | |
| exp1 = prices.ewm(span=fast, adjust=False).mean() | |
| exp2 = prices.ewm(span=slow, adjust=False).mean() | |
| macd = exp1 - exp2 | |
| signal_line = macd.ewm(span=signal, adjust=False).mean() | |
| return macd, signal_line | |
| def calculate_bollinger_bands(prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series, pd.Series]: | |
| # ... (SAMA) ... | |
| prices = prices.ffill().bfill() | |
| middle_band = prices.rolling(window=period).mean() | |
| std = prices.rolling(window=period).std() | |
| upper_band = middle_band + (std * std_dev) | |
| lower_band = middle_band - (std * std_dev) | |
| return upper_band, middle_band, lower_band | |
| def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| # ... (SAMA) ... | |
| try: | |
| # Calculate moving averages | |
| df['SMA_20'] = df['Close'].rolling(window=20, min_periods=1).mean() | |
| df['SMA_50'] = df['Close'].rolling(window=50, min_periods=1).mean() | |
| df['SMA_200'] = df['Close'].rolling(window=200, min_periods=1).mean() | |
| # Calculate RSI | |
| df['RSI'] = calculate_rsi(df['Close']) | |
| # Calculate MACD | |
| df['MACD'], df['MACD_Signal'] = calculate_macd(df['Close']) | |
| # Calculate Bollinger Bands | |
| df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(df['Close']) | |
| # Calculate returns and volatility | |
| df['Returns'] = df['Close'].pct_change() | |
| df['Volatility'] = df['Returns'].rolling(window=20).std() | |
| # Fill any remaining NaN values | |
| df = df.fillna(method='ffill').fillna(method='bfill') | |
| return df | |
| except Exception as e: | |
| print(f"Error calculating technical indicators: {str(e)}") | |
| return df | |
| # --- Existing Utility Functions (Modified) --- | |
| # Hapus fungsi get_idx_stocks | |
| def fetch_stock_data(symbol, period_days): | |
| # ... (SAMA) ... | |
| try: | |
| end_date = datetime.now() | |
| start_date = end_date - timedelta(days=period_days + 30) | |
| ticker = yf.Ticker(symbol) | |
| def fetch_history(): | |
| return ticker.history(start=start_date, end=end_date, interval="1d") | |
| data = retry_yfinance_request(fetch_history) | |
| if data is None or data.empty: | |
| return None | |
| # Apply technical indicators and cleanup | |
| data = calculate_technical_indicators(data) | |
| # Filter for the exact number of days requested (trading days) | |
| data = data.tail(period_days) | |
| if data.empty: | |
| return None | |
| data.index = pd.to_datetime(data.index) | |
| return data | |
| except Exception as e: | |
| print(f"Error fetching data for {symbol}: {e}") | |
| return None | |
| def prepare_timesfm_data(stock_data, use_volume=False): | |
| # ... (SAMA) ... | |
| data = stock_data['Close'].copy() | |
| data = data.fillna(method='ffill').fillna(method='bfill') | |
| # Chronos needs 1D numpy array of raw data | |
| timeseries_data = data.values.flatten() | |
| # We no longer use a separate scaler | |
| return timeseries_data, None | |
| def create_forecast_plot(historical_data, forecast_dates, forecast_prices, symbol): | |
| # ... (SAMA) ... | |
| fig = make_subplots( | |
| rows=2, cols=1, | |
| shared_xaxes=True, | |
| vertical_spacing=0.05, | |
| subplot_titles=(f'{symbol} Stock Price Forecast', 'Volume'), | |
| row_width=[0.2, 0.7] | |
| ) | |
| # Historical closing prices | |
| fig.add_trace( | |
| go.Scatter( | |
| x=historical_data.index, | |
| y=historical_data['Close'], | |
| mode='lines', | |
| name='Historical Price', | |
| line=dict(color='blue', width=2) | |
| ), | |
| row=1, col=1 | |
| ) | |
| # Forecast prices | |
| fig.add_trace( | |
| go.Scatter( | |
| x=forecast_dates, | |
| y=forecast_prices, | |
| mode='lines', | |
| name='Forecast', | |
| line=dict(color='red', width=2, dash='dash') | |
| ), | |
| row=1, col=1 | |
| ) | |
| # Confidence interval (simple approach) | |
| std_dev = np.std(historical_data['Close'].values) * 0.1 | |
| upper_bound = forecast_prices + std_dev | |
| lower_bound = forecast_prices - std_dev | |
| fig.add_trace( | |
| go.Scatter( | |
| x=forecast_dates, | |
| y=upper_bound, | |
| mode='lines', | |
| line=dict(width=0), | |
| showlegend=False, | |
| hoverinfo='skip' | |
| ), | |
| row=1, col=1 | |
| ) | |
| fig.add_trace( | |
| go.Scatter( | |
| x=forecast_dates, | |
| y=lower_bound, | |
| mode='lines', | |
| line=dict(width=0), | |
| fill='tonexty', | |
| fillcolor='rgba(255,0,0,0.2)', | |
| name='Confidence Interval', | |
| hoverinfo='skip' | |
| ), | |
| row=1, col=1 | |
| ) | |
| # Volume | |
| fig.add_trace( | |
| go.Bar( | |
| x=historical_data.index, | |
| y=historical_data['Volume'], | |
| name='Volume', | |
| marker_color='lightblue', | |
| yaxis='y2' | |
| ), | |
| row=2, col=1 | |
| ) | |
| # Update layout | |
| fig.update_layout( | |
| title=f'{symbol} Stock Price Prediction', | |
| xaxis_title='Date', | |
| yaxis_title='Price (IDR)', | |
| yaxis2_title='Volume', | |
| hovermode='x unified', | |
| showlegend=True, | |
| height=600 | |
| ) | |
| fig.update_yaxes(title_text="Price (IDR)", row=1, col=1) | |
| fig.update_yaxes(title_text="Volume", row=2, col=1) | |
| fig.update_xaxes(title_text="Date", row=2, col=1) | |
| return fig | |
| def get_stock_info(symbol): | |
| # ... (SAMA) ... | |
| try: | |
| ticker = yf.Ticker(symbol) | |
| info = retry_yfinance_request(lambda: ticker.info) | |
| if info is None: | |
| return {} | |
| # Format market cap | |
| market_cap = info.get('marketCap', 0) | |
| if market_cap: | |
| if market_cap >= 1e12: | |
| market_cap_str = f"Rp {market_cap/1e12:.2f}T" | |
| elif market_cap >= 1e9: | |
| market_cap_str = f"Rp {market_cap/1e9:.2f}B" | |
| elif market_cap >= 1e6: | |
| market_cap_str = f"Rp {market_cap/1e6:.2f}M" | |
| else: | |
| market_cap_str = f"Rp {market_cap:,.0f}" | |
| info['marketCap'] = market_cap_str | |
| return info | |
| except Exception as e: | |
| print(f"Error getting stock info: {e}") | |
| return {} |