以AC算法教会程序玩CartPole

目录
  1. CartPole与AC算法简介
    1. CartPole
    2. AC算法
  2. 代码概述
    1. AC部分
    2. 动画gif处理
    3. 主程序处理
TOC

相关文件将会包含在我的GitHub仓库中

CartPole与AC算法简介

CartPole

CartPole是强化学习领域的经典题型了,题目就是一个小车和一个倒立摆,也就是一个可以左右移动的方块上有一跟可以自由倾斜的棍子,开始的时候环境给予棍子一个小的角度,然后让程序控制小车左右移动使得棍子尽量不掉下来。

游戏的评分就是坚持的时间长短,环境会在倾斜15度或者超出小车移动范围的时候结束游戏。一般认为坚持超过200帧则游戏成功。

AC算法

代码思想是强化学习中的ActorCritic算法,即“演员 - 评委”体系。

首先需要介绍的是程序对游戏环境的识别。对于一个未知的状态体系(一般认为是马尔可夫状态),程序可能不能马上得到它走到这一步的价值如何(因为环境反馈的激励有可能不与当前状态直接关联),因此重点将在于如何评估到达一个状态后的价值,以及针对这样的状态-价值变换,我们如何进行抉择。我们一般将这样的两个问题分别称作评委和演员。

一般来说,使用两个网络分别扮演演员和评委,演员输入当前状态,输出概率参数,而评委输入当前状态,选择后状态以及获得的激励,输出估计的价值。

在训练的时候,根据价值网络(评委)的输出,我们可以训练动作网络(演员)的权重,然后通过估计价值和激励价值之差(TD-error)我们可以训练价值网络。

代码概述

代码使用的是VScodepython环境,使用OpenAI-gymCartPole-v0作为测试环境,matplotlibpyplotanimation作为可视化以及gif动图的输出(注意,gif输出采用的writerffmpeg,这需要另行配置,程序会提示当前可用的writer),AC代码没有找到现成库,ActorCritic均使用的tensorflow下的keras构建神经网络,而且都是3层网络(包括输出层和输入层)。

在运行代码时,可以通过调整注释掉的代码改变代码的输出,得到想要的内容。另外,为避免奇怪的冲突,默认gif保存的位置为绝对位置D盘。注意,由于plt.show()的位置在输出gif的函数之前,关掉图像后才能获得gif动图(这是为了避免gif和图像被输出到一起)。

程序在AC的C(Critic)中将Advantage,即TD-error作为评判标准。

同时,由于Advantage中含有本来model自己的输出,使得AC结果不容易收敛,Critic代码采取了双神经网络的形式,一个网络负责进行训练,而另一个神经网络负责输出(作为副本),并以一定周期(程序内为5帧)与前一个神经网络进行同步(更新参数),由此加快收敛。

##代码细节

AC部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# 关于Actor的部分
class Actor:
def __init__(self,learning_rate):
self.lr = learning_rate
self.model = self.model_init()

# 模型建立及设定
def model_init(self):
# 输入层,ob的特征有4个
input_layer = tf.keras.layers.Input(shape=(4,))

# 中间层,取20个神经元,激活函数用的relu
layer = keras.layers.Dense(
units=20,
activation=keras.activations.relu,
kernel_initializer=tf.random_normal_initializer(0., .1),
bias_initializer=tf.constant_initializer(0.1),
)(input_layer)

# 输出层,输出只有两个动作的概率(左和右),激活函数为softmax
output_layer = keras.layers.Dense(
units=2,
activation=keras.activations.softmax,
kernel_initializer=tf.random_normal_initializer(0., .1),
bias_initializer=tf.constant_initializer(0.1),
)(layer)

# 设置学习率
self.optimizer = keras.optimizers.Adam(learning_rate=self.lr)

# 建立模型 损失函数选择交叉熵
model = keras.models.Model(inputs=input_layer, outputs=output_layer)
model.compile(loss='sparse_categorical_crossentropy', optimizer=self.optimizer)

