[Python] k近傍法での株価予測

k近傍法で、翌日の株価の上昇/下降を予測します。

k近傍法

k近傍法(ケイきんぼうほう、k-nearest neighbor algorithm, k-NN)は、特徴空間における最も近い訓練例に基づいた分類の手法であり、パターン認識でよく使われる。

出典: フリー百科事典『ウィキペディア(Wikipedia)』
scikit-learn という有名なモジュールを用いて、有名なあやめの分類問題を、k近傍法で解いてみます。 IPython で行います。 k近傍法 k近傍法(ケイきんぼうほう、英:k-nearest neighbor algorithm,k-NN)は、特徴空...

sklearn で簡単に扱うことができます。

sklearn.neighbors.KNeighborsClassifier

import numpy as np
import pandas as pd
import datetime

import sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix

from pandas_datareader import data as pdr
import yfinance as yf
import requests_cache

# Route pandas_datareader's Yahoo! Finance downloads through yfinance.
yf.pdr_override()

# Cache downloaded data on disk (sqlite) for 3 days to avoid re-hitting the API.
# NOTE(review): `session` is never passed to the downloader anywhere in this
# script, so the cache is currently unused — confirm whether it should be
# wired into the data fetch.
expire_after = datetime.timedelta(days=3)
session = requests_cache.CachedSession(cache_name='cache', backend='sqlite', expire_after=expire_after)


def calc_daily_pct_change(data):
    """Return the day-over-day fractional change of *data*.

    Thin wrapper around pandas ``pct_change``; the first row is NaN
    because there is no previous value to compare against.
    """
    return data.pct_change()

def create_dataset(stock, start, end, lags=40):
    """Build a supervised-learning dataset of lagged daily returns.

    Downloads OHLCV data for *stock* from Yahoo! Finance, then computes,
    for each trading day, the percentage change of the adjusted close and
    of the volume for each of the previous *lags* days, plus a +1/-1 label
    for whether the close rose or fell that day.

    Parameters
    ----------
    stock : str
        Ticker symbol, e.g. ``'GE'``.
    start, end : datetime.datetime
        Date range passed to the downloader.
    lags : int, default 40
        Number of lagged trading days used as features.

    Returns
    -------
    pandas.DataFrame
        Columns ``today``, ``lag_close_i`` / ``lag_vol_i`` (i = 1..lags)
        and ``up_or_down`` (+1.0 / -1.0). Rows with any NaN (the warm-up
        window) and flat days (exactly zero change) are dropped.
    """
    # BUG FIX: the parameter was named `ende` while the body referenced
    # `end`, silently resolving to the module-level global instead of the
    # caller's argument (NameError when no such global exists).
    df = pdr.get_data_yahoo(stock, start, end)

    df_lag = pd.DataFrame(index=df.index)
    df_lag['today'] = df['Adj Close']
    df_lag['Volume'] = df['Volume']

    for i in range(1, lags + 1):
        df_lag[f'lag_close_{i}'] = df['Adj Close'].shift(i)
        df_lag[f'lag_vol_{i}'] = df['Volume'].shift(i)

    df_ret = pd.DataFrame(index=df_lag.index)
    df_ret['today'] = df_lag['today'].pct_change()

    for i in range(1, lags + 1):
        df_ret[f'lag_close_{i}'] = df_lag[f'lag_close_{i}'].pct_change()
        df_ret[f'lag_vol_{i}'] = df_lag[f'lag_vol_{i}'].pct_change()

    df_ret.dropna(inplace=True)
    df_ret['up_or_down'] = np.sign(df_ret['today'])
    # Drop rows where 'up_or_down' == 0 (flat days) to redress the balance
    # between the two classes.
    df_ret = df_ret[df_ret['up_or_down'] != 0]

    return df_ret

