Learning SGD in Spark

A quick look at the gradient descent algorithm, working from the formulas toward the Spark implementation.

Algorithm Overview

Gradient descent is a commonly used method for minimizing a risk or loss function. Two iterative approaches are in wide use: stochastic gradient descent and batch gradient descent.
$$h_{\theta}(x)=\sum_{j=0}^n \theta_jx_j$$
$$J(\theta)=\frac 1 {2m}\sum_{i=1}^m(y^i-h_{\theta}(x^i))^2$$
where $h_{\theta}(x)$ is the hypothesis function to be fitted, $J(\theta)$ is the loss function, $\theta$ is the parameter vector, $n$ is the number of parameters, $m$ is the number of training records, $j$ indexes a parameter, and $i$ indexes a record.
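
For instance, with two features and ignoring the bias term $\theta_0x_0$, the hypothesis reduces to

$$h_{\theta}(x)=\theta_1x_1+\theta_2x_2$$

which is exactly the form the toy demo later in this post fits.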

Batch Gradient Descent (BGD)

  1. Take the partial derivative of $J(\theta)$ with respect to each $\theta_j$ to get the corresponding gradient (the chain-rule step is written out after this list):
    $$\frac {\partial J(\theta)} {\partial \theta_j}=-\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i$$
  2. Since we want to minimize the risk function, update each $\theta_j$ along the negative direction of its gradient:
    $$\theta_j'=\theta_j+\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i$$
  3. Every iteration uses the entire training set, so when m is large the updates are slow.
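
For completeness, here is the chain-rule step behind the gradient in step 1, written for a single parameter $\theta_j$:

$$\frac{\partial J(\theta)}{\partial \theta_j}=\frac{1}{2m}\sum_{i=1}^m 2\,(y^i-h_{\theta}(x^i))\cdot\frac{\partial (y^i-h_{\theta}(x^i))}{\partial \theta_j}=-\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i$$

using the fact that $\frac{\partial h_{\theta}(x^i)}{\partial \theta_j}=x_j^i$.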

Stochastic Gradient Descent (SGD)

  1. Rewrite the loss function as
    $$J(\theta)=\frac 1 m\sum_{i=1}^m \frac 1 2 (y^i-h_{\theta}(x^i))^2$$
    so the loss contributed by a single sample is
    $$cost(\theta,(x^i,y^i))=\frac 1 2 (y^i-h_{\theta}(x^i))^2$$
  2. Take the partial derivative with respect to $\theta_j$ to get the per-sample gradient, and update $\theta_j$ with it:
    $$\theta_j'=\theta_j+(y^i-h_{\theta}(x^i))x_j^i$$
  3. With a step size $\alpha$, the update becomes (a one-step numeric example follows this list):
    $$\theta_j'=\theta_j+\alpha(y^i-h_{\theta}(x^i))x_j^i\tag 1$$
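
As a quick sanity check of formula (1), take the first record of the toy data used in the demo below, $x=(1,4)$, $y=19$, with initial $\theta=(0,0)$ and $\alpha=0.001$:

$$h_{\theta}(x)=0\cdot1+0\cdot4=0,\qquad y-h_{\theta}(x)=19$$
$$\theta_1'=0+0.001\cdot19\cdot1=0.019,\qquad \theta_2'=0+0.001\cdot19\cdot4=0.076$$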

Code Implementation

def sgdDemo(): Unit = {
  // feature matrix
  val featuresMatrix: List[List[Double]] = List(List(1, 4), List(2, 5), List(5, 1), List(4, 2))
  // vector of true labels
  val labelMatrix: List[Double] = List(19, 26, 19, 20)
  var theta: List[Double] = List(0, 0)
  var loss: Double = 10.0
  for {
    i <- 0 until 1000 // number of iterations
    if loss > 0.01    // convergence condition: stop once loss <= 0.01
  } {
    val j = i % 4 // use one sample per iteration
    // predicted label for the j-th sample
    var h = 0.0
    for (k <- 0 until 2) {
      h += featuresMatrix(j)(k) * theta(k)
    }
    // error between the true label and the predicted label
    val error_sum = labelMatrix(j) - h
    // update the weight vector: theta_k' = theta_k + alpha * error * x_k, with alpha = 0.001
    var cacheTheta: List[Double] = List()
    for (k <- 0 until 2) {
      val updatedTheta = theta(k) + 0.001 * error_sum * featuresMatrix(j)(k)
      cacheTheta = cacheTheta :+ updatedTheta // append, so the weight order is preserved
    }
    cacheTheta.foreach(t => print(t + ","))
    print(error_sum + "\n")
    theta = cacheTheta
    // recompute the loss over the whole training set
    var currentLoss: Double = 0
    for (j <- 0 until 4) {
      var sum = 0.0
      for (k <- 0 until 2) {
        sum += featuresMatrix(j)(k) * theta(k)
      }
      currentLoss += (sum - labelMatrix(j)) * (sum - labelMatrix(j))
    }
    loss = currentLoss
    println("loss->>>>" + loss / 4 + ",i>>>>>" + i)
  }
}

Implementation in Spark
The analysis below uses org.apache.spark.mllib.regression.LinearRegressionWithSGD; classes mentioned without an explicit package come from org.apache.spark.mllib.regression or org.apache.spark.mllib.optimization.

LinearRegressionWithSGD looks like this:

class LinearRegressionWithSGD extends GeneralizedLinearAlgorithm {
  //... other members omitted
  private val gradient = new LeastSquaresGradient()
  private val updater = new SimpleUpdater()
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setMiniBatchFraction(miniBatchFraction)
  //...
}
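
Before walking through the internals, here is a minimal sketch of how this class is usually driven through the train method of its companion object; it reuses the toy data from the demo above, and the iteration count and step size are illustrative values picked here, not tuned defaults:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

object LinearRegressionSGDDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sgd-demo").setMaster("local[*]"))

    // the same toy data as the hand-written demo above
    val training = sc.parallelize(Seq(
      LabeledPoint(19, Vectors.dense(1, 4)),
      LabeledPoint(26, Vectors.dense(2, 5)),
      LabeledPoint(19, Vectors.dense(5, 1)),
      LabeledPoint(20, Vectors.dense(4, 2))
    ))

    // train(input, numIterations, stepSize); values chosen for illustration only
    val model = LinearRegressionWithSGD.train(training, 1000, 0.001)
    println("weights: " + model.weights + ", intercept: " + model.intercept)

    sc.stop()
  }
}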

The entry point for training is:

//GeneralizedLinearAlgorithm
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
  //...
  // the core of the algorithm is this call
  val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
  //...
}

In LinearRegressionWithSGD the optimizer is a GradientDescent, which contains:

def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
  val (weights, _) = GradientDescent.runMiniBatchSGD(
    data,
    gradient,
    updater,
    stepSize,
    numIterations,
    regParam,
    miniBatchFraction,
    initialWeights)
  weights
}

Going one level deeper, the GradientDescent companion object defines:

def runMiniBatchSGD(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    stepSize: Double,
    numIterations: Int,
    regParam: Double,
    miniBatchFraction: Double,
    initialWeights: Vector): (Vector, Array[Double]) = {
  //...
  var regVal = updater.compute(
    weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
  // numIterations is the number of iterations
  for (i <- 1 to numIterations) {
    val bcWeights = data.context.broadcast(weights)
    val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
      .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
        seqOp = (c, v) => {
          // c: (grad, loss, count), v: (label, features)
          val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
          (c._1, c._2 + l, c._3 + 1)
        },
        combOp = (c1, c2) => {
          // c: (grad, loss, count)
          (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
        })
    if (miniBatchSize > 0) {
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
      val update = updater.compute(
        weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)
      weights = update._1
      regVal = update._2
    } else {
      logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
    }
  }
  //...
  (weights, stochasticLossHistory.toArray)
}
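
To make the aggregation step easier to follow, here is a stripped-down, self-contained sketch of the same mini-batch pattern using plain arrays instead of Breeze vectors; the function name and parameters are mine, not Spark's:

import org.apache.spark.rdd.RDD

// A simplified stand-in for the sampling + treeAggregate step above.
// Returns (gradientSum, lossSum, miniBatchSize) for a least-squares model.
def miniBatchGradient(
    data: RDD[(Double, Array[Double])], // (label, features)
    weights: Array[Double],
    miniBatchFraction: Double,
    seed: Long): (Array[Double], Double, Long) = {
  val n = weights.length
  val bcWeights = data.context.broadcast(weights)

  data.sample(withReplacement = false, miniBatchFraction, seed)
    .treeAggregate((new Array[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (gradientSum, lossSum, count), v: (label, features)
        val w = bcWeights.value
        var pred = 0.0
        for (k <- 0 until n) pred += v._2(k) * w(k)
        val diff = pred - v._1                         // h_theta(x) - y
        for (k <- 0 until n) c._1(k) += diff * v._2(k) // accumulate diff * x
        (c._1, c._2 + diff * diff / 2.0, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // merge partial (gradientSum, lossSum, count) from two partitions
        for (k <- 0 until n) c1._1(k) += c2._1(k)
        (c1._1, c1._2 + c2._2, c1._3 + c2._3)
      })
}

Spark then divides gradientSum by miniBatchSize before handing it to updater.compute, which is what the weights = update._1 assignment in the source above does.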

The calls to gradient.compute and updater.compute compute the gradient and apply the parameter update, respectively. As seen in LinearRegressionWithSGD, the default gradient and updater are LeastSquaresGradient and SimpleUpdater.

class LeastSquaresGradient extends Gradient {
  //...
  override def compute(
      data: Vector,
      label: Double,
      weights: Vector,
      cumGradient: Vector): Double = {
    // prediction error: diff = h_theta(x) - label (note the sign)
    // dot computes the dot product of two vectors
    val diff = dot(data, weights) - label
    // axpy(a, x, y) performs y += a * x,
    // so the cumulative gradient becomes cumGradient += diff * data
    axpy(diff, data, cumGradient)
    // return this sample's loss, diff^2 / 2
    diff * diff / 2.0
  }
}

This is the $(y^i-h_{\theta}(x^i))x_j^i$ term from formula (1), only with the opposite sign: the code accumulates $(h_{\theta}(x^i)-y^i)x_j^i$, and the updater below compensates by subtracting the gradient.
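
Plugging in the first toy record from the demo ($x=(1,4)$, $y=19$) with $\theta=(0,0)$:

$$\text{diff}=0-19=-19,\qquad \text{diff}\cdot x=(-19,\ -76),\qquad \text{loss}=\frac{(-19)^2}{2}=180.5$$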

class SimpleUpdater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    // the learning rate decays as 1 / sqrt(iter)
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    // brzAxpy(a, x, y) performs y += a * x, so this subtracts
    // thisIterStepSize * gradient from the weights
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
    (Vectors.fromBreeze(brzWeights), 0)
  }
}

This is formula (1), $\theta_j'=\theta_j+\alpha\times{}$(the result of the previous step): subtracting the accumulated $(h_{\theta}(x^i)-y^i)x_j^i$ is the same as adding $\alpha(y^i-h_{\theta}(x^i))x_j^i$.
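
Continuing the numeric example, with stepSize $=0.001$ at iteration $iter=1$ and a mini-batch consisting of that single record:

$$thisIterStepSize=\frac{0.001}{\sqrt 1}=0.001,\qquad \theta'=(0,0)-0.001\cdot(-19,\ -76)=(0.019,\ 0.076)$$

which matches the hand-computed SGD step from the beginning of the post.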

References:
http://blog.csdn.net/yangguo_2011/article/details/33859337
http://blog.csdn.net/lilyth_lilyth/article/details/8973972