return model

# 动作选择
def choose_action(self,state):# 有p的概率选择0
p = self.model(state)[0].numpy()
rand_one = np.random.rand()
if (rand_one > p[0]):
return 1
else:
return 0

# 模型训练
def fit(self, state, action, weight):
self.model.fit(state, np.array([action]), verbose=0, sample_weight=weight)

# 关于critic的部分
class Critic:
def __init__(self,learning_rate,gama,iter_t):
self.iter = 0 #计算副本更新周期
self.iter_t = iter_t # 副本更新周期
self.lr = learning_rate
self.gama = gama
self.model = self.model_init()
self.model_ = self.model # 创建副本 周期更新

# 模型建立及设定
def model_init(self):
# 输入层,ob的特征有4个
input_layer = tf.keras.layers.Input(shape=(4,))

# 中间层,取20个神经元,激活函数用的relu
layer = keras.layers.Dense(
units=20,
activation=keras.activations.relu,
kernel_initializer=tf.random_normal_initializer(0., .1),
bias_initializer=tf.constant_initializer(0.1),
)(input_layer)

# 输出层
output_layer = keras.layers.Dense(
units=1,
kernel_initializer=tf.random_normal_initializer(0., .1),
bias_initializer=tf.constant_initializer(0.1),
)(layer)

# 设置学习率
self.optimizer = keras.optimizers.Adam(learning_rate=self.lr)

# 建立模型
model = keras.models.Model(inputs=input_layer, outputs=output_layer)
model.compile(loss='mse', optimizer=self.optimizer)

return model

def fit(self, state, reward, state_):
self.iter += 1
value = reward + self.model_(state_) * self.gama # 使用副本,使得数据收敛更容易
td_error = value - self.model(state)

self.model.fit(state, value, verbose=0)
if(self.iter >= self.iter_t):
self.iter = 0
self.model_ = self.model

return td_error

有基本的注释,实现了几个简单的功能。

其中A的训练取了个巧,在激励为正的情况下,可以视为训练数据的权重变化带入,就不用打开神经网络里面了。

动画gif处理

照网上写了个函数,然后在需要的帧添加进列表,最后组装即可。

1
2
3
4
5
6
7
8
def display_frames_as_gif(frames,name):
patch = plt.imshow(frames[0])
plt.axis('off')
def animate(i):
patch.set_data(frames[i])

anim = animation.FuncAnimation(plt.gcf(), animate, frames = len(frames), interval=1)
anim.save('D:\\'+name+'.gif', writer='ffmpeg', fps=30)

添加帧:

1
frames.append(env.render(mode = 'rgb_array'))

然后最后组装:

1
display_frames_as_gif(frames,'result')

主程序处理

主函数负责调用之前的库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import gym
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
from matplotlib import animation

def main():
env = gym.make('CartPole-v0')
frames1 = []
frames2 = []
time = []
actor = Actor(1e-3)
critic = Critic(1e-2,0.95,5)

print("------------------- start trying")
print(animation.writers.list())
for epi in range(200):
rewards = []
observation = env.reset(seed = 1)
observation = observation[np.newaxis, :]

for t in range(1000):
#env.render()# 是否渲染
# if (epi >= 0 and epi <= 9):frames1.append(env.render(mode = 'rgb_array'))
if (epi >= 189 and epi <= 199):frames2.append(env.render(mode = 'rgb_array'))

# 选择
action = actor.choose_action(observation)
observation_, reward, done, info = env.step(action)# action 0左1右
observation_ = observation_[np.newaxis, :]

# 训练
if (done and t < 199): reward = -20
if (done and t >= 199): reward = 20
rewards.append(reward)
td_error = critic.fit(observation, reward, observation_)
actor.fit(observation, action, td_error)
observation = observation_

if done:
print("Episode {}: {} timesteps".format(epi,t+1))
time.append(t)
break
env.close()
DAR
SON