python实现粒子群算法(PSO)优化神经网络超参数——以预测英雄联盟比赛结果为例
目录
程序简介
本实验根据英雄联盟的对局数据,搭建全连接网络分类模型,以粒子群算法对神经网络的节点数和dropout概率进行调优,最后对比默认模型和优化后的模型对英雄联盟比赛结果的预测准确率
粒子群优化算法(PSO) 是一种进化计算技术源于对鸟群捕食的行为研究。粒子群优化算法的基本思想:是通过群体中个体之间的协作和信息共享来寻找最优解。它的优点是收敛快、实现简单,缺点则是容易陷入局部最优
程序/数据集下载
代码分析
导入模块
from tensorflow.keras.layers import Input,Dense,Dropout,Activation import matplotlib.pyplot as plt from tensorflow.keras.models import load_model from tensorflow.keras.models import Model from tensorflow.keras.optimizers import Adam from sklearn.metrics import accuracy_score import pandas as pd import numpy as np import json from copy import deepcopy
数据是英雄联盟各对局10分钟的战况,英雄联盟对战方分为红蓝双方,blueWins就是指是否为蓝方获胜,其他列基本就是小龙、野怪、经济啥的,原谅我解释不清。。。读研的时候升到白银,因为出装问题每一局都要被骂,顺便说一下,我老是在里面装妹子,玩辅助,因为这样骂我的人就少了很多,好友已经满了┭┮﹏┭┮错的不是我,是这个世界
data = pd.read_csv("Static/high_diamond_ranked_10min.csv").iloc[:,1:] print("数据尺寸:",data.shape) print("展示下数据前5行和前5列") data.iloc[:,:5].head()
数据尺寸: (9879, 39) 展示下数据前5行和前5列
blueWins | blueWardsPlaced | blueWardsDestroyed | blueFirstBlood | blueKills | |
---|---|---|---|---|---|
0 | 0 | 28 | 2 | 1 | 9 |
1 | 0 | 12 | 1 | 0 | 5 |
2 | 0 | 15 | 0 | 0 | 7 |
3 | 0 | 43 | 1 | 0 | 4 |
4 | 0 | 75 | 4 | 0 | 6 |
切割数据集,分成训练、验证和测试,这里只用验证集给PSO调优用,并没有给神经网络训练保存最佳checkpoint
data = data.sample(frac=1.0)#打乱数据 trainData = data.iloc[:6000]#训练集 xTrain = trainData.values[:,1:] yTrain = trainData.values[:,:1] valData = data.iloc[6000:8000]#验证集 xVal = valData.values[:,1:] yVal = valData.values[:,:1] testData = data.iloc[8000:]#测试集 xTest = testData.values[:,1:] yTest = testData.values[:,:1]
1个粒子即1个方案,在本实验中,1个粒子就是1个(节点数、dropout概率)的数组,PSO就是计算每个粒子对应方案的适应度,找到最合适的方案的过程,下文的PSO类会根据粒子需要优化的特征对粒子迭代,支持小数或整数的特征,若为自定的离散区间,可在适应度函数中,将非法粒子的适应度给予惩罚值,实例化该类后,需要指定适应度函数给iterate函数对粒子方案进行迭代,本实验的粒子群算法为最简单的形式,更新公式如下:
class PSO(): def __init__(self,featureNum,featureArea,featureLimit,featureType,particleNum=5,epochMax=10,c1=2,c2=2): ''' 粒子群算法 :param featureNum: 粒子特征数 :param featureArea: 特征上下限矩阵 :param featureLimit: 特征上下阙界,也是区间的开闭 0为不包含 1为包含 :param featureType: 特征类型 int float :param particleNum: 粒子个数 :param epochMax: 最大迭代次数 :param c1: 自身认知学习因子 :param c2: 群体认知学习因子 ''' #如上所示 self.featureNum = featureNum self.featureArea = np.array(featureArea).reshape(featureNum,2) self.featureLimit = np.array(featureLimit).reshape(featureNum,2) self.featureType = featureType self.particleNum = particleNum self.epochMax = epochMax self.c1 = c1 self.c2 = c2 self.epoch = 0#已迭代次数 #自身最优适应度记录 self.pBest = [-1e+10 for i in range(particleNum)] self.pBestArgs = [None for i in range(particleNum)] #全局最优适应度记录 self.gBest = -1e+10 self.gBestArgs = None #初始化所有粒子 self.particles = [self.initParticle() for i in range(particleNum)] #初始化所有粒子的学习速度 self.vs = [np.random.uniform(0,1,size=featureNum) for i in range(particleNum)] #迭代历史 self.gHistory = {"特征%d"%i:[] for i in range(featureNum)} self.gHistory["群内平均"] = [] self.gHistory["全局最优"] = [] def standardValue(self,value,lowArea,upArea,lowLimit,upLimit,valueType): ''' 规范一个特征值,使其落在区间内 :param value: 特征值 :param lowArea: 下限 :param upArea: 上限 :param lowLimit: 下限开闭区间 :param upLimit: 上限开闭区间 :param valueType: 特征类型 :return: 修正后的值 ''' if value < lowArea: value = lowArea if value > upArea: value = upArea if valueType is int: value = np.round(value,0) #下限为闭区间 if value <= lowArea and lowLimit==0: value = lowArea + 1 #上限为闭区间 if value >= upArea and upLimit==0: value = upArea - 1 elif valueType is float: #下限为闭区间 if value <= lowArea and lowLimit == 0: value = lowArea + 1e-10 #上限为闭=间 if value >= upArea and upLimit==0: value = upArea - 1e-10 return value def initParticle(self): '''随机初始化1个粒子''' values = [] #初始化这么多特征数 for i in range(self.featureNum): #该特征的上下限 lowArea = self.featureArea[i][0] upArea = self.featureArea[i][1] #该特征的上下阙界 lowLimit = self.featureLimit[i][0] upLimit = self.featureLimit[i][1] #随机值 value = np.random.uniform(0,1) * (upArea-lowArea) + lowArea value = self.standardValue(value,lowArea,upArea,lowLimit,upLimit,self.featureType[i]) values.append(value) return values def iterate(self,calFitness): ''' 开始迭代 :param calFitness:适应度函数 输入为1个粒子的所有特征和全局最佳适应度,输出为适应度 ''' while self.epoch<self.epochMax: self.epoch += 1 for i,particle in enumerate(self.particles): #该粒子的适应度 fitness = calFitness(particle,self.gBest) #更新该粒子的自身认知最佳方案 if self.pBest[i] < fitness: self.pBest[i] = fitness self.pBestArgs[i] = deepcopy(particle) #更新全局最佳方案 if self.gBest < fitness: self.gBest = fitness self.gBestArgs = deepcopy(particle) #更新粒子 for i, particle in enumerate(self.particles): #更新速度 self.vs[i] = np.array(self.vs[i]) + self.c1*np.random.uniform(0,1,size=self.featureNum)*(np.array(self.pBestArgs[i])-np.array(self.particles[i])) + self.c2*np.random.uniform(0,1,size=self.featureNum)*(np.array(self.gBestArgs)-np.array(self.particles[i])) #更新特征值 self.particles[i] = np.array(particle) + self.vs[i] #规范特征值 values = [] for j in range(self.featureNum): #该特征的上下限 lowArea = self.featureArea[j][0] upArea = self.featureArea[j][1] #该特征的上下阙界 lowLimit = self.featureLimit[j][0] upLimit = self.featureLimit[j][1] #随机值 value =self.particles[i][j] value = self.standardValue(value,lowArea,upArea,lowLimit,upLimit,self.featureType[j]) values.append(value) self.particles[i] = values #保存历史数据 for i in range(self.featureNum): self.gHistory["特征%d"%i].append(self.gBestArgs[i]) self.gHistory["群内平均"].append(np.mean(self.pBest)) self.gHistory["全局最优"].append(self.gBest) print("PSO epoch:%d/%d 群内平均:%.4f 全局最优:%.4f"%(self.epoch,self.epochMax,np.mean(self.pBest),self.gBest))
buildNet函数根据网络节点数和dropout概率来构建一个简单的全连接分类网络,其输入特征数为38,输出特征数为1(当然,也可以选择网络层数、学习率等超参数来优化,为方便学习,这里只选择这俩超参数进行实验)并对该网络进行训练
def buildNet(nodeNum,p): ''' 搭建全连接网络 进行训练,返回模型和训练历史、验证集准确率和测试集准确率 :param nodeNum: 网络节点数 :param p: dropout概率 ''' #输入层 38个对局特征 inputLayer = Input(shape=(38,)) #中间层 middle = Dense(nodeNum)(inputLayer) middle = Dropout(p)(middle) #输出层 二分类 outputLayer = Dense(1,activation="sigmoid")(middle) #建模 二分类损失 model = Model(inputs=inputLayer,outputs=outputLayer) optimizer = Adam(lr=1e-3) model.compile(optimizer=optimizer,loss="binary_crossentropy",metrics=['acc']) #训练 history = model.fit(xTrain,yTrain,verbose=0,batch_size=1000,epochs=100,validation_data=(xVal,yVal)).history #验证集准确率 valAcc = accuracy_score(yVal,model.predict(xVal).round(0)) #测试集准确率 testAcc = accuracy_score(yTest,model.predict(xTest).round(0)) return model,history,valAcc,testAcc
为了跟优化好的模型有所对比,这里我们训练一个默认参数的神经网络,它的超参数取值即各超参数区间的平均值,训练并打印网络结构和训练指标
nodeArea = [10,200]#节点数区间 pArea = [0,0.5]#dropout概率区间 #按区间平均值训练一个神经网络 nodeNum = int(np.mean(nodeArea)) p = np.mean(pArea) defaultNet,defaultHistory,defaultValAcc,defaultTestAcc = buildNet(nodeNum,p) defaultNet.summary() print("\n默认网络的 节点数:%d dropout概率:%.2f 验证集准确率:%.4f 测试集准确率:%.4f"%(nodeNum,p,defaultValAcc,defaultTestAcc))
Model: "model_346" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= input_347 (InputLayer) [(None, 38)] 0 _________________________________________________________________ dense_691 (Dense) (None, 105) 4095 _________________________________________________________________ dropout_346 (Dropout) (None, 105) 0 _________________________________________________________________ dense_692 (Dense) (None, 1) 106 ================================================================= Total params: 4,201 Trainable params: 4,201 Non-trainable params: 0 _________________________________________________________________ 默认网络的 节点数:105 dropout概率:0.25 验证集准确率:0.6535 测试集准确率:0.6578
实例化PSO模型,将区间信息输入,开始迭代,适应度函数就是输入1各粒子和全局最优适应度,返回该粒子对应方案的验证集准确率
featureNum = 2#2个需要优化的特征 featureArea = [nodeArea,pArea]#2个特征取值范围 featureLimit = [[1,1],[0,1]]#取值范围的开闭 0为闭区间 1为开区间 featureType = [int,float]#2个特征的类型 #粒子群算法类 pso = PSO(featureNum,featureArea,featureLimit,featureType) def calFitness(particle,gBest): '''适应度函数,输入1个粒子的数组和全局最优适应度,返回该粒子对应的适应度''' nodeNum,p = particle#取出粒子的特征值 net,history,valAcc,testAcc = buildNet(nodeNum,p) #该粒子方案超过全局最优 if valAcc>gBest: #保存模型和对应信息 net.save("Static/best.h5") history = pd.DataFrame(history) history.to_excel("Static/best.xlsx",index=None) with open("Static/info.json","w") as f: f.write(json.dumps({"valAcc":valAcc,"testAcc":testAcc})) return valAcc #开始用粒子群算法迅游 pso.iterate(calFitness) #载入最佳模型和对应的训练历史 bestNet = load_model("Static/best.h5") with open("Static/info.json","r") as f: info = json.loads(f.read()) bestValAcc = float(info["valAcc"]) bestTestAcc = float(info["testAcc"]) bestHistory = pd.read_excel("Static/best.xlsx") print("最优模型的验证集准确率:%.4f 测试集准确率:%.4f"%(bestValAcc,bestTestAcc))
PSO epoch:1/10 群内平均:0.7210 全局最优:0.7280 PSO epoch:2/10 群内平均:0.7210 全局最优:0.7280 PSO epoch:3/10 群内平均:0.7251 全局最优:0.7280 PSO epoch:4/10 群内平均:0.7275 全局最优:0.7350 PSO epoch:5/10 群内平均:0.7275 全局最优:0.7350 PSO epoch:6/10 群内平均:0.7299 全局最优:0.7350 PSO epoch:7/10 群内平均:0.7313 全局最优:0.7350 PSO epoch:8/10 群内平均:0.7313 全局最优:0.7350 PSO epoch:9/10 群内平均:0.7313 全局最优:0.7350 PSO epoch:10/10 群内平均:0.7313 全局最优:0.7350 最优模型的验证集准确率:0.7350 测试集准确率:0.7350
查看PSO最优解随迭代次数的变换
history = pd.DataFrame(pso.gHistory) history["epoch"] = range(1,history.shape[0]+1) history
特征0 | 特征1 | 群内平均 | 全局最优 | epoch | |
---|---|---|---|---|---|
0 | 50.0 | 0.267706 | 0.7210 | 0.728 | 1 |
1 | 50.0 | 0.267706 | 0.7210 | 0.728 | 2 |
2 | 50.0 | 0.267706 | 0.7251 | 0.728 | 3 |
3 | 57.0 | 0.201336 | 0.7275 | 0.735 | 4 |
4 | 57.0 | 0.201336 | 0.7275 | 0.735 | 5 |
5 | 57.0 | 0.201336 | 0.7299 | 0.735 | 6 |
6 | 57.0 | 0.201336 | 0.7313 | 0.735 | 7 |
7 | 57.0 | 0.201336 | 0.7313 | 0.735 | 8 |
8 | 57.0 | 0.201336 | 0.7313 | 0.735 | 9 |
9 | 57.0 | 0.201336 | 0.7313 | 0.735 | 10 |
对比下默认参数模型和PSO调优模型的准确率,是有点效果,仅供学习…
fig, ax = plt.subplots() x = np.arange(2) a = [defaultValAcc,bestValAcc] b = [defaultTestAcc,bestTestAcc] total_width, n = 0.8, 2 width = total_width / n x = x - (total_width - width) / 2 ax.bar(x, a, width=width, label='val',color="#00BFFF") for x1,y1 in zip(x,a): plt.text(x1,y1+0.01,'%.3f' %y1, ha='center',va='bottom') ax.bar(x + width, b, width=width, label='test',color="#FFA500") for x1,y1 in zip(x,b): plt.text(x1+width,y1+0.01,'%.3f' %y1, ha='center',va='bottom') ax.legend() ax.set_xticks([0, 1]) ax.set_ylim([0,1.2]) ax.set_ylabel("acc") ax.set_xticklabels(["default net","PSO-net"]) fig.savefig("Static/对比.png",dpi=250)