Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
app_series_others.py | |
-------------------------------------------------------------------- | |
「依樣畫葫蘆」版:把 Qlib 內建的 6 個“其他”模型(gats、sfm、 | |
tabnet、add、igmtf、hist)拉進同一支腳本,比照 | |
`app_series_Attention.py` 的流程: | |
1. 下載歷史股價 (yfinance) | |
2. 準備滑動視窗資料 | |
3. 逐一訓練並預測未來 N 天收盤價 | |
4. 畫圖 + 儲存 PNG / CSV | |
-------------------------------------------------------------------- | |
**重點差異** | |
* 這些模型各自的 `fit / predict` 介面不完全相同;為了簡化, | |
這裡用 `importlib` 動態載入,再嘗試: | |
a. 若類別有 `.fit()`,就呼叫它(DatasetH 版) | |
b. 否則 fallback 到手動 train loop(跟 attention 版相同) | |
* 若模型屬於「跨樣本」類(TabNet / SFM / ADD / HIST), | |
`prepare_data_tabular()` 會把時序資料展平成單筆特徵; | |
其餘(GATS / IGMTF)依然使用滑窗方式。 | |
-------------------------------------------------------------------- | |
**Usage**
    $ python3 app_series_others.py \\
          --ticker TSLA \\
          --days 7 \\
          --period 1y \\
          --cutoff 2025-03-20 \\
          --compare real

    --days     forecast horizon (number of days to predict)
    --period   training-data period (standard yfinance period string)
    --cutoff   last date used for training (yyyy-mm-dd)
    --compare  whether to draw the real price line (real / none)

    # Example: train on one year of data and forecast 7 days
    python3 app_series_others.py --ticker TSLA --period 1y --days 7 \\
        --cutoff 2025-03-20 --compare real
""" | |
import argparse, importlib, os, sys, warnings, math | |
from pathlib import Path | |
from datetime import datetime, timedelta | |
import yfinance as yf | |
import numpy as np | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import torch | |
import torch.nn as nn | |
from torch.utils.data import TensorDataset, DataLoader | |
# --------------------- 參數 --------------------- | |
def get_args():
    """Parse the command-line options of the forecasting script.

    Returns:
        argparse.Namespace with the attributes ``ticker``, ``period``,
        ``days``, ``cutoff``, ``compare``, ``window`` and ``device``.
    """
    # Prefer the GPU when torch reports one, otherwise fall back to the CPU.
    default_device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # One (flag, add_argument keyword arguments) entry per option.
    option_table = (
        ('--ticker', {'type': str, 'default': 'TSLA'}),
        ('--period', {'type': str, 'default': '3mo',
                      'help': 'yfinance period, e.g. 6mo / 1y / 5y / max'}),
        ('--days', {'type': int, 'default': 7,
                    'help': 'forecast horizon'}),
        ('--cutoff', {'type': str, 'default': None,
                      'help': 'yyyy-mm-dd ; 若為 None 取資料最後一天'}),
        ('--compare', {'type': str, 'default': 'real',
                       'choices': ['real', 'none']}),
        ('--window', {'type': int, 'default': 10,
                      'help': '滑動視窗長度 (僅對 TS 類模型適用)'}),
        ('--device', {'type': str, 'default': default_device}),
    )

    parser = argparse.ArgumentParser()
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()
# --------------------- 下載 + 前處理 --------------------- | |
def fetch_data(ticker, period, cutoff):
    """Download adjusted price history for one ticker via yfinance.

    Args:
        ticker: Symbol passed to ``yf.download``.
        period: yfinance period string (e.g. ``'1y'``).
        cutoff: Optional upper bound for the index (``yyyy-mm-dd``); rows
            after it are dropped. ``None`` keeps everything.

    Returns:
        The downloaded DataFrame with single-level column names, or whatever
        empty/``None`` result the download produced.
    """
    df = yf.download(ticker, period=period, auto_adjust=True)
    if df is None or df.empty:
        # Nothing to post-process; callers check for the empty case.
        return df

    # Recent yfinance versions return (field, ticker) MultiIndex columns even
    # for a single ticker. The rest of this script looks columns up by plain
    # names such as 'Close', so keep only the field level.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    if cutoff is not None:
        df = df[df.index <= cutoff]
    return df
def prepare_data_ts(df, window):
    """Build sliding-window samples from the OHLCV columns.

    Each sample is ``window`` consecutive rows of
    Open/High/Low/Close/Volume; its label is the Close of the row that
    immediately follows the window.

    Args:
        df: DataFrame containing the columns Open, High, Low, Close, Volume.
        window: Number of rows per sample; must be at least 1.

    Returns:
        ``(X, y)`` float32 tensors of shape ``(N, window, 5)`` and ``(N,)``,
        where ``N = max(len(df) - window, 0)``.

    Raises:
        ValueError: If ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f'window must be >= 1, got {window}')

    feat_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
    close_idx = feat_cols.index('Close')
    data = df[feat_cols].values

    # The last usable window ends one row before the final row, so the start
    # index runs up to len(data) - window - 1 inclusive.
    n_samples = max(len(data) - window, 0)
    if n_samples == 0:
        # Keep the rank stable so callers can still inspect the shape.
        return (torch.empty((0, window, len(feat_cols)), dtype=torch.float32),
                torch.empty((0,), dtype=torch.float32))

    windows = [data[i:i + window] for i in range(n_samples)]
    labels = [data[i + window, close_idx] for i in range(n_samples)]
    X = torch.tensor(np.array(windows), dtype=torch.float32)
    y = torch.tensor(np.array(labels), dtype=torch.float32)
    return X, y
