Preface
This project is implemented with the DDPG algorithm from the PARL framework.
The data and the stock trading environment come from https://github.com/wangshub/RL-Stock
The original environment raised errors, and my own adjustments did not fully fix them; after corrections pointed out by Arrowarcher, the project now runs and is fully adapted to PARL.
Arrowarcher's project: Arrowarcher/paddlex_gui
forrestneo's project: https://github.com/forrestneo/stock_reinforcement_learning_PARL
On top of the original environment, Arrowarcher modified the reward: originally it was a flat 1 whenever the profit was positive; now it also scales with the profit ratio, as sketched right below.
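For clarity, here is that reward rule as a standalone function (a paraphrase of the step() method in the environment defined in Step 4.1; profit_percent is the gain relative to the initial balance):
def shaped_reward(net_worth, initial_balance=10000):
    # reward grows with the profit ratio instead of being a flat +1
    profit_percent = (net_worth - initial_balance) / initial_balance
    if profit_percent >= 0:
        # divide by 0.001 so that even a small gain yields a reward of at least 1
        return max(1, profit_percent / 0.001)
    # any loss gets a fixed penalty
    return -100
# e.g. shaped_reward(10500) -> 50.0 (a 5% gain); shaped_reward(9000) -> -100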
Step 1 Install dependencies
!pip uninstall -y parl  # Note: the PARL version preinstalled on AI Studio is old and can conflict with other libraries, so uninstall it first
!pip uninstall -y pandas scikit-learn  # Tip: uninstalling these two on AI Studio before importing parl avoids a warning; leaving them does not affect parl
!pip install gym
#!pip install atari-py  # only needed for Gym's Atari games (e.g. Pong); the stock environment used here does not need it
!pip install paddlepaddle==1.6.3
!pip install parl==1.3.1
!pip install baostock
Check the installation
Verify that the dependency versions are correct
!pip list | grep paddlepaddle
!pip list | grep parl
!pip list | grep gym
Step 2 Download the data
import baostock as bs
import pandas as pd
import os
OUTPUT = 'work/stockdata'  # note: the __main__ block below actually writes to ./stockdata/

def mkdir(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)
class Downloader(object):
def __init__(self,
output_dir,
date_start='1990-01-01',
date_end='2020-03-23'):
self._bs = bs
bs.login()
self.date_start = date_start
# self.date_end = datetime.datetime.now().strftime("%Y-%m-%d")
self.date_end = date_end
self.output_dir = output_dir
self.fields = "date,code,open,high,low,close,volume,amount," \
"adjustflag,turn,tradestatus,pctChg,peTTM," \
"pbMRQ,psTTM,pcfNcfTTM,isST"
def exit(self):
bs.logout()
def get_codes_by_date(self, date):
print(date)
stock_rs = bs.query_all_stock(date)
stock_df = stock_rs.get_data()
print(stock_df)
return stock_df
def run(self):
stock_df = self.get_codes_by_date(self.date_end)
for index, row in stock_df.iterrows():
print(f'processing {row["code"]} {row["code_name"]}')
df_code = bs.query_history_k_data_plus(row["code"], self.fields,
start_date=self.date_start,
end_date=self.date_end).get_data()
            df_code.to_csv(f'{self.output_dir}/{row["code"]}.{row["code_name"]}.csv', index=False)
self.exit()
if __name__ == '__main__':
    # Download daily K-line data for all stocks (training and test splits)
mkdir('./stockdata/train')
downloader = Downloader('./stockdata/train', date_start='1990-01-01', date_end='2019-11-29')
downloader.run()
mkdir('./stockdata/test')
downloader = Downloader('./stockdata/test', date_start='2019-12-01', date_end='2019-12-31')
downloader.run()
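Downloading the whole market from 1990 onward is slow. If you only need the single ticker used for training below (sh.600055), here is a minimal sketch that reuses the same baostock call and field list as the Downloader above and writes to the path that Step 6 reads:
import os
import baostock as bs

os.makedirs('./stockdata/train', exist_ok=True)
bs.login()
fields = ("date,code,open,high,low,close,volume,amount,"
          "adjustflag,turn,tradestatus,pctChg,peTTM,"
          "pbMRQ,psTTM,pcfNcfTTM,isST")
df_code = bs.query_history_k_data_plus(
    "sh.600055", fields,
    start_date="1990-01-01", end_date="2019-11-29").get_data()
df_code.to_csv('./stockdata/train/sh.600055.csv', index=False)
bs.logout()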
Notebook structure
Import PARL and the other required libraries
import os
import numpy as np
import sys
import codecs
import paddle.fluid as fluid
import parl
from parl import layers
from parl.utils import logger
import random
import json
import gym
from gym import spaces
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
from parl.utils import ReplayMemory  # experience replay buffer
from parl.utils import action_mapping  # maps the network output into the environment's actual action range
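As a quick sanity check of the action_mapping helper imported above: it linearly rescales a value from [-1, 1] into [low, high] via low + (action + 1) * (high - low) / 2, which is exactly how run_episode uses it later.
print(action_mapping(np.array([-1.0, 0.0, 1.0]), 0.0, 3.0))  # expected roughly [0., 1.5, 3.]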
Step 3 Set the hyperparameters
######################################################################
######################################################################
#
# 1. Set the learning rates; try increasing/decreasing them and observe the effect
#
######################################################################
######################################################################
ACTOR_LR = 0.0001   # learning rate of the Actor network
CRITIC_LR = 0.0005  # learning rate of the Critic network
GAMMA = 0.99        # reward discount factor, usually between 0.9 and 0.999
TAU = 0.001         # soft-update coefficient for syncing target_model with model
MEMORY_SIZE = 1e6   # capacity of the replay memory; larger values use more RAM (must be larger than MEMORY_WARMUP_SIZE so learning can start)
MEMORY_WARMUP_SIZE = 1e4  # number of experiences to collect before batches are sampled for the agent to learn from
REWARD_SCALE = 0.001  # reward scaling factor
BATCH_SIZE = 256  # number of samples randomly drawn from the replay memory for each agent.learn() call
TRAIN_TOTAL_STEPS = 1e6  # total number of training steps
TEST_EVERY_STEPS = 1e4  # evaluate every N steps; each evaluation averages the reward over 5 episodes
Step 4 Build the Model, Algorithm and Agent architecture
The Agent passes the data it generates to the Algorithm; the Algorithm computes the loss based on the Model's network structure and keeps optimizing it with SGD or another optimizer. This PARL architecture can be applied conveniently to all kinds of deep reinforcement learning problems; the nesting is previewed right below.
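Concretely, the nesting built later in Step 6 looks like this (just a preview to make the data flow visible; StockModel, DDPG, Agent and the hyperparameters are all defined below):
model = StockModel(act_dim=act_dim)  # Model: the actor and critic forward networks
algorithm = DDPG(model, gamma=GAMMA, tau=TAU,  # Algorithm: DDPG losses and update rules over the Model
                 actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
agent = Agent(algorithm, obs_dim=obs_dim, act_dim=act_dim)  # Agent: interacts with the environment and feeds data to the Algorithm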
(1) Model
The Model defines the forward network; users are free to customize their own network structure.
class ActorModel(parl.Model):
    def __init__(self, act_dim):
        hidden_dim = 64
        self.fc1 = layers.fc(size=hidden_dim, act='relu')
        self.fc2 = layers.fc(size=act_dim, act='tanh')

    def policy(self, obs):
        x = self.fc1(obs)
        x = self.fc2(x)
        return x
class CriticModel(parl.Model):
    def __init__(self):
        hid_size = 64
        self.fc1 = layers.fc(size=hid_size, act='relu')
        self.fc2 = layers.fc(size=1, act=None)

    def value(self, obs, act):
        # The critic must see both the observation and the action;
        # if act is dropped, Q(s, a) no longer depends on a and the DDPG actor gets no useful gradient.
        hid = self.fc1(obs)
        concat = layers.concat([hid, act], axis=1)
        Q = self.fc2(concat)
        Q = layers.squeeze(Q, axes=[1])
        return Q
class StockModel(parl.Model):
def __init__(self, act_dim):
self.actor_model = ActorModel(act_dim)
self.critic_model = CriticModel()
def policy(self, obs):
return self.actor_model.policy(obs)
def value(self, obs, act):
return self.critic_model.value(obs, act)
def get_actor_params(self):
return self.actor_model.parameters()
(2) Algorithm
The Algorithm defines how the forward network (Model) is updated, i.e. it defines the loss functions used to update the Model; all algorithm-related computation lives here.
from parl.algorithms import DDPG
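PARL's built-in DDPG is used as-is. Conceptually, the critic is regressed toward the Bellman target sketched below and the actor is updated to maximize Q(s, pi(s)); this is only a numpy illustration of what GAMMA, TAU and the terminal flag do, not PARL's actual source:
import numpy as np

def critic_target(reward, next_q, terminal, gamma=0.99):
    # r + gamma * Q_target(s', pi_target(s')) for non-terminal transitions; just r when the episode ended
    return reward + gamma * (1.0 - terminal.astype('float32')) * next_q

print(critic_target(np.array([1.0]), np.array([2.0]), np.array([0.0])))  # -> [2.98]
# The critic minimizes the squared error to this target; the actor loss is -Q(s, pi(s)).
# Target networks are soft-updated: theta_target <- TAU * theta + (1 - TAU) * theta_target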
(3) Agent
The Agent is responsible for the interaction between the algorithm and the environment; during that interaction it feeds the generated data to the Algorithm to update the Model, and data preprocessing is usually defined here as well.
class Agent(parl.Agent):
def __init__(self, algorithm, obs_dim, act_dim=4):
assert isinstance(obs_dim, int)
assert isinstance(act_dim, int)
self.obs_dim = obs_dim
self.act_dim = act_dim
super(Agent, self).__init__(algorithm)
# Attention: In the beginning, sync target model totally.
self.alg.sync_target(decay=0)
def build_program(self):
self.pred_program = fluid.Program()
self.learn_program = fluid.Program()
with fluid.program_guard(self.pred_program):
obs = layers.data(
name='obs', shape=[self.obs_dim], dtype='float32')
self.pred_act = self.alg.predict(obs)
with fluid.program_guard(self.learn_program):
obs = layers.data(
name='obs', shape=[self.obs_dim], dtype='float32')
act = layers.data(
name='act', shape=[self.act_dim], dtype='float32')
reward = layers.data(name='reward', shape=[], dtype='float32')
next_obs = layers.data(
name='next_obs', shape=[self.obs_dim], dtype='float32')
terminal = layers.data(name='terminal', shape=[], dtype='bool')
_, self.critic_cost = self.alg.learn(obs, act, reward, next_obs,
terminal)
def predict(self, obs):
obs = np.expand_dims(obs, axis=0)
act = self.fluid_executor.run(
self.pred_program, feed={'obs': obs},
fetch_list=[self.pred_act])[0]
return act
def learn(self, obs, act, reward, next_obs, terminal):
feed = {
'obs': obs,
'act': act,
'reward': reward,
'next_obs': next_obs,
'terminal': terminal
}
critic_cost = self.fluid_executor.run(
self.learn_program, feed=feed, fetch_list=[self.critic_cost])[0]
self.alg.sync_target()
return critic_cost
Step 4.1 The stock trading environment
import random
import numpy as np
import gym
from gym import spaces
MAX_ACCOUNT_BALANCE = 214748
MAX_NUM_SHARES = 214748
MAX_SHARE_PRICE = 5000
MAX_VOLUME = 1000e6
MAX_AMOUNT = 3e5
MAX_OPEN_POSITIONS = 5
MAX_STEPS = 500
MAX_DAY_CHANGE = 1
max_loss = -50000
max_predict_rate = 4
INITIAL_ACCOUNT_BALANCE = 10000
class StockTradingEnv(gym.Env):
"""A stock trading environment for OpenAI gym"""
metadata = {'render.modes': ['human']}
def __init__(self, df):
super(StockTradingEnv, self).__init__()
self.df = df
self.reward_range = (0, MAX_ACCOUNT_BALANCE)
# Actions of the format Buy x%, Sell x%, Hold, etc.
self.action_space = spaces.Box(
low=np.array([0, 0]), high=np.array([3, 1]), dtype=np.float16)
# Prices contains the OHCL values for the last five prices
self.observation_space = spaces.Box(
low=0, high=1, shape=(19,), dtype=np.float16)
def _next_observation(self):
        # Some stocks have missing values in these columns; handle them here or the code will raise errors
d10 = self.df.loc[self.current_step, 'peTTM'] / 1e4
d11 = self.df.loc[self.current_step, 'pbMRQ'] / 100
d12 = self.df.loc[self.current_step, 'psTTM'] / 100
        if np.isnan(d10):  # for some stocks these fields are mostly 0.0; NaN values would cause errors
            d10 = d11 = d12 = 0.00000000e+00
obs = np.array([
self.df.loc[self.current_step, 'open'] / MAX_SHARE_PRICE,
self.df.loc[self.current_step, 'high'] / MAX_SHARE_PRICE,
self.df.loc[self.current_step, 'low'] / MAX_SHARE_PRICE,
self.df.loc[self.current_step, 'close'] / MAX_SHARE_PRICE,
self.df.loc[self.current_step, 'volume'] / MAX_VOLUME,
self.df.loc[self.current_step, 'amount'] / MAX_AMOUNT,
self.df.loc[self.current_step, 'adjustflag'] / 10,
self.df.loc[self.current_step, 'tradestatus'] / 1,
self.df.loc[self.current_step, 'pctChg'] / 100,
d10,
d11,
d12,
self.df.loc[self.current_step, 'pctChg'] / 1e3,
self.balance / MAX_ACCOUNT_BALANCE,
self.max_net_worth / MAX_ACCOUNT_BALANCE,
self.shares_held / MAX_NUM_SHARES,
self.cost_basis / MAX_SHARE_PRICE,
self.total_shares_sold / MAX_NUM_SHARES,
self.total_sales_value / (MAX_NUM_SHARES * MAX_SHARE_PRICE),
])
return obs
def _take_action(self, action):
# Set the current price to a random price within the time step
current_price = random.uniform(
self.df.loc[self.current_step, "open"], self.df.loc[self.current_step, "close"])
action_type = action[0]
amount = action[1]
        if action_type < 1:
            # Buy amount % of balance in shares
            total_possible = int(self.balance / current_price)
            shares_bought = int(total_possible * amount)
            prev_cost = self.cost_basis * self.shares_held
            additional_cost = shares_bought * current_price

            self.balance -= additional_cost
            if self.shares_held + shares_bought > 0:  # guard against division by zero when nothing is held or bought
                self.cost_basis = (
                    prev_cost + additional_cost) / (self.shares_held + shares_bought)
            self.shares_held += shares_bought
elif action_type < 2:
# Sell amount % of shares held
shares_sold = int(self.shares_held * amount)
self.balance += shares_sold * current_price
self.shares_held -= shares_sold
self.total_shares_sold += shares_sold
self.total_sales_value += shares_sold * current_price
self.net_worth = self.balance + self.shares_held * current_price
if self.net_worth > self.max_net_worth:
self.max_net_worth = self.net_worth
if self.shares_held == 0:
self.cost_basis = 0
def step(self, action):
# Execute one time step within the environment
self._take_action(action)
done = False
self.current_step += 1
if self.max_net_worth >= INITIAL_ACCOUNT_BALANCE * max_predict_rate:
done = True
if self.current_step > len(self.df.loc[:, 'open'].values) - 1:
self.current_step = 0 # loop training
done = True
delay_modifier = (self.current_step / MAX_STEPS)
# profits
profit = self.net_worth - INITIAL_ACCOUNT_BALANCE
profit_percent = profit / INITIAL_ACCOUNT_BALANCE
        if profit_percent >= 0:
            # the reward scales with the profit ratio (see the Preface), with a floor of 1 when not losing money
            reward = max(1, profit_percent / 0.001)
        else:
            reward = -100

        if self.net_worth <= 0:
            done = True
obs = self._next_observation()
return obs, reward, done, {}
def reset(self, new_df=None):
# Reset the state of the environment to an initial state
self.balance = INITIAL_ACCOUNT_BALANCE
self.net_worth = INITIAL_ACCOUNT_BALANCE
self.max_net_worth = INITIAL_ACCOUNT_BALANCE
self.shares_held = 0
self.cost_basis = 0
self.total_shares_sold = 0
self.total_sales_value = 0
self.count = 0
self.interval = 5
        # pass the test dataset to the environment
        if new_df is not None:  # `if new_df:` would raise "truth value of a DataFrame is ambiguous"
            self.df = new_df
# Set the current step to a random point within the data frame
# self.current_step = random.randint(
# 0, len(self.df.loc[:, 'open'].values) - 6)
self.current_step = 0
return self._next_observation()
def render(self, mode='human'):
# Render the environment to the screen
profit = self.net_worth - INITIAL_ACCOUNT_BALANCE
print('-'*30)
print(f'Step: {self.current_step}')
print(f'Balance: {self.balance}')
print(f'Shares held: {self.shares_held} (Total sold: {self.total_shares_sold})')
print(f'Avg cost for held shares: {self.cost_basis} (Total sales value: {self.total_sales_value})')
print(f'Net worth: {self.net_worth} (Max net worth: {self.max_net_worth})')
print(f'Profit: {profit}')
return profit
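Before training, the environment can be smoke-tested with a few random actions (a minimal sketch; it assumes a training CSV such as stockdata/train/sh.600055.csv from Step 2 is present):
import pandas as pd

df_check = pd.read_csv('stockdata/train/sh.600055.csv').sort_values('date').reset_index(drop=True)
env_check = StockTradingEnv(df_check)
obs = env_check.reset()
for _ in range(5):
    action = env_check.action_space.sample()  # a random Buy/Sell/Hold action
    obs, reward, done, _ = env_check.step(action)
    print('reward = {}, done = {}'.format(reward, done))
    if done:
        obs = env_check.reset()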
Step 5 Training && Testing
# Training and evaluation loops (the replay buffer itself is parl.utils.ReplayMemory, imported above)
import numpy as np
def run_episode(env, agent, rpm):
obs = env.reset()
total_reward, steps = 0, 0
while True:
steps += 1
batch_obs = np.expand_dims(obs, axis=0)
action = agent.predict(batch_obs.astype('float32'))
action = np.squeeze(action)
        # add Gaussian exploration noise to the action and clip it to [-1.0, 1.0]
action = np.random.normal(action, 1.0)
action = np.clip(action, -1.0, 1.0)
        # map the action into the environment's actual action range; action_mapping comes from parl.utils
action = action_mapping(action, env.action_space.low[0],
env.action_space.high[0])
next_obs, reward, done, info = env.step(action)
rpm.append(obs, action, REWARD_SCALE * reward, next_obs, done)
if rpm.size() > MEMORY_WARMUP_SIZE:
batch_obs, batch_action, batch_reward, batch_next_obs, \
batch_terminal = rpm.sample_batch(BATCH_SIZE)
critic_cost = agent.learn(batch_obs, batch_action, batch_reward,
batch_next_obs, batch_terminal)
obs = next_obs
total_reward += reward
if done:
break
return total_reward, steps
# Evaluate the agent: run 5 episodes and average the total reward
def evaluate(env, agent):
eval_reward = []
for i in range(5):
obs = env.reset()
total_reward, steps = 0, 0
while True:
batch_obs = np.expand_dims(obs, axis=0)
action = agent.predict(batch_obs.astype('float32'))
action = np.squeeze(action)
action = np.clip(action, -1.0, 1.0)
action = action_mapping(action, env.action_space.low[0],
env.action_space.high[0])
next_obs, reward, done, info = env.step(action)
obs = next_obs
total_reward += reward
steps += 1
if done:
break
eval_reward.append(total_reward)
return np.mean(eval_reward)
Step 6 Create the environment and agent, start training, and save models
font = fm.FontProperties(fname='font/wqy-microhei.ttc')
# plt.rc('font', family='Source Han Sans CN')
plt.rcParams['axes.unicode_minus'] = False
# Create the environment
#df = pd.read_csv('sh.600055.csv')
df = pd.read_csv('stockdata/train/sh.600055.csv')
df = df.sort_values('date')
# Wrap the dataframe in the gym-style trading environment
env = StockTradingEnv(df)
# Action and observation dimensions
act_dim = env.action_space.shape[0]
obs_dim = env.observation_space.shape[0]
# Create the replay memory
rpm = ReplayMemory(int(MEMORY_SIZE), obs_dim, act_dim)
# Build the agent with the PARL framework
######################################################################
######################################################################
#
# 4. Following the class demo, nest Model, DDPG and Agent to build the agent
#
######################################################################
######################################################################
model = StockModel(act_dim=act_dim)
algorithm = DDPG(
model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
agent = Agent(algorithm, obs_dim=obs_dim, act_dim=act_dim)
# Optionally restore a previously saved model
# save_path = './dqn_model.ckpt'
# agent.restore(save_path)
# Pre-fill the replay memory first so that the earliest sampled batches have enough variety
while rpm.size() < MEMORY_WARMUP_SIZE:
    run_episode(env, agent, rpm)
max_episode = 500
test_flag = 0
total_steps = 0
while total_steps < TRAIN_TOTAL_STEPS:
train_reward, steps = run_episode(env, agent, rpm)
total_steps += steps
    #logger.info('Steps: {} Reward: {}'.format(total_steps, train_reward))  # print the training reward
    if total_steps // TEST_EVERY_STEPS >= test_flag:  # evaluate the model every TEST_EVERY_STEPS steps
while total_steps // TEST_EVERY_STEPS >= test_flag:
test_flag += 1
evaluate_reward = evaluate(env, agent)
env.render()
logger.info('Steps {}, Test reward: {}'.format(
total_steps,evaluate_reward)) # 打印评估的reward
        # save a checkpoint after each evaluation, named by the number of training steps
ckpt = 'steps_{}.ckpt'.format(total_steps)
agent.save(ckpt)
Step 7 Test the trained agent
import pickle  # needed by multi_stock_trade below

plt.rcParams['axes.unicode_minus'] = False

def stock_trade(stock_file, mode_path):
day_profits = []
df = pd.read_csv(stock_file)
df = df.sort_values('date')
# The algorithms require a vectorized environment to run
env = StockTradingEnv(df)
act_dim = env.action_space.shape[0]
obs_dim = env.observation_space.shape[0]
    # Build the agent with the PARL framework: StockModel, DDPG and Agent nested together
model = StockModel(act_dim)
algorithm = DDPG(
model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
agent = Agent(algorithm, obs_dim, act_dim)
df_test = pd.read_csv(stock_file.replace('train', 'test'))
    # Restore the trained model
if os.path.exists(mode_path):
agent.restore(mode_path)
env2 = StockTradingEnv(df_test)
obs = env2.reset()
for i in range(len(df_test) - 1):
batch_obs = np.expand_dims(obs, axis=0)
action = agent.predict(batch_obs.astype('float32'))
action = np.squeeze(action)
        action = np.random.normal(action, 1.0)  # keeps the same exploration noise at test time as during training
        action = np.clip(action, -1.0, 1.0)
action = action_mapping(action, env2.action_space.low[0],
env2.action_space.high[0])
next_obs, reward, done, info = env2.step(action)
obs = next_obs
profit = env2.render()
day_profits.append(profit)
if done:
break
return day_profits
def find_file(path, name):
# print(path, name)
for root, dirs, files in os.walk(path):
for fname in files:
if name in fname:
return os.path.join(root, fname)
def test_a_stock_trade(stock_code,mode_path):
stock_file = find_file('./stockdata/train', str(stock_code))
daily_profits = stock_trade(stock_file,mode_path)
fig, ax = plt.subplots()
ax.plot(daily_profits, '-o', label=stock_code, marker='o', ms=10, alpha=0.7, mfc='orange')
ax.grid()
plt.xlabel('step')
plt.ylabel('profit')
ax.legend()
# plt.show()
plt.savefig(f'./{stock_code}.png')
def multi_stock_trade(mode_path):
start_code = 600000
max_num = 3000
group_result = []
for code in range(start_code, start_code + max_num):
stock_file = find_file('./stockdata/train', str(code))
if stock_file:
try:
profits = stock_trade(stock_file, mode_path)
group_result.append(profits)
except Exception as err:
print(err)
with open(f'code-{start_code}-{start_code + max_num}.pkl', 'wb') as f:
pickle.dump(group_result, f)
if __name__ == '__main__':
# multi_stock_trade()
test_a_stock_trade('sh.600061.国投资本','steps_1001736.ckpt')
# ret = find_file('./stockdata/train', '600036')
# print(ret)