A look at the gradient descent algorithm: first the formulas, then how Spark implements them.
Algorithm Overview
Gradient descent is a commonly used method for minimizing a risk or loss function. Two iterative strategies are in common use: stochastic gradient descent (SGD) and batch gradient descent (BGD).
$$h_{\theta}(x)=\sum_{j=0}^n \theta_jx_j$$
$$J(\theta)=\frac 1 {2m}\sum_{i=1}^m(y^i-h_{\theta}(x^i))^2$$
where $h_{\theta}(x)$ is the hypothesis function to be fitted, $J(\theta)$ is the loss function, $\theta$ is the parameter vector, n is the number of parameters, m is the number of records in the training set, j indexes a parameter, and i indexes a training record.
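As a concrete instance (the $x_0=1$ intercept convention here is an illustrative assumption, not stated above): with two features and $x_0$ fixed to 1 so that $\theta_0$ plays the role of the intercept,
$$h_{\theta}(x)=\theta_0+\theta_1x_1+\theta_2x_2$$
and $J(\theta)$ is half the mean squared residual of this hypothesis over the m training records.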
Batch Gradient Descent (BGD)
- Take the partial derivative of $J(\theta)$ with respect to each $\theta_j$ to obtain the corresponding gradient (a short derivation follows this list):
$$\frac {\partial J(\theta)} {\partial \theta_j}=-\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i$$
- Since we want to minimize the risk function, each $\theta_j$ is updated in the direction opposite to its gradient:
$$\theta_j'=\theta_j+\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i$$
- Every iteration uses the entire training set, so when m is large this becomes slow.
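For clarity, here is the chain-rule step behind that gradient (added here; it follows directly from the definitions above):
$$\frac {\partial J(\theta)} {\partial \theta_j}=\frac 1 {2m}\sum_{i=1}^m 2\,(y^i-h_{\theta}(x^i))\cdot\frac {\partial} {\partial \theta_j}\bigl(y^i-h_{\theta}(x^i)\bigr)=-\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i$$
since $\frac {\partial h_{\theta}(x^i)} {\partial \theta_j}=x_j^i$ and $y^i$ does not depend on $\theta_j$.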
Stochastic Gradient Descent (SGD)
- Rewrite the loss function as:
$$J(\theta)=\frac 1 m\sum_{i=1}^m \frac 1 2 (y^i-h_{\theta}(x^i))^2$$
so the loss contributed by a single sample is
$$cost(\theta,(x^i,y^i))=\frac 1 2 (y^i-h_{\theta}(x^i))^2$$
- Take the partial derivative with respect to $\theta_j$ to get the per-sample gradient, and update $\theta_j$ one sample at a time:
$$\theta_j'=\theta_j+(y^i-h_{\theta}(x^i))x_j^i$$
- With a step size of $\alpha$ for each update, this becomes:
$$\theta_j'=\theta_j+\alpha(y^i-h_{\theta}(x^i))x_j^i\tag 1$$
Code Implementation
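Only the signature of the original demo, `def sgdDemo`, survives here, so below is a minimal self-contained sketch of such a demo in plain Scala that implements update (1) directly. The synthetic data, step size, and iteration count are illustrative assumptions, not the original code:

```scala
import scala.util.Random

// Minimal demo of update (1): theta_j' = theta_j + alpha * (y^i - h_theta(x^i)) * x_j^i
object SgdDemo {

  // h_theta(x) = sum_j theta_j * x_j
  def predict(theta: Array[Double], x: Array[Double]): Double =
    theta.zip(x).map { case (t, xj) => t * xj }.sum

  def main(args: Array[String]): Unit = {
    val rnd = new Random(42)
    val trueTheta = Array(2.0, -3.0, 0.5)   // ground truth; x_0 = 1 plays the role of the intercept

    // m = 1000 samples: x = (1, x1, x2), y = trueTheta . x + small noise
    val data = Array.fill(1000) {
      val x = Array(1.0, rnd.nextDouble(), rnd.nextDouble())
      (x, predict(trueTheta, x) + 0.01 * rnd.nextGaussian())
    }

    var theta = Array.fill(3)(0.0)
    val alpha = 0.1                          // step size

    // several passes over the data, updating theta one sample at a time
    for (_ <- 1 to 50; (x, y) <- data) {
      val err = y - predict(theta, x)        // (y^i - h_theta(x^i))
      theta = theta.zip(x).map { case (t, xj) => t + alpha * err * xj }
    }

    println(theta.mkString("theta = [", ", ", "]"))
  }
}
```

Each update touches a single sample, which is exactly what makes SGD cheap per step compared with the BGD update above.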
Implementation in Spark
The analysis below starts from org.apache.spark.mllib.regression.LinearRegressionWithSGD. Unless a package is given explicitly, GeneralizedLinearAlgorithm and LabeledPoint live in org.apache.spark.mllib.regression, while GradientDescent and the Gradient/Updater implementations live in org.apache.spark.mllib.optimization.
LinearRegressionWithSGD looks like this:

```scala
class LinearRegressionWithSGD extends GeneralizedLinearAlgorithm[LinearRegressionModel] {
  // ... other parts omitted
  private val gradient = new LeastSquaresGradient()
  private val updater = new SimpleUpdater()
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setMiniBatchFraction(miniBatchFraction)
  // ...
}
```
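Before stepping into the algorithm, here is a hedged sketch of how this class is typically driven from user code, assuming a Spark 1.x MLlib setup where LinearRegressionWithSGD.train is available; the toy data and parameter values are illustrative, not from the original post:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

object LinearRegressionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lr-sgd").setMaster("local[*]"))

    // toy training set: label y and features x (values made up for illustration)
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(0.1, 0.2)),
      LabeledPoint(2.0, Vectors.dense(0.2, 0.4)),
      LabeledPoint(3.0, Vectors.dense(0.3, 0.6))
    )).cache()

    // train(input, numIterations, stepSize, miniBatchFraction)
    val model = LinearRegressionWithSGD.train(training, 100, 0.1, 1.0)

    println(s"weights = ${model.weights}, intercept = ${model.intercept}")
    sc.stop()
  }
}
```

train constructs a LinearRegressionWithSGD with the given hyper-parameters and calls run, which is where the next listing picks up.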
The entry point when the program runs:

```scala
// GeneralizedLinearAlgorithm
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
  // ...
  // the core of the algorithm is invoked here
  val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
  // ...
}
```
In LinearRegressionWithSGD, optimizer is a GradientDescent, and GradientDescent contains:

```scala
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
  val (weights, _) = GradientDescent.runMiniBatchSGD(
    data,
    gradient,
    updater,
    stepSize,
    numIterations,
    regParam,
    miniBatchFraction,
    initialWeights)
  weights
}
```
Going one level deeper, the GradientDescent companion object contains:

```scala
def runMiniBatchSGD(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    stepSize: Double,
    numIterations: Int,
    regParam: Double,
    miniBatchFraction: Double,
    initialWeights: Vector): (Vector, Array[Double]) = {
  // ...
  var regVal = updater.compute(
    weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
  // numIterations is the number of iterations
  for (i <- 1 to numIterations) {
    val bcWeights = data.context.broadcast(weights)
    // sample a mini-batch, then aggregate (gradient sum, loss sum, sample count) over it
    val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
      .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
        seqOp = (c, v) => {
          // c: (grad, loss, count), v: (label, features)
          val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
          (c._1, c._2 + l, c._3 + 1)
        },
        combOp = (c1, c2) => {
          // c: (grad, loss, count)
          (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
        })
    if (miniBatchSize > 0) {
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
      // average the gradient over the mini-batch and let the updater apply the step
      val update = updater.compute(
        weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)
      weights = update._1
      regVal = update._2
    } else {
      logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
    }
  }
  // ...
  (weights, stochasticLossHistory.toArray)
}
```
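To make the treeAggregate step concrete, here is a small local (non-Spark) sketch of the same computation: each sample folds its least-squares gradient and loss into an accumulator (the role of seqOp), partial accumulators are merged (the role of combOp, which in Spark happens across partitions), and the averaged gradient drives one weight update. All names and details here are illustrative, not Spark code:

```scala
object MiniBatchStep {
  type Acc = (Array[Double], Double, Long)   // (gradient sum, loss sum, count), like the treeAggregate accumulator

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // role of seqOp: add one sample's least-squares gradient and loss into the accumulator
  def seqOp(weights: Array[Double])(c: Acc, v: (Double, Array[Double])): Acc = {
    val (label, features) = v
    val diff = dot(features, weights) - label
    val grad = c._1.zip(features).map { case (g, x) => g + diff * x }
    (grad, c._2 + diff * diff / 2.0, c._3 + 1)
  }

  // role of combOp: merge two partial accumulators (as from two partitions)
  def combOp(c1: Acc, c2: Acc): Acc =
    (c1._1.zip(c2._1).map { case (a, b) => a + b }, c1._2 + c2._2, c1._3 + c2._3)

  // one iteration: aggregate over a sampled batch, then update the way SimpleUpdater would
  def step(weights: Array[Double], batch: Seq[(Double, Array[Double])],
           stepSize: Double, iter: Int): (Array[Double], Double) = {
    val zero: Acc = (Array.fill(weights.length)(0.0), 0.0, 0L)
    val (gradSum, lossSum, n) = batch.foldLeft(zero)(seqOp(weights))   // locally, seqOp alone is enough
    val thisStep = stepSize / math.sqrt(iter)
    val newWeights = weights.zip(gradSum).map { case (w, g) => w - thisStep * g / n }
    (newWeights, lossSum / n)
  }
}
```

Dividing gradSum by the batch size is what makes the miniBatchFraction sample an unbiased estimate of the full-batch gradient from the BGD section.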
Here gradient.compute and updater.compute are the two methods that compute the gradient and update the parameters, respectively. As seen in LinearRegressionWithSGD, the default gradient and updater are LeastSquaresGradient and SimpleUpdater.

```scala
class LeastSquaresGradient extends Gradient {
  // ...
  override def compute(
      data: Vector,
      label: Double,
      weights: Vector,
      cumGradient: Vector): Double = {
    // compute the residual; dot is the vector dot product
    // note the sign: diff = h_theta(x) - y, the opposite of (y - h_theta(x)) in formula (1)
    val diff = dot(data, weights) - label
    // axpy(a, x, y) performs y += a * x,
    // so the cumulative gradient becomes cumGradient += diff * data
    axpy(diff, data, cumGradient)
    // return this sample's loss
    diff * diff / 2.0
  }
}
```
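In formula terms (this restatement is added here, not from the original post), for a single sample $(x^i,y^i)$ LeastSquaresGradient computes
$$\text{diff}=h_{\theta}(x^i)-y^i,\qquad \text{loss}=\frac 1 2 \,\text{diff}^2,\qquad \text{gradient}=\text{diff}\cdot x^i=-(y^i-h_{\theta}(x^i))\,x^i$$
Because the sign is flipped relative to formula (1), the updater below subtracts this gradient, which yields the same update.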
Up to that sign flip, this is exactly the $(y^i-h_{\theta}(x^i))x_j^i$ term in formula (1). The default updater, SimpleUpdater, then applies the step:

```scala
class SimpleUpdater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    // the step size decays with the iteration number
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    // brzAxpy(a, x, y) performs y += a * x; this is the weight update
    // weights -= thisIterStepSize * gradient
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
    (Vectors.fromBreeze(brzWeights), 0)
  }
}
```
This is formula (1), $\theta_j'=\theta_j+\alpha\times{}$the result from the previous step, where $\alpha$ here is stepSize$/\sqrt{\text{iter}}$ and the minus sign in the update cancels the flipped sign of diff.
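As a quick numerical check (the numbers are chosen here purely for illustration): take a single sample $x=(1,2)$, $y=5$, current weights $\theta=(0.5,1.0)$, stepSize $=1.0$, iter $=1$.
$$h_{\theta}(x)=0.5\cdot 1+1.0\cdot 2=2.5,\qquad \text{diff}=2.5-5=-2.5,\qquad \text{loss}=\frac{(-2.5)^2}{2}=3.125$$
$$\text{gradient}=\text{diff}\cdot x=(-2.5,\,-5.0),\qquad \theta'=\theta-\frac{1.0}{\sqrt 1}\cdot(-2.5,\,-5.0)=(3.0,\,6.0)$$
which matches applying $\theta_j'=\theta_j+\alpha(y-h_{\theta}(x))x_j$ directly.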