def prepare_data_tabular(df, lookback=30):
    """Flatten rolling statistics of the OHLCV columns into tabular samples.

    For every row ``i >= lookback`` the features are, per column in
    Open/High/Low/Close/Volume order, the mean, standard deviation, minimum,
    maximum and last-minus-first delta over the ``lookback`` rows preceding
    row ``i``. The label is the Close of row ``i`` itself.

    Args:
        df: DataFrame containing the columns Open, High, Low, Close, Volume
            (in any order).
        lookback: Number of preceding rows summarised per sample.

    Returns:
        ``(X, y)`` float32 tensors of shape ``(N, 25)`` and ``(N,)``, where
        ``N = max(len(df) - lookback, 0)``.
    """
    feat_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
    stats_per_col = 5
    frame = df[feat_cols]
    # Look the label up by name: the positional order of the columns in `df`
    # is not guaranteed, so column 3 is not necessarily Close.
    close = frame['Close'].to_numpy()

    feats, labels = [], []
    # Row len(df) - 1 is a valid label as well, so iterate up to len(frame).
    for i in range(lookback, len(frame)):
        window = frame.iloc[i - lookback:i]
        row = []
        for col in feat_cols:
            series = window[col]
            row += [series.mean(), series.std(), series.min(), series.max(),
                    series.iloc[-1] - series.iloc[0]]  # delta over the window
        feats.append(row)
        labels.append(close[i])

    if not feats:
        # Keep the rank stable when there are too few rows for any sample.
        return (torch.empty((0, stats_per_col * len(feat_cols)),
                            dtype=torch.float32),
                torch.empty((0,), dtype=torch.float32))

    X = torch.tensor(np.array(feats), dtype=torch.float32)
    y = torch.tensor(np.array(labels), dtype=torch.float32)
    return X, y
# --------------------- 動態載入模型 --------------------- | |
# Models to try, as (module path, candidate class names) pairs. Every module
# lives under qlib.contrib.model and is named pytorch_<stem>; the candidate
# names are probed in order by load_model.
MODEL_SPECS = [
    (f'qlib.contrib.model.pytorch_{stem}', candidates)
    for stem, candidates in (
        ('gats', ['GATSModel', 'GATS']),
        ('sfm', ['SFMModel', 'SFM']),
        ('tabnet', ['TabNet']),
        ('add', ['ADDModel']),
        ('igmtf', ['IGMTF']),
        ('hist', ['HIST']),
    )
]
def load_model(module_path, class_list):
    """Import a module and return the first candidate attribute it defines.

    Args:
        module_path: Dotted module path to import.
        class_list: Attribute names to probe, in priority order.

    Returns:
        The first attribute of the module whose name appears in
        ``class_list``, or ``None`` (after emitting a warning) when the
        import fails or none of the names exists.
    """
    try:
        module = importlib.import_module(module_path)
        # Stop at the first name the module actually exposes.
        hit = next((name for name in class_list if hasattr(module, name)), None)
        if hit is not None:
            return getattr(module, hit)
        warnings.warn(f'{module_path} 裏找不到 {class_list}')
    except ImportError as e:
        warnings.warn(f'無法 import {module_path}:{e}')
    return None
