In [2]:
 import yfinance as yf
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.svm import SVR
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import matplotlib.pyplot as plt
from datetime import datetime
 
from skopt import gp_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args

def compute_rsi(data, window):
    delta = data.diff()
    up, down = delta.copy(), delta.copy()
    up[up < 0] = 0
    down[down > 0] = 0
    
    roll_up = up.rolling(window=window).mean()
    roll_down = down.abs().rolling(window=window).mean()
    
    RS = roll_up / roll_down
    RSI = 100.0 - (100.0 / (1.0 + RS))
    
    return RSI

def compute_bollinger_bands(data, window):
    sma = data['Close'].rolling(window=window).mean()
    rolling_std = data['Close'].rolling(window=window).std()
    upper_band = sma + (rolling_std*2)
    lower_band = sma - (rolling_std*2)
    
    return upper_band, lower_band

def create_dataset(dataset, look_back=1):
    dataX, dataY = [], []
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(dataset[i + look_back, 0])
    return np.array(dataX), np.array(dataY)

def get_data(ticker_symbol, start_date, end_date):
    ticker_data = yf.Ticker(ticker_symbol)
    historical_data = ticker_data.history(start=start_date, end=end_date)
    return historical_data[['Close', 'Volume']].copy()
def mean_absolute_percentage_error(y_true, y_pred, epsilon=1e-10):
    return np.mean(np.abs((y_true - y_pred) / (y_true + epsilon))) * 100

#{'RSI_window': 20, 'SMA_window': 9, 'Bollinger_window': 23}

# 评估函数
def evaluate_parameters(rsi_window, sma_window, bollinger_window):
    data = get_data('KO', '2023-01-01', '2023-10-01')
    
    data['RSI'] = compute_rsi(data['Close'], rsi_window)
    data['SMA'] = data['Close'].rolling(window=sma_window).mean()
    data['Upper_Band'], data['Lower_Band'] = compute_bollinger_bands(data, bollinger_window)
    
    data.dropna(inplace=True)
    
    scaler = MinMaxScaler(feature_range=(0, 1))
    data_scaled = scaler.fit_transform(data)
    
    look_back = 10
    X, y = create_dataset(data_scaled, look_back)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=12)
    
    model = LinearRegression()
    model.fit(X_train[:, -1, :], y_train)
    y_pred = model.predict(X_test[:, -1, :])
    y_real = y_test
    
    mape = mean_absolute_percentage_error(y_real, y_pred)
    
    return mape
# 定义搜索空间
space  = [Integer(5, 20, name='rsi_window'),
          Integer(3, 15, name='sma_window'),
          Integer(10, 30, name='bollinger_window')]
# 使用贝叶斯优化
@use_named_args(space)
def objective(**params):
    return evaluate_parameters(params['rsi_window'], params['sma_window'], params['bollinger_window'])
res = gp_minimize(objective, space, n_calls=50, random_state=0)
# 获取最佳参数
best_rsi_window = res.x[0]
best_sma_window = res.x[1]
best_bollinger_window = res.x[2]
#  输出最佳参数
print("Best parameters:")
print("RSI window:", res.x[0])
print("SMA window:", res.x[1])
print("Bollinger Bands window:", res.x[2])
data = historical_data[['Close', 'Volume']].copy()
# 使用优化后的参数更新技术指标
data['RSI'] = compute_rsi(data['Close'], best_rsi_window)
data['SMA'] = data['Close'].rolling(window=best_sma_window).mean()
data['Upper_Band'], data['Lower_Band'] = compute_bollinger_bands(data, best_bollinger_window)

data.dropna(inplace=True)

# 数据归一化
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)

# 使用滑动窗口法创建数据集
look_back = 10
X, y = create_dataset(data_scaled, look_back)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=12)

models = {
    "Linear Regression": LinearRegression(),
    "Random Forest": RandomForestRegressor(n_estimators=100, random_state=12),
    "Gradient Boosting": GradientBoostingRegressor(n_estimators=100, random_state=12),
    "SVR": SVR(kernel='rbf', C=1e3, gamma=0.1),
    "LSTM": Sequential([
        LSTM(100, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=True),
        Dropout(0.2),
        LSTM(50, return_sequences=True),
        Dropout(0.2),
        LSTM(25),
        Dense(1)
    ])
}

models["LSTM"].compile(optimizer='adam', loss='mean_squared_error')
models["Linear Regression"].fit(X_train[:, -1, :], y_train)
models["Random Forest"].fit(X_train[:, -1, :], y_train)
models["Gradient Boosting"].fit(X_train[:, -1, :], y_train)
models["SVR"].fit(X_train[:, -1, :], y_train)
models["LSTM"].fit(X_train, y_train, epochs=50, batch_size=10, validation_data=(X_test, y_test), verbose=1, shuffle=False)

real_data = ticker_data.history(start='2023-10-16', end=datetime.today().strftime('%Y-%m-%d'))
real_prices = real_data['Close'].values
num_points = min(len(real_prices), 10)

