spark ALS学习

最近使用spark ALS做推荐,故写一篇文章记录所得。

算法介绍

spark实现的推荐算法是基于矩阵分解的ALS-WR(alternating-least-squares with weighted-λ-regularization)的协同过滤算法。
矩阵r表示用户对物品的评分矩阵,矩阵u,m表示用户和物品的特征矩阵。
$$r\approx u \cdot m$$
损失函数为
$$f(U,M)=\sum(r_{ij}-u_i^T m_j)^2+\lambda(\sum n_{u_i} ||u_i||^2+\sum n_{m_j}||m_j||^2)$$
$n_{u_i}$和$n_{m_i}$分别表示user评分数和电影数,$I_i$表示user i评分过的物品,$I_j$表示评论过物品j的所有用户。
损失函数前一部分为真实值与分解矩阵计算值得误差平方和,后一部分为正则范数,防止过拟合,目的即变成求f函数的最小值。
每次迭代,
固定M,逐个更新每个user的特征u(对u求偏导,令偏导为0求解)。
固定U,逐个更新每个item的特征m(对m求偏导,令偏导为0求解)。
给定的U的一列,称为$u_i$,由解正则化线性最小平方问题得到,这个问题由user i评分记录,以及i评分过的电影的特征向量组成

$\frac 1 2\frac{\partial f}{\partial u_{ki}}=0,\forall{i,k}\tag 1$
$\Rightarrow \sum_{j\in I_i}(u_i^Tm_j-r_{ij})m_{kj}+\lambda n_{ui}u_{ki}=0,\forall{i,k}\tag 2$
$\Rightarrow \sum_{j\in I_i}m_{kj}m_j^Tu_i+\lambda n_{ui}u_{ki}=\sum_{j\in I_i}m_{kj}r_{ij},\forall{i,k}\tag 3$
$\Rightarrow (M_{I_i}M_{I_i}^T+\lambda n_{u_i}E)u_i=M_{I_i}R^T(i,I_i),\forall i \tag 4$
$\Rightarrow u_i=A_i^{-1}V_i,\forall i\tag 5$

其中$A_i=M_{I_i}M_{I_i}^T+\lambda n_{u_i}E=M_{I_i}R^T(i,I_i)$E是单位矩阵,$M_{I_i}$表示M的子矩阵,它的列是user i评分过的物品,$R(i,I_i)$是以评分物品为列的矩阵R的第i行向量。
同理$m_j=A_j^{-1}V_j$

代码实现

本文从spark中的LocalALS分析(SparkALS与其原理一样,只不过用RDD实现并行计算处理)

1
2
3
4
5
6
7
8
9
10
11
//ITERATIONS 为迭代次数
for (iter <- 1 to ITERATIONS) {
println(s"Iteration $iter:")
//固定用户,逐个更新所有物品的特征
ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
//固定物品,逐个更新所有用户的特征
us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
//计算均方根误差
println("RMSE = " + rmse(R, ms, us))
println()
}

迭代更新的方法为updateMovie和updateUser,实现原理一致,取其一介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def updateUser(j: Int, u: RealVector, ms: Array[RealVector], R:RealMatrix) : RealVector = {
//F为隐性因子的个数,M为电影的总数,U为用户的总数
var XtX: RealMatrix = new Array2DRowRealMatrix(F, F)
var Xty: RealVector = new ArrayRealVector(F)
// For each movie that the user rated
// 迭代所有电影,应取该用户评论过的电影集
for (i <- 0 until M) {
val m = ms(i)//取出电影的特征向量
// Add m * m^t to XtX
// outProduct即为矩阵叉乘另一个矩阵的转置矩阵
// add方法会把两个矩阵每个对应位置相加
XtX = XtX.add(m.outerProduct(m))
// Add m * rating to Xty
// mapMultiply方法会用将原向量中的每个值都乘以入参(此处即用户j对电影i的评分)
// add方法会把每个向量的对应位置相加
Xty = Xty.add(m.mapMultiply(R.getEntry(i, j)))
}
// Add regularization coefficients to diagonal terms
for (d <- 0 until F) {
// addToEntry方法将2维数组中当前参数1行和参数2列确定的对应元素加上参数3形成新值
XtX.addToEntry(d, d, LAMBDA * M)
}
// Solve it with Cholesky
// 使用乔里斯基分解求解,mllib中还提供非负最小二乘分解(NNLS)
new CholeskyDecomposition(XtX).getSolver.solve(Xty)
}

结合公式4,$(M_{I_i}M_{I_i}^T+\lambda n_{u_i}E)u_i=M_{I_i}R^T(i,I_i)$,其中XtX即为公式中$u_i$前的计算结果,Xty为等号后的计算结果。

热评文章