# --------------------- 通用 train/predict --------------------- | |
def train_predict_ts(model_cls, X, y, X_last, device='cpu', epochs=200, lr=1e-3):
    """Train a sequence model with a plain MSE loop and predict one value.

    Args:
        model_cls: Class instantiated as ``model_cls(d_feat=..., output_dim=1)``.
            If the instance has a ``model`` attribute, that attribute is the
            network that gets trained; otherwise the instance itself is.
        X: Training inputs; ``X.shape[2]`` is used as the feature count.
        y: Training targets, one per sample.
        X_last: Input for the final prediction; the network output for it
            must reduce to a single element.
        device: Device the network and tensors are moved to.
        epochs: Number of passes over the training data.
        lr: Adam learning rate.

    Returns:
        The prediction for ``X_last`` as a Python float.
    """
    wrapper = model_cls(d_feat=X.shape[2], output_dim=1)
    network = getattr(wrapper, 'model', wrapper)
    network.to(device)

    batches = DataLoader(
        TensorDataset(X.to(device), y.to(device)),
        batch_size=32,
        shuffle=True,
    )
    optimizer = torch.optim.Adam(network.parameters(), lr=lr)
    criterion = nn.MSELoss()

    network.train()
    for _epoch in range(epochs):
        for inputs, targets in batches:
            optimizer.zero_grad()
            outputs = network(inputs).squeeze()
            batch_loss = criterion(outputs, targets)
            batch_loss.backward()
            optimizer.step()

    network.eval()
    with torch.no_grad():
        forecast = network(X_last.to(device)).squeeze().item()
    return forecast
def train_predict_tab(model_cls, X, y, X_last, device='cpu',
                      epochs=200, lr=1e-3):
    """Train a tabular model and predict one value for ``X_last``.

    The constructor is called only with the size-related keyword arguments
    its signature declares. If the instance offers both ``fit`` and
    ``predict`` they are tried first with NumPy arrays; on any failure, or
    when they are missing, a plain MSE training loop is used instead.

    Args:
        model_cls: Model class to instantiate.
        X: Training inputs; ``X.shape[1]`` is used as the feature count.
        y: Training targets, one per sample.
        X_last: Input for the final prediction; the output for it must
            reduce to a single element.
        device: Device used by the manual training loop.
        epochs: Number of passes over the data in the manual loop.
        lr: Adam learning rate in the manual loop.

    Returns:
        The prediction for ``X_last`` as a Python float.
    """
    import inspect

    accepted = inspect.signature(model_cls.__init__).parameters
    n_features = X.shape[1]
    # Keyword arguments the constructor may understand, with the values to
    # pass; only those present in the signature are forwarded.
    known_kwargs = {
        'd_feat': n_features,
        'feature_dim': n_features,
        'input_dim': n_features,
        'field_dim': n_features,
        'embed_dim': 16,
        'output_dim': 1,
        'target_dim': 1,
    }
    kw = {name: value for name, value in known_kwargs.items()
          if name in accepted}
    model = model_cls(**kw)

    # Try the model's own fit/predict first; fall back on any failure.
    if hasattr(model, 'fit') and hasattr(model, 'predict'):
        try:
            model.fit(X.numpy(), y.numpy())
            return float(model.predict(X_last.numpy()).item())
        except Exception as e:  # deliberate best-effort: any error -> manual loop
            print(f'⚠️ {model_cls.__name__}.fit() 失敗,改用手動訓練 loop:{e}')

    # Manual training loop.
    net = model.model if hasattr(model, 'model') else model
    net.to(device)
    ds = TensorDataset(X.to(device), y.to(device))
    dl = DataLoader(ds, batch_size=32, shuffle=True)
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    loss_fn = nn.MSELoss()

    net.train()
    for _ in range(epochs):
        for xb, yb in dl:
            opt.zero_grad()
            loss_fn(net(xb).squeeze(), yb).backward()
            opt.step()

    net.eval()
    with torch.no_grad():
        return net(X_last.to(device)).squeeze().item()