def predict_future(model, initial_input, steps, is_lstm=False):
    future_predictions = []
    current_input = initial_input.copy()
    
    for i in range(steps):
        if is_lstm:
            current_prediction = model.predict(current_input[np.newaxis, :, :])[0, 0]
            future_predictions.append(current_prediction)
            current_input = np.roll(current_input, -1, axis=0)
            current_input[-1, 0] = current_prediction
        else:
            current_prediction = model.predict(current_input[-1, :].reshape(1, -1))
            future_predictions.append(current_prediction[0])
            current_input = np.roll(current_input, -1, axis=0)
            current_input[-1, 0] = current_prediction[0]
    
    return future_predictions
 # 获取每个模型的预测结果
all_predictions = []

for model_name, model in models.items():
    if model_name == "LSTM":
        predictions[model_name] = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=True)
    else:
        predictions[model_name] = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=False)
    
    predicted_prices = scaler.inverse_transform(np.hstack([np.array(predictions[model_name]).reshape(-1, 1), np.zeros((len(predictions[model_name]), data.shape[1]-1))]))[:, 0]
    all_predictions.append(predicted_prices)
    plt.plot(real_data.index[:num_points], predicted_prices, label=f'{model_name} Predicted Price')

# 计算平均预测值
average_prediction = np.mean(all_predictions, axis=0)
predictions["Average"] = average_prediction
plt.plot(real_data.index[:num_points], average_prediction, label='Average Predicted Price', color='cyan', linewidth=1.5)

plt.plot(real_data.index[:num_points], real_prices[:num_points], label='Real Price', color='black', linewidth=1)
plt.legend()
plt.title(f'Price Prediction Comparison: {ticker_symbol}')
plt.show()

print(f"Real Prices: {real_prices[:num_points]}")
for model_name, prediction in predictions.items():
    if model_name in models or model_name == "Average":
        print(f"{model_name} Predicted Prices: {prediction}")
import random

# 定义函数计算预测误差
def mean_absolute_percentage_error(y_true, y_pred):
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

# 从2023年随机选择5个10日时间段
random_dates = random.sample(list(historical_data['2023'].index), 5)

model_errors = {}

for start_date in random_dates:
    end_date = start_date + pd.Timedelta(days=10)
    
    real_data = ticker_data.history(start=start_date, end=end_date)
    real_prices = real_data['Close'].values
    num_points = len(real_prices)
    
    for model_name, model in models.items():
        if model_name == "LSTM":
            predicted = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=True)
        else:
            predicted = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=False)
        
        predicted_prices = scaler.inverse_transform(np.hstack([np.array(predicted).reshape(-1, 1), np.zeros((len(predicted), data.shape[1]-1))]))[:, 0]
        error = mean_absolute_percentage_error(real_prices, predicted_prices)
        
        if model_name not in model_errors:
            model_errors[model_name] = []
        model_errors[model_name].append(error)

avg_errors = {model_name: np.mean(errors) for model_name, errors in model_errors.items()}
best_model_name = min(avg_errors, key=avg_errors.get)

print(f"Average MAPE for each model over 5 random periods:")
for model_name, error in avg_errors.items():
    print(f"{model_name}: {error:.2f}%")
    
print(f"\nThe best model based on the average MAPE over 5 random periods is: {best_model_name} with an average error of {avg_errors[best_model_name]:.2f}%.")




Best parameters:
RSI window: 10
SMA window: 15
Bollinger Bands window: 20


NameError: name 'data' is not defined

In [8]:
def compute_rsi(data, window):
    delta = data.diff()
    up, down = delta.copy(), delta.copy()
    up[up < 0] = 0
    down[down > 0] = 0
    
    roll_up = up.rolling(window=window).mean()
    roll_down = down.abs().rolling(window=window).mean()
    
    RS = roll_up / roll_down
    RSI = 100.0 - (100.0 / (1.0 + RS))
    
    return RSI

def compute_bollinger_bands(data, window):
    sma = data['Close'].rolling(window=window).mean()
    rolling_std = data['Close'].rolling(window=window).std()
    upper_band = sma + (rolling_std*2)
    lower_band = sma - (rolling_std*2)
    
    return upper_band, lower_band

def create_dataset(dataset, look_back=1):
    dataX, dataY = [], []
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(dataset[i + look_back, 0])
    return np.array(dataX), np.array(dataY)


In [9]:
ticker_symbol = 'BA'
ticker_data = yf.Ticker(ticker_symbol)
start_date = '2023-01-01'
end_date = '2023-10-01'
historical_data = ticker_data.history(start=start_date, end=end_date)
data = historical_data[['Close', 'Volume']].copy()

# 使用优化后的参数更新技术指标
data['RSI'] = compute_rsi(data['Close'], best_rsi_window)
data['SMA'] = data['Close'].rolling(window=best_sma_window).mean()
data['Upper_Band'], data['Lower_Band'] = compute_bollinger_bands(data, best_bollinger_window)

