Spaces:
Sleeping
Sleeping
File size: 16,437 Bytes
bf35729 d45c1d3 ed302b6 d45c1d3 96b3514 d45c1d3 ed302b6 d45c1d3 ed302b6 d45c1d3 ed302b6 d45c1d3 ed302b6 d45c1d3 ed302b6 d45c1d3 ed302b6 96b3514 bf35729 ed302b6 bf35729 d45c1d3 bf35729 d45c1d3 bf35729 d45c1d3 bf35729 d45c1d3 bf35729 d45c1d3 bf35729 d45c1d3 bf35729 d45c1d3 bf35729 ed302b6 bf35729 ed302b6 bf35729 ed302b6 bf35729 d45c1d3 bf35729 d45c1d3 bf35729 fe0093d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 |
import yfinance as yf
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import pytz
import time
import random
import threading
from dataclasses import dataclass
# Import configurations
from config import IDX_STOCKS, IDX_MARKET_CONFIG
# --- Market Status Manager ---
@dataclass
class MarketStatus:
"""Data class to hold market status information"""
is_open: bool
status_text: str
next_trading_day: str
last_updated: str
time_until_open: str
time_until_close: str
current_time_et: str
market_name: str
market_type: str
market_symbol: str
# Global configuration for market status updates
MARKET_STATUS_UPDATE_INTERVAL_MINUTES = 10 # Update every 10 minutes
class MarketStatusManager:
"""Manages market status with periodic updates for the IDX market"""
def __init__(self):
self.update_interval = MARKET_STATUS_UPDATE_INTERVAL_MINUTES * 60 # Convert to seconds
self._statuses = {}
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._update_thread = None
self._start_update_thread()
def _get_current_market_status(self, market_key: str = 'IDX_STOCKS') -> MarketStatus:
"""Get current market status with detailed information for specific market"""
config = IDX_MARKET_CONFIG[market_key]
now = datetime.now(pytz.timezone('Asia/Jakarta')) # Use timezone-aware datetime for IDX
market_tz = pytz.timezone(config['timezone'])
market_time = now.astimezone(market_tz)
open_hour, open_minute = map(int, config['open_time'].split(':'))
close_hour, close_minute = map(int, config['close_time'].split(':'))
market_open_time = market_time.replace(hour=open_hour, minute=open_minute, second=0, microsecond=0)
market_close_time = market_time.replace(hour=close_hour, minute=close_minute, second=0, microsecond=0)
is_trading_day = market_time.weekday() in config['days']
if not is_trading_day:
is_open = False
status_text = f"{config['name']} is closed (Weekend)"
elif market_open_time <= market_time <= market_close_time:
is_open = True
status_text = f"{config['name']} is currently open"
else:
is_open = False
if market_time < market_open_time:
status_text = f"{config['name']} is closed (Before opening)"
else:
status_text = f"{config['name']} is closed (After closing)"
next_trading_day = self._get_next_trading_day(market_time, config['days'])
time_until_open = self._get_time_until_open(market_time, market_open_time, config)
time_until_close = self._get_time_until_close(market_time, market_close_time, config)
return MarketStatus(
is_open=is_open,
status_text=status_text,
next_trading_day=next_trading_day.strftime('%Y-%m-%d'),
last_updated=market_time.strftime('%Y-%m-%d %H:%M:%S %Z'),
time_until_open=time_until_open,
time_until_close=time_until_close,
current_time_et=market_time.strftime('%H:%M:%S %Z'),
market_name=config['name'],
market_type=config['type'],
market_symbol=config['symbol']
)
def _get_next_trading_day(self, current_time: datetime, trading_days: list) -> datetime:
"""Get the next trading day for a specific market"""
next_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
while next_day.weekday() not in trading_days:
next_day += timedelta(days=1)
return next_day
def _get_time_until_open(self, current_time: datetime, market_open_time: datetime, config: dict) -> str:
"""Calculate time until market opens"""
if current_time.weekday() not in config['days']:
days_until_next = 1
while (current_time + timedelta(days=days_until_next)).weekday() not in config['days']:
days_until_next += 1
next_trading_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=days_until_next)
next_open = next_trading_day.replace(
hour=int(config['open_time'].split(':')[0]),
minute=int(config['open_time'].split(':')[1]),
second=0, microsecond=0
)
time_diff = next_open - current_time
else:
if current_time < market_open_time:
time_diff = market_open_time - current_time
else:
days_until_next = 1
while (current_time + timedelta(days=days_until_next)).weekday() not in config['days']:
days_until_next += 1
next_trading_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=days_until_next)
next_open = next_trading_day.replace(
hour=int(config['open_time'].split(':')[0]),
minute=int(config['open_time'].split(':')[1]),
second=0, microsecond=0
)
time_diff = next_open - current_time
return self._format_time_delta(time_diff)
def _get_time_until_close(self, current_time: datetime, market_close_time: datetime, config: dict) -> str:
"""Calculate time until market closes"""
if current_time.weekday() not in config['days']:
return "N/A (Weekend)"
if current_time < market_close_time:
time_diff = market_close_time - current_time
return self._format_time_delta(time_diff)
else:
return "Market closed for today"
def _format_time_delta(self, time_diff: timedelta) -> str:
"""Format timedelta into human-readable string"""
total_seconds = int(time_diff.total_seconds())
if total_seconds < 0:
return "N/A"
days = total_seconds // 86400
hours = (total_seconds % 86400) // 3600
minutes = (total_seconds % 3600) // 60
if days > 0:
return f"{days}d {hours}h {minutes}m"
elif hours > 0:
return f"{hours}h {minutes}m"
else:
return f"{minutes}m"
def _update_loop(self):
"""Background thread loop for updating market status"""
while not self._stop_event.is_set():
try:
new_statuses = {}
market_key = 'IDX_STOCKS'
new_statuses[market_key] = self._get_current_market_status(market_key)
with self._lock:
self._statuses = new_statuses
time.sleep(self.update_interval)
except Exception as e:
print(f"Error updating market status: {str(e)}")
time.sleep(60)
def _start_update_thread(self):
"""Start the background update thread"""
if self._update_thread is None or not self._update_thread.is_alive():
self._stop_event.clear()
self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
self._update_thread.start()
def get_status(self, market_key: str = 'IDX_STOCKS') -> MarketStatus:
"""Get current market status (thread-safe)"""
with self._lock:
if market_key not in self._statuses:
self._statuses[market_key] = self._get_current_market_status(market_key)
return self._statuses[market_key]
def stop(self):
"""Stop the update thread"""
self._stop_event.set()
if self._update_thread and self._update_thread.is_alive():
self._update_thread.join(timeout=5)
# --- LAZY INITIALIZATION FIX ---
_market_status_manager_instance = None
def get_market_manager():
"""Returns the singleton MarketStatusManager instance, initializing it if necessary."""
global _market_status_manager_instance
if _market_status_manager_instance is None:
_market_status_manager_instance = MarketStatusManager()
return _market_status_manager_instance
# Hapus baris lama: market_status_manager = MarketStatusManager()
# --- LAZY INITIALIZATION FIX ---
# --- Utility Functions (Modified) ---
def retry_yfinance_request(func, max_retries=3, initial_delay=1):
# ... (SAMA) ...
for attempt in range(max_retries):
try:
result = func()
if result is None or (isinstance(result, pd.DataFrame) and result.empty):
if attempt == max_retries - 1:
print(f"Function returned empty/None after {max_retries} attempts")
return None
return result
except Exception as e:
if attempt == max_retries - 1:
print(f"Final attempt failed after {max_retries} retries: {str(e)}")
return None
delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 1))
print(f"Error (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {str(e)}")
time.sleep(delay)
return None
def calculate_rsi(prices: pd.Series, period: int = 14) -> pd.Series:
# ... (SAMA) ...
prices = prices.ffill().bfill()
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series]:
# ... (SAMA) ...
prices = prices.ffill().bfill()
exp1 = prices.ewm(span=fast, adjust=False).mean()
exp2 = prices.ewm(span=slow, adjust=False).mean()
macd = exp1 - exp2
signal_line = macd.ewm(span=signal, adjust=False).mean()
return macd, signal_line
def calculate_bollinger_bands(prices: pd.Series, period: int = 20, std_dev: int = 2) -> Tuple[pd.Series, pd.Series, pd.Series]:
# ... (SAMA) ...
prices = prices.ffill().bfill()
middle_band = prices.rolling(window=period).mean()
std = prices.rolling(window=period).std()
upper_band = middle_band + (std * std_dev)
lower_band = middle_band - (std * std_dev)
return upper_band, middle_band, lower_band
def calculate_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
# ... (SAMA) ...
try:
# Calculate moving averages
df['SMA_20'] = df['Close'].rolling(window=20, min_periods=1).mean()
df['SMA_50'] = df['Close'].rolling(window=50, min_periods=1).mean()
df['SMA_200'] = df['Close'].rolling(window=200, min_periods=1).mean()
# Calculate RSI
df['RSI'] = calculate_rsi(df['Close'])
# Calculate MACD
df['MACD'], df['MACD_Signal'] = calculate_macd(df['Close'])
# Calculate Bollinger Bands
df['BB_Upper'], df['BB_Middle'], df['BB_Lower'] = calculate_bollinger_bands(df['Close'])
# Calculate returns and volatility
df['Returns'] = df['Close'].pct_change()
df['Volatility'] = df['Returns'].rolling(window=20).std()
# Fill any remaining NaN values
df = df.fillna(method='ffill').fillna(method='bfill')
return df
except Exception as e:
print(f"Error calculating technical indicators: {str(e)}")
return df
# --- Existing Utility Functions (Modified) ---
# Hapus fungsi get_idx_stocks
def fetch_stock_data(symbol, period_days):
# ... (SAMA) ...
try:
end_date = datetime.now()
start_date = end_date - timedelta(days=period_days + 30)
ticker = yf.Ticker(symbol)
def fetch_history():
return ticker.history(start=start_date, end=end_date, interval="1d")
data = retry_yfinance_request(fetch_history)
if data is None or data.empty:
return None
# Apply technical indicators and cleanup
data = calculate_technical_indicators(data)
# Filter for the exact number of days requested (trading days)
data = data.tail(period_days)
if data.empty:
return None
data.index = pd.to_datetime(data.index)
return data
except Exception as e:
print(f"Error fetching data for {symbol}: {e}")
return None
def prepare_timesfm_data(stock_data, use_volume=False):
# ... (SAMA) ...
data = stock_data['Close'].copy()
data = data.fillna(method='ffill').fillna(method='bfill')
# Chronos needs 1D numpy array of raw data
timeseries_data = data.values.flatten()
# We no longer use a separate scaler
return timeseries_data, None
def create_forecast_plot(historical_data, forecast_dates, forecast_prices, symbol):
# ... (SAMA) ...
fig = make_subplots(
rows=2, cols=1,
shared_xaxes=True,
vertical_spacing=0.05,
subplot_titles=(f'{symbol} Stock Price Forecast', 'Volume'),
row_width=[0.2, 0.7]
)
# Historical closing prices
fig.add_trace(
go.Scatter(
x=historical_data.index,
y=historical_data['Close'],
mode='lines',
name='Historical Price',
line=dict(color='blue', width=2)
),
row=1, col=1
)
# Forecast prices
fig.add_trace(
go.Scatter(
x=forecast_dates,
y=forecast_prices,
mode='lines',
name='Forecast',
line=dict(color='red', width=2, dash='dash')
),
row=1, col=1
)
# Confidence interval (simple approach)
std_dev = np.std(historical_data['Close'].values) * 0.1
upper_bound = forecast_prices + std_dev
lower_bound = forecast_prices - std_dev
fig.add_trace(
go.Scatter(
x=forecast_dates,
y=upper_bound,
mode='lines',
line=dict(width=0),
showlegend=False,
hoverinfo='skip'
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=forecast_dates,
y=lower_bound,
mode='lines',
line=dict(width=0),
fill='tonexty',
fillcolor='rgba(255,0,0,0.2)',
name='Confidence Interval',
hoverinfo='skip'
),
row=1, col=1
)
# Volume
fig.add_trace(
go.Bar(
x=historical_data.index,
y=historical_data['Volume'],
name='Volume',
marker_color='lightblue',
yaxis='y2'
),
row=2, col=1
)
# Update layout
fig.update_layout(
title=f'{symbol} Stock Price Prediction',
xaxis_title='Date',
yaxis_title='Price (IDR)',
yaxis2_title='Volume',
hovermode='x unified',
showlegend=True,
height=600
)
fig.update_yaxes(title_text="Price (IDR)", row=1, col=1)
fig.update_yaxes(title_text="Volume", row=2, col=1)
fig.update_xaxes(title_text="Date", row=2, col=1)
return fig
def get_stock_info(symbol):
# ... (SAMA) ...
try:
ticker = yf.Ticker(symbol)
info = retry_yfinance_request(lambda: ticker.info)
if info is None:
return {}
# Format market cap
market_cap = info.get('marketCap', 0)
if market_cap:
if market_cap >= 1e12:
market_cap_str = f"Rp {market_cap/1e12:.2f}T"
elif market_cap >= 1e9:
market_cap_str = f"Rp {market_cap/1e9:.2f}B"
elif market_cap >= 1e6:
market_cap_str = f"Rp {market_cap/1e6:.2f}M"
else:
market_cap_str = f"Rp {market_cap:,.0f}"
info['marketCap'] = market_cap_str
return info
except Exception as e:
print(f"Error getting stock info: {e}")
return {} |