def forecast_others(ticker, forecast_days=7, period="1y", cutoff=None, compare_real=False):
    """Train every loadable model in MODEL_SPECS and plot their forecasts.

    Args:
        ticker: Symbol passed to ``fetch_data``.
        forecast_days: Number of business days on the forecast axis. When
            ``compare_real`` is true, this many trailing rows are held out
            as the real series.
        period: yfinance period string passed to ``fetch_data``.
        cutoff: Optional last date (``yyyy-mm-dd``) passed to ``fetch_data``.
        compare_real: Whether to hold out and plot the real closes.

    Returns:
        ``(fig, df_out)``: the matplotlib figure and a DataFrame indexed by
        the forecast dates, with one column per model (its single predicted
        value repeated) plus ``Real`` when ``compare_real`` is true.

    Raises:
        ValueError: If no data was downloaded or there are too few rows to
            build any training sample.
    """
    feat_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
    window = 10
    lookback = 30

    df = fetch_data(ticker, period, cutoff)
    if df is None or df.empty:
        raise ValueError('❌ 無資料!檢查 ticker / 期間設定')

    # Split into training history and the held-out real closes.
    if compare_real:
        real_future = df['Close'].iloc[-forecast_days:]
        df_hist = df.iloc[:-forecast_days]
    else:
        real_future = None
        df_hist = df
    if len(df_hist) < window + 1:
        raise ValueError(f"{ticker} 訓練資料不足,無法進行預測。")

    X_ts, y_ts = prepare_data_ts(df_hist, window=window)
    X_tab, y_tab = prepare_data_tabular(df_hist, lookback=lookback)
    # Validate before indexing into the samples, so short histories produce
    # this error rather than an IndexError on an empty tensor.
    if X_ts.size(0) == 0 or y_ts.size(0) == 0 or X_tab.size(0) == 0 or y_tab.size(0) == 0:
        raise ValueError(f"{ticker} 訓練資料切片後無有效樣本,請嘗試更長的 period 或不同的 cutoff。")

    X_last_ts = torch.tensor(df_hist[feat_cols].values[-window:],
                             dtype=torch.float32).unsqueeze(0)
    # Reuse the features computed above instead of rebuilding the data set.
    # NOTE(review): this is the last *training* row, not features over the
    # most recent `lookback` rows — confirm that is the intended input.
    last_feats = X_tab[-1].unsqueeze(0)

    predictions = {}
    for mod_path, cls_list in MODEL_SPECS:
        ModelClass = load_model(mod_path, cls_list)
        if ModelClass is None:
            continue
        model_name = ModelClass.__name__
        print(f'🔍 Training {model_name} ...')
        try:
            # GATS and IGMTF take the sliding-window tensors; the rest take
            # the flattened tabular features.
            if any(tag in mod_path for tag in ['gats', 'igmtf']):
                pred = train_predict_ts(ModelClass, X_ts, y_ts, X_last_ts)
            else:
                pred = train_predict_tab(ModelClass, X_tab, y_tab, last_feats)
        except Exception as e:  # best-effort: one failing model must not stop the rest
            print(f'⚠️ 跳過 {model_name}:{e}')
            continue
        predictions[model_name.upper()] = pred

    # Plot and table.
    future_dates = pd.date_range(df_hist.index[-1] + timedelta(days=1), periods=forecast_days, freq='B')
    df_out = pd.DataFrame(index=future_dates)
    fig, ax = plt.subplots(figsize=(10, 5))
    for name, value in predictions.items():
        flat_line = [value] * forecast_days
        df_out[name] = flat_line
        ax.plot(future_dates, flat_line, label=name)
    if compare_real and real_future is not None:
        df_out["Real"] = real_future.values
        ax.plot(real_future.index, real_future.values, 'k--', label='Real')
    ax.set_title(f"{ticker} Forecast Comparison (Other Models)")
    ax.set_xlabel('Date')
    ax.set_ylabel('Close Price')
    ax.legend()
    ax.grid(True)
    fig.autofmt_xdate()
    return fig, df_out