data.dropna(inplace=True)

# 数据归一化
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)

# 使用滑动窗口法创建数据集
look_back = 10
X, y = create_dataset(data_scaled, look_back)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=12)

models = {
    "Linear Regression": LinearRegression(),
    "Random Forest": RandomForestRegressor(n_estimators=100, random_state=12),
    "Gradient Boosting": GradientBoostingRegressor(n_estimators=100, random_state=12),
    "SVR": SVR(kernel='rbf', C=1e3, gamma=0.1),
    "LSTM": Sequential([
        LSTM(100, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=True),
        Dropout(0.2),
        LSTM(50, return_sequences=True),
        Dropout(0.2),
        LSTM(25),
        Dense(1)
    ])
}

models["LSTM"].compile(optimizer='adam', loss='mean_squared_error')
models["Linear Regression"].fit(X_train[:, -1, :], y_train)
models["Random Forest"].fit(X_train[:, -1, :], y_train)
models["Gradient Boosting"].fit(X_train[:, -1, :], y_train)
models["SVR"].fit(X_train[:, -1, :], y_train)
models["LSTM"].fit(X_train, y_train, epochs=50, batch_size=10, validation_data=(X_test, y_test), verbose=1, shuffle=False)

real_data = ticker_data.history(start='2023-10-16', end=datetime.today().strftime('%Y-%m-%d'))
real_prices = real_data['Close'].values
num_points = min(len(real_prices), 10)

def predict_future(model, initial_input, steps, is_lstm=False):
    future_predictions = []
    current_input = initial_input.copy()
    
    for i in range(steps):
        if is_lstm:
            current_prediction = model.predict(current_input[np.newaxis, :, :])[0, 0]
            future_predictions.append(current_prediction)
            current_input = np.roll(current_input, -1, axis=0)
            current_input[-1, 0] = current_prediction
        else:
            current_prediction = model.predict(current_input[-1, :].reshape(1, -1))
            future_predictions.append(current_prediction[0])
            current_input = np.roll(current_input, -1, axis=0)
            current_input[-1, 0] = current_prediction[0]
    
    return future_predictions
 # 获取每个模型的预测结果
all_predictions = []

for model_name, model in models.items():
    if model_name == "LSTM":
        predictions[model_name] = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=True)
    else:
        predictions[model_name] = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=False)
    
    predicted_prices = scaler.inverse_transform(np.hstack([np.array(predictions[model_name]).reshape(-1, 1), np.zeros((len(predictions[model_name]), data.shape[1]-1))]))[:, 0]
    all_predictions.append(predicted_prices)
    plt.plot(real_data.index[:num_points], predicted_prices, label=f'{model_name} Predicted Price')

# 计算平均预测值
average_prediction = np.mean(all_predictions, axis=0)
predictions["Average"] = average_prediction
plt.plot(real_data.index[:num_points], average_prediction, label='Average Predicted Price', color='cyan', linewidth=1.5)

plt.plot(real_data.index[:num_points], real_prices[:num_points], label='Real Price', color='black', linewidth=1)
plt.legend()
plt.title(f'Price Prediction Comparison: {ticker_symbol}')
plt.show()

print(f"Real Prices: {real_prices[:num_points]}")
for model_name, prediction in predictions.items():
    if model_name in models or model_name == "Average":
        print(f"{model_name} Predicted Prices: {prediction}")
import random

# 定义函数计算预测误差
def mean_absolute_percentage_error(y_true, y_pred):
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

# 从2023年随机选择5个10日时间段
random_dates = random.sample(list(historical_data['2023'].index), 5)

model_errors = {}

for start_date in random_dates:
    end_date = start_date + pd.Timedelta(days=10)
    
    real_data = ticker_data.history(start=start_date, end=end_date)
    real_prices = real_data['Close'].values
    num_points = len(real_prices)
    
    for model_name, model in models.items():
        if model_name == "LSTM":
            predicted = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=True)
        else:
            predicted = predict_future(model, data_scaled[-look_back:, :], num_points, is_lstm=False)
        
        predicted_prices = scaler.inverse_transform(np.hstack([np.array(predicted).reshape(-1, 1), np.zeros((len(predicted), data.shape[1]-1))]))[:, 0]
        error = mean_absolute_percentage_error(real_prices, predicted_prices)
        
        if model_name not in model_errors:
            model_errors[model_name] = []
        model_errors[model_name].append(error)

avg_errors = {model_name: np.mean(errors) for model_name, errors in model_errors.items()}
best_model_name = min(avg_errors, key=avg_errors.get)

print(f"Average MAPE for each model over 5 random periods:")
for model_name, error in avg_errors.items():
    print(f"{model_name}: {error:.2f}%")
    
print(f"\nThe best model based on the average MAPE over 5 random periods is: {best_model_name} with an average error of {avg_errors[best_model_name]:.2f}%.")


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


NameError: name 'predictions' is not defined