if __name__ == '__main__':
    # Use 2000-2018 as training data and 2019 as the held-out test year.
    start = datetime.datetime(2000, 1, 1)
    end = datetime.datetime(2020, 1, 1)

    # Rows of 40 lagged daily close/volume percentage changes, labelled +1/-1.
    data = create_dataset('GE', start, end)
    # print(data.describe())
    # print(data['up_or_down'].value_counts())

    # Features are the lagged columns; drop today's return and the label.
    X = data.drop(['today', 'up_or_down'], axis=1)
    y = data['up_or_down']

    start_test = datetime.datetime(2019, 1, 1)

    # Chronological split: everything before 2019 trains, 2019 tests.
    X_train = X[X.index < start_test]
    X_test = X[X.index >= start_test]
    y_train = y[y.index < start_test]
    y_test = y[y.index >= start_test]

    # Standardize features; the scaler is fit on the training rows only.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # # Grid search over n_neighbors / weights / p (kept for reference;
    # # the best combination found is hard-coded below).
    # param_s_knn ={'est_n_neighbors':[10, 50, 100, 200, 300, 400, 500],
    #                 'est_weights':['uniform','distance'],
    #                 'est_p':[1,2]}
    # best_accuracy = 0
    # best_params = {}

    # for n_neighbors in param_s_knn['est_n_neighbors']:
    #     for weights in param_s_knn['est_weights']:
    #         for p in param_s_knn['est_p']:
    #             model = KNeighborsClassifier(n_neighbors=n_neighbors,
    #                                          weights=weights,
    #                                          p=p)
    #             model.fit(X_train_scaled, y_train)
    #             accuracy = model.score(X_test_scaled, y_test)
    #             if accuracy > best_accuracy:
    #                 best_accuracy = accuracy
    #                 best_params = {'best_n_neighbors': n_neighbors,
    #                                 'best_weights': weights,
    #                                 'best_p': p}

    # # 0.5564516129032258
    # # {'best_n_neighbors': 100, 'best_weights': 'uniform', 'best_p': 1}
    # print(best_accuracy)
    # print(best_params)

    # Best hyperparameters found by the (commented-out) grid search above.
    model = KNeighborsClassifier(n_neighbors=100,
                                weights='uniform',
                                p=1)
    model.fit(X_train_scaled, y_train)
    pred = model.predict(X_test_scaled)
    pred_prob = model.predict_proba(X_test_scaled)
    # [-1.  1.]
    # print(model.classes_)

    # Accuracy 0.556
    print(f'Accuracy {model.score(X_test_scaled, y_test):.3f}')
    # NOTE(review): sklearn's convention is confusion_matrix(y_true, y_pred);
    # the arguments here are swapped, so the printed matrix is transposed
    # relative to the usual orientation.
    # [[64 51]
    #  [59 74]]
    print(confusion_matrix(pred, y_test))
    # # Barely better than chance -- not a usable signal.


    # Restrict to predictions where the model's class probability exceeds
    # 0.55 and re-examine the confusion matrix.
    df_test = pd.DataFrame(y_test)
    # classes_ is [-1, 1], so column 0 of predict_proba is P(down).
    df_test['prob_down'] = pred_prob.transpose()[0]
    bound = 0.55
    test_over_bound = df_test[(df_test['prob_down'] > bound) | (df_test['prob_down'] < (1-bound))].copy()
    test_over_bound['pred'] = test_over_bound['prob_down'].map(lambda x: -1. if x>bound else 1.)
    # confusion matrix when prob>0.55
    # [[11  7]
    # [18 21]]
    print(f'confusion matrix when prob>{bound}')
    print(confusion_matrix(test_over_bound['pred'], test_over_bound['up_or_down']))
    # Accuracy works out to 0.561 in this restricted case --
    # still no better than the unrestricted model.


ロジスティック回帰と同様に上手くいきませんでした。

ロジスティック回帰で、翌日の株価の上昇/下降を予測します。 ロジスティック回帰 ロジスティック回帰(ロジスティックかいき、英:Logistic regression)は、ベルヌーイ分布に従う変数の統計的回帰モデルの一種である。 出典: フリー百科事典『ウィ...