# --------------------- 主流程 --------------------- | |
def main():
    """Command-line entry point: train the models, save a PNG and a CSV.

    Reads the options from ``get_args``, downloads the data, trains every
    loadable model in MODEL_SPECS, then writes
    ``series_Others_<ticker>_forecast.png`` and
    ``series_Others_<ticker>_forecast.csv`` to the current directory.
    Exits with status 1 when there is no data or no training sample.
    """
    args = get_args()
    feat_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
    lookback = 30

    df = fetch_data(args.ticker, args.period, args.cutoff)
    if df is None or df.empty:
        print('❌ 無資料!檢查 ticker / 期間設定')
        sys.exit(1)

    # Hold out the real future closes when a comparison is requested.
    if args.compare == 'real':
        real_future = df['Close'].iloc[-args.days:]
        df_hist = df.iloc[:-args.days]
    else:
        real_future = None
        df_hist = df

    # Prepare both the time-series and the tabular data sets.
    X_ts, y_ts = prepare_data_ts(df_hist, args.window)
    X_tab, y_tab = prepare_data_tabular(df_hist, lookback=lookback)
    # Validate before indexing into the samples, so short histories exit
    # cleanly instead of failing with an IndexError on an empty tensor.
    if X_ts.size(0) == 0 or y_ts.size(0) == 0 or X_tab.size(0) == 0 or y_tab.size(0) == 0:
        print(f"{args.ticker} 訓練資料切片後無有效樣本,請嘗試更長的 period 或不同的 cutoff。")
        sys.exit(1)

    X_last_ts = torch.tensor(df_hist[feat_cols].values[-args.window:],
                             dtype=torch.float32).unsqueeze(0)
    # Reuse the features computed above instead of rebuilding the data set.
    # NOTE(review): this is the last *training* row, not features over the
    # most recent `lookback` rows — confirm that is the intended input.
    last_feats = X_tab[-1].unsqueeze(0)

    predictions = {}
    for mod_path, cls_list in MODEL_SPECS:
        ModelClass = load_model(mod_path, cls_list)
        if ModelClass is None:
            continue
        model_name = ModelClass.__name__  # name of the class actually loaded
        print(f'🔍 Training {model_name} ...')
        try:
            # GATS and IGMTF use the time-series path, the rest the tabular one.
            if any(tag in mod_path for tag in ['gats', 'igmtf']):
                pred = train_predict_ts(
                    ModelClass, X_ts, y_ts, X_last_ts, device=args.device)
            else:
                pred = train_predict_tab(
                    ModelClass, X_tab, y_tab, last_feats, device=args.device)
        except Exception as e:  # best-effort: one failing model must not stop the rest
            print(f'⚠️ 跳過 {model_name}:{e}')
            continue
        predictions[model_name.upper()] = pred
        print(f'✅ {model_name}: {pred:.2f}')

    # Plot.
    fig, ax = plt.subplots(figsize=(10, 5))
    future_dates = pd.date_range(df_hist.index[-1] + timedelta(days=1), periods=args.days, freq='B')
    for name, value in predictions.items():
        ax.plot(future_dates, [value] * args.days, label=name)
    if real_future is not None:
        ax.plot(real_future.index, real_future.values, 'k--', label='Real')
    ax.set_title(f'{args.ticker} Forecast Comparison (Other Models)')
    ax.set_xlabel('Date')
    ax.set_ylabel('Close Price')
    if predictions or real_future is not None:
        # Only draw a legend when at least one labelled line exists.
        ax.legend()
    out_png = f'series_Others_{args.ticker.lower()}_forecast.png'
    fig.tight_layout()
    fig.savefig(out_png)
    plt.close(fig)  # release the figure once it is on disk
    print(f'📈 圖表已儲存 {out_png}')

    # Save the CSV.
    out_csv = f'series_Others_{args.ticker.lower()}_forecast.csv'
    pd.Series(predictions).to_csv(out_csv, header=False)
    print(f'📄 CSV 已儲存 {out_csv}')


if __name__ == "__main__":
    main()