stock-qlib-public / app_series_others.py
tbdavid2019's picture
Add application file
d7d2bc1
#!/usr/bin/env python3
"""
app_series_others.py
--------------------------------------------------------------------
「依樣畫葫蘆」版:把 Qlib 內建的 6 個“其他”模型(gats、sfm、
tabnet、add、igmtf、hist)拉進同一支腳本,比照
`app_series_Attention.py` 的流程:
1. 下載歷史股價 (yfinance)
2. 準備滑動視窗資料
3. 逐一訓練並預測未來 N 天收盤價
4. 畫圖 + 儲存 PNG / CSV
--------------------------------------------------------------------
**重點差異**
* 這些模型各自的 `fit / predict` 介面不完全相同;為了簡化,
這裡用 `importlib` 動態載入,再嘗試:
a. 若類別有 `.fit()`,就呼叫它(DatasetH 版)
b. 否則 fallback 到手動 train loop(跟 attention 版相同)
* 若模型屬於「跨樣本」類(TabNet / SFM / ADD / HIST),
`prepare_data_tabular()` 會把時序資料展平成單筆特徵;
其餘(GATS / IGMTF)依然使用滑窗方式。
--------------------------------------------------------------------
**使用方法**
$ python3 app_series_others.py \\
--ticker TSLA \\
--days 7 # 預測天數 \\
--period 1y # 訓練資料期間(yfinance 標準字串)\\
--cutoff 2025-03-20 # 訓練到哪一天 \\
--compare real # 是否畫出真實線(real / none)
# 例:用一年資料訓練,預測 7 天
python3 app_series_others.py --ticker TSLA --period 1y --days 7 \
--cutoff 2025-03-20 --compare real
"""
import argparse, importlib, os, sys, warnings, math
from pathlib import Path
from datetime import datetime, timedelta
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
# --------------------- 參數 ---------------------
def get_args():
p = argparse.ArgumentParser()
p.add_argument('--ticker', type=str, default='TSLA')
p.add_argument('--period', type=str, default='3mo',
help='yfinance period, e.g. 6mo / 1y / 5y / max')
p.add_argument('--days', type=int, default=7,
help='forecast horizon')
p.add_argument('--cutoff', type=str, default=None,
help='yyyy-mm-dd ; 若為 None 取資料最後一天')
p.add_argument('--compare', type=str, default='real',
choices=['real', 'none'])
p.add_argument('--window', type=int, default=10,
help='滑動視窗長度 (僅對 TS 類模型適用)')
p.add_argument('--device', type=str, default='cuda' if torch.cuda.is_available() else 'cpu')
return p.parse_args()
# --------------------- 下載 + 前處理 ---------------------
def fetch_data(ticker, period, cutoff):
df = yf.download(ticker, period=period, auto_adjust=True)
if cutoff is not None:
df = df[df.index <= cutoff]
return df
def prepare_data_ts(df, window):
# 把 OHLCV 5 欄做成 (N, window, 5) tensor
feat_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
data = df[feat_cols].values
X, y = [], []
for i in range(len(data) - window - 1):
X.append(data[i:i+window])
y.append(data[i+window, 3]) # 下一天收盤價
X = torch.tensor(np.array(X), dtype=torch.float32)
y = torch.tensor(np.array(y), dtype=torch.float32)
return X, y
def prepare_data_tabular(df, lookback=30):
"""
把最近 lookback 天的統計特徵展平成單筆向量,
給 TabNet / SFM / ADD / HIST 這類“橫斷面”模型。
"""
feat_cols = ['Open', 'High', 'Low', 'Close', 'Volume']
feats, labels = [], []
for i in range(lookback, len(df)-1):
window = df.iloc[i-lookback:i]
f = []
for col in feat_cols:
series = window[col]
f += [series.mean(), series.std(), series.min(), series.max(),
series.iloc[-1] - series.iloc[0]] # delta
feats.append(f)
labels.append(df.iloc[i, 3]) # 當天 close
X = torch.tensor(np.array(feats), dtype=torch.float32)
y = torch.tensor(np.array(labels), dtype=torch.float32)
return X, y
# --------------------- 動態載入模型 ---------------------
MODEL_SPECS = [
# (module_path, [candidate class names])
('qlib.contrib.model.pytorch_gats', ['GATSModel', 'GATS']),
('qlib.contrib.model.pytorch_sfm', ['SFMModel', 'SFM']),
('qlib.contrib.model.pytorch_tabnet',['TabNet']),
('qlib.contrib.model.pytorch_add', ['ADDModel']),
('qlib.contrib.model.pytorch_igmtf', ['IGMTF']),
('qlib.contrib.model.pytorch_hist', ['HIST'])
]
def load_model(module_path, class_list):
try:
module = importlib.import_module(module_path)
for cls in class_list:
if hasattr(module, cls):
return getattr(module, cls)
warnings.warn(f'{module_path} 裏找不到 {class_list}')
return None
except ImportError as e:
warnings.warn(f'無法 import {module_path}{e}')
return None
# --------------------- 通用 train/predict ---------------------
def train_predict_ts(model_cls, X, y, X_last, device='cpu', epochs=200, lr=1e-3):
model = model_cls(d_feat=X.shape[2], output_dim=1)
net = model.model if hasattr(model, 'model') else model
net.to(device)
ds = TensorDataset(X.to(device), y.to(device))
dl = DataLoader(ds, batch_size=32, shuffle=True)
opt = torch.optim.Adam(net.parameters(), lr=lr)
loss_fn = nn.MSELoss()
net.train()
for _ in range(epochs):
for xb, yb in dl:
opt.zero_grad()
pred = net(xb).squeeze()
loss = loss_fn(pred, yb)
loss.backward()
opt.step()
net.eval()
with torch.no_grad():
pred_future = net(X_last.to(device)).squeeze().item()
return pred_future
def train_predict_tab(model_cls, X, y, X_last, device='cpu',
epochs=200, lr=1e-3):
import inspect
sig = inspect.signature(model_cls.__init__)
kw = {}
if 'd_feat' in sig.parameters: kw['d_feat'] = X.shape[1]
if 'feature_dim' in sig.parameters: kw['feature_dim'] = X.shape[1]
if 'input_dim' in sig.parameters: kw['input_dim'] = X.shape[1]
if 'field_dim' in sig.parameters: kw['field_dim'] = X.shape[1]
if 'embed_dim' in sig.parameters: kw['embed_dim'] = 16
if 'output_dim' in sig.parameters: kw['output_dim'] = 1
if 'target_dim' in sig.parameters: kw['target_dim'] = 1
model = model_cls(**kw)
# ---- 嘗試官方 fit/predict,失敗就 fallback ----
if hasattr(model, 'fit') and hasattr(model, 'predict'):
try:
model.fit(X.numpy(), y.numpy())
return float(model.predict(X_last.numpy()).item())
except Exception as e:
print(f'⚠️ {model_cls.__name__}.fit() 失敗,改用手動訓練 loop:{e}')
# ----------- 手動 train loop ---------------
net = model.model if hasattr(model, 'model') else model
net.to(device)
ds = TensorDataset(X.to(device), y.to(device))
dl = DataLoader(ds, batch_size=32, shuffle=True)
opt = torch.optim.Adam(net.parameters(), lr=lr)
loss_fn = nn.MSELoss()
net.train()
for _ in range(epochs):
for xb, yb in dl:
opt.zero_grad()
loss_fn(net(xb).squeeze(), yb).backward()
opt.step()
net.eval()
with torch.no_grad():
return net(X_last.to(device)).squeeze().item()
return pred_future
def forecast_others(ticker, forecast_days=7, period="1y", cutoff=None, compare_real=False):
df = fetch_data(ticker, period, cutoff)
if df is None or df.empty:
raise ValueError('❌ 無資料!檢查 ticker / 期間設定')
# 分割訓練與真實資料
if compare_real:
real_future = df['Close'].iloc[-forecast_days:]
df_hist = df.iloc[:-forecast_days]
else:
real_future = None
df_hist = df
if df_hist is None or df_hist.empty or len(df_hist) < 11:
raise ValueError(f"{ticker} 訓練資料不足,無法進行預測。")
X_ts, y_ts = prepare_data_ts(df_hist, window=10)
X_last_ts = torch.tensor(df_hist[['Open', 'High', 'Low', 'Close', 'Volume']].values[-10:],
dtype=torch.float32).unsqueeze(0)
X_tab, y_tab = prepare_data_tabular(df_hist, lookback=30)
last_feats = prepare_data_tabular(df_hist, lookback=30)[0][-1].unsqueeze(0)
if X_ts.size(0) == 0 or y_ts.size(0) == 0 or X_tab.size(0) == 0 or y_tab.size(0) == 0:
raise ValueError(f"{ticker} 訓練資料切片後無有效樣本,請嘗試更長的 period 或不同的 cutoff。")
predictions = {}
for mod_path, cls_list in MODEL_SPECS:
ModelClass = load_model(mod_path, cls_list)
if ModelClass is None:
continue
model_name = ModelClass.__name__
print(f'🔍 Training {model_name} ...')
try:
if any(tag in mod_path for tag in ['gats', 'igmtf']):
pred = train_predict_ts(ModelClass, X_ts, y_ts, X_last_ts)
else:
pred = train_predict_tab(ModelClass, X_tab, y_tab, last_feats)
except Exception as e:
print(f'⚠️ 跳過 {model_name}{e}')
continue
predictions[model_name.upper()] = pred
# 畫圖與表格
future_dates = pd.date_range(df_hist.index[-1] + timedelta(days=1), periods=forecast_days, freq='B')
df_out = pd.DataFrame(index=future_dates)
fig, ax = plt.subplots(figsize=(10, 5))
for name, value in predictions.items():
df_out[name] = [value] * forecast_days
ax.plot(future_dates, [value] * forecast_days, label=name)
if compare_real and real_future is not None:
df_out["Real"] = real_future.values
ax.plot(real_future.index, real_future.values, 'k--', label='Real')
ax.set_title(f"{ticker} Forecast Comparison (Other Models)")
ax.set_xlabel('Date')
ax.set_ylabel('Close Price')
ax.legend()
ax.grid(True)
fig.autofmt_xdate()
return fig, df_out
# --------------------- 主流程 ---------------------
def main():
args = get_args()
df = fetch_data(args.ticker, args.period, args.cutoff)
if df.empty:
print('❌ 無資料!檢查 ticker / 期間設定')
sys.exit(1)
# 真實未來 close(for compare)
if args.compare == 'real':
real_future = df['Close'].iloc[-args.days:]
df_hist = df.iloc[:-args.days]
else:
real_future = None
df_hist = df
# 先準備 time‑series 與 tabular 兩份資料
X_ts, y_ts = prepare_data_ts(df_hist, args.window)
X_last_ts = torch.tensor(df_hist[['Open','High','Low','Close','Volume']].values[-args.window:],
dtype=torch.float32).unsqueeze(0)
X_tab, y_tab = prepare_data_tabular(df_hist, lookback=30)
last_feats = prepare_data_tabular(df_hist, lookback=30)[0][-1].unsqueeze(0)
predictions = {}
for mod_path, cls_list in MODEL_SPECS: # ① 這裡拿到 cls_list
ModelClass = load_model(mod_path, cls_list) # ② 傳入 cls_list
if ModelClass is None:
continue
model_name = ModelClass.__name__ # ③ 用真正載到的類名顯示
print(f'🔍 Training {model_name} ...')
try:
# GATS、IGMTF 走 time‑series,其他走 tabular
if any(tag in mod_path for tag in ['gats', 'igmtf']):
pred = train_predict_ts(
ModelClass, X_ts, y_ts, X_last_ts, device=args.device)
else:
pred = train_predict_tab(
ModelClass, X_tab, y_tab, last_feats, device=args.device)
except Exception as e:
print(f'⚠️ 跳過 {model_name}{e}')
continue
predictions[model_name.upper()] = pred
print(f'✅ {model_name}: {pred:.2f}')
# --------------------- 畫圖 ---------------------
fig, ax = plt.subplots(figsize=(10,5))
future_dates = pd.date_range(df_hist.index[-1] + timedelta(days=1), periods=args.days, freq='B')
for name, value in predictions.items():
ax.plot(future_dates, [value]*args.days, label=name)
if real_future is not None:
ax.plot(real_future.index, real_future.values, 'k--', label='Real')
ax.set_title(f'{args.ticker} Forecast Comparison (Other Models)')
ax.set_xlabel('Date')
ax.set_ylabel('Close Price')
ax.legend()
out_png = f'series_Others_{args.ticker.lower()}_forecast.png'
plt.tight_layout()
plt.savefig(out_png)
print(f'📈 圖表已儲存 {out_png}')
# 儲存 CSV
out_csv = f'series_Others_{args.ticker.lower()}_forecast.csv'
pd.Series(predictions).to_csv(out_csv, header=False)
print(f'📄 CSV 已儲存 {out_csv}')
if __name__ == "__main__":
main()