了解一下梯度下降算法,通过公式学习spark实现。

算法介绍

梯度下降(gradient descent)是最小化风险函数、损失函数的一种常用方法,当前有随机梯度下降(stochastic gradient descent)和批量梯度下降(batch gradient descent)两种迭代求解思路。
$$h(\theta)=\sum_{j=0}^n \theta_jx_j$$
$$J(\theta)=\frac 1 2\sum_{i=1}^m(y^i-h_{\theta}(x^i))^2$$
其中$h(\theta)$为待拟合函数,$J(\theta)$为损失函数,$\theta$为参数向量,n为参数的总个数,m为训练集的记录总数,j为其中一个参数,i为其中一条记录。

批量梯度下降(BGD)

  1. 将$J(\theta)$对$\theta$求偏导,得到每个$\theta$对应的的梯度
    $$\frac {\partial J(\theta)} {\partial \theta}=-\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i$$
  2. 由于是要最小化风险函数,所以按每个参数$\theta$的梯度负方向,来更新每个$\theta$
    $$\theta_j^`=\theta_j+\frac 1 m \sum_{i=1}^m(y^i-h_{\theta}(x^i))x_j^i)$$
  3. 每迭代一步,都要用到训练集所有的数据,所有如果m很大,速度会很慢。

随机梯度下降(SGD)

  1. 将损失函数写成以下方式:
    $$J(\theta)=\frac 1 m\sum_{i=1}^m \frac 1 2 (y^i-h_{\theta}(x^i))^2$$
    所以每个样本的损失函数为
    $$cost(\theta,(x^i,y^i))=\frac 1 2 (y^i-h_{\theta}(x^i))^2$$
  2. 对$\theta$求偏导得到对应梯度,来更新$\theta$
    $$\theta_j^`=\theta_j+(y^i-h_{\theta}(x^i))x_j^i$$
  3. 假设每次下降的步长为$\alpha$
    $$\theta_j^`=\theta_j+\alpha(y^i-h_{\theta}(x^i))x_j^i\tag 1$$

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def sgdDemo {
val featuresMatrix: List[List[Double]] = List(List(1, 4), List(2, 5), List(5, 1), List(4, 2))//特征矩阵
val labelMatrix: List[Double] = List(19, 26, 19, 20)//真实值向量
var theta: List[Double] = List(0, 0)
var loss: Double = 10.0
for {
i <- 0 until 1000 //迭代次数
if (loss > 0.01) //收敛条件loss<=0.01
} {
var error_sum = 0.0 //总误差
var j = i % 4
var h = 0.0
for (k <- 0 until 2) {
h += featuresMatrix(j)(k) * theta(k)
} //计算给出的测试数据集中第j个对象的计算类标签
error_sum = labelMatrix(j) - h //计算给出的测试数据集中类标签与计算的类标签的误差值
var cacheTheta: List[Double] = List()

for (k <- 0 until 2) {
val updaterTheta = theta(k) + 0.001 * (error_sum) * featuresMatrix(j)(k)
cacheTheta = updaterTheta +: cacheTheta
} //更新权重向量
cacheTheta.foreach(t => print(t + ","))
print(error_sum + "\n")
theta = cacheTheta
//更新误差率
var currentLoss: Double = 0
for (j <- 0 until 4) {
var sum = 0.0
for (k <- 0 until 2) {
sum += featuresMatrix(j)(k) * theta(k)
}
currentLoss += (sum - labelMatrix(j)) * (sum - labelMatrix(j))
}
loss = currentLoss
println("loss->>>>" + loss / 4 + ",i>>>>>" + i)
}
}

Spark中的实现
使用org.apache.spark.mllib.regression.LinearRegressionWithSGD分析,后面的类没有特别标注的包名都是org.apache.spark.mllib.regression。

LinearRegressionWithSGD如下:

1
2
3
4
5
6
7
8
9
10
class LinearRegressionWithSGD extends GeneralizedLinearAlgorithm{
//...其他部分省略
private val gradient = new LeastSquaresGradient()
private val updater = new SimpleUpdater()
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setMiniBatchFraction(miniBatchFraction)
//...
}

程序运行入口:

1
2
3
4
5
6
7
//GeneralizedLinearAlgorithm
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
//...
//核心算法在此
val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
//...
}

LinearRegressionWithSGD中optimizer为GradientDescent,在GradientDescent中有:

1
2
3
4
5
6
7
8
9
10
11
12
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = GradientDescent.runMiniBatchSGD(
data,
gradient,
updater,
stepSize,
numIterations,
regParam,
miniBatchFraction,
initialWeights)
weights
}

继续往下,在GradientDescent伴生对象中有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def runMiniBatchSGD(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Vector): (Vector, Array[Double]) = {
//...
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
//numIterations 为迭代次数
for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
if (miniBatchSize > 0) {
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)
weights = update._1
regVal = update._2
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
}
}
//...
(weights, stochasticLossHistory.toArray)
}

其中有gradient.compute和updater.compute两个方法分别执行梯度下降和参数更新。由LinearRegressionWithSGD中看出默认gradient和updater分别为LeastSquaresGradient和SimpleUpdater。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class LeastSquaresGradient extends Gradient {
//...
override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
//计算误差值
//dot计算向量的点积
val diff = dot(data, weights) - label
//axpy(a,x,y)让y中的每个参数均执行y += a * x
//当前梯度=原梯度+误差(为负数)*原数据
axpy(diff, data, cumGradient)
diff * diff / 2.0
}
}

即为公式一中的$(y^i-h_{\theta}(x^i))x_j^i$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class SimpleUpdater extends Updater {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
//brzAxpy(a,x,y)方法实现y+=a*x,此处即为更新参数
//thisIterStepSize为下降的梯度
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
(Vectors.fromBreeze(brzWeights), 0)
}
}

此即为公式一中的$\theta_j^`=\theta_j+\alpha *$上一步的结果。

参考文章:
http://blog.csdn.net/yangguo_2011/article/details/33859337
http://blog.csdn.net/lilyth_lilyth/article/details/8973972

最近使用spark ALS做推荐,故写一篇文章记录所得。

算法介绍

spark实现的推荐算法是基于矩阵分解的ALS-WR(alternating-least-squares with weighted-λ-regularization)的协同过滤算法。
矩阵r表示用户对物品的评分矩阵,矩阵u,m表示用户和物品的特征矩阵。
$$r\approx u \cdot m$$
损失函数为
$$f(U,M)=\sum(r_{ij}-u_i^T m_j)^2+\lambda(\sum n_{u_i} ||u_i||^2+\sum n_{m_j}||m_j||^2)$$
$n_{u_i}$和$n_{m_i}$分别表示user评分数和电影数,$I_i$表示user i评分过的物品,$I_j$表示评论过物品j的所有用户。
损失函数前一部分为真实值与分解矩阵计算值得误差平方和,后一部分为正则范数,防止过拟合,目的即变成求f函数的最小值。
每次迭代,
固定M,逐个更新每个user的特征u(对u求偏导,令偏导为0求解)。
固定U,逐个更新每个item的特征m(对m求偏导,令偏导为0求解)。
给定的U的一列,称为$u_i$,由解正则化线性最小平方问题得到,这个问题由user i评分记录,以及i评分过的电影的特征向量组成

$\frac 1 2\frac{\partial f}{\partial u_{ki}}=0,\forall{i,k}\tag 1$
$\Rightarrow \sum_{j\in I_i}(u_i^Tm_j-r_{ij})m_{kj}+\lambda n_{ui}u_{ki}=0,\forall{i,k}\tag 2$
$\Rightarrow \sum_{j\in I_i}m_{kj}m_j^Tu_i+\lambda n_{ui}u_{ki}=\sum_{j\in I_i}m_{kj}r_{ij},\forall{i,k}\tag 3$
$\Rightarrow (M_{I_i}M_{I_i}^T+\lambda n_{u_i}E)u_i=M_{I_i}R^T(i,I_i),\forall i \tag 4$
$\Rightarrow u_i=A_i^{-1}V_i,\forall i\tag 5$

其中$A_i=M_{I_i}M_{I_i}^T+\lambda n_{u_i}E=M_{I_i}R^T(i,I_i)$E是单位矩阵,$M_{I_i}$表示M的子矩阵,它的列是user i评分过的物品,$R(i,I_i)$是以评分物品为列的矩阵R的第i行向量。
同理$m_j=A_j^{-1}V_j$

代码实现

本文从spark中的LocalALS分析(SparkALS与其原理一样,只不过用RDD实现并行计算处理)

1
2
3
4
5
6
7
8
9
10
11
//ITERATIONS 为迭代次数
for (iter <- 1 to ITERATIONS) {
println(s"Iteration $iter:")
//固定用户,逐个更新所有物品的特征
ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
//固定物品,逐个更新所有用户的特征
us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
//计算均方根误差
println("RMSE = " + rmse(R, ms, us))
println()
}

迭代更新的方法为updateMovie和updateUser,实现原理一致,取其一介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def updateUser(j: Int, u: RealVector, ms: Array[RealVector], R:RealMatrix) : RealVector = {
//F为隐性因子的个数,M为电影的总数,U为用户的总数
var XtX: RealMatrix = new Array2DRowRealMatrix(F, F)
var Xty: RealVector = new ArrayRealVector(F)
// For each movie that the user rated
// 迭代所有电影,应取该用户评论过的电影集
for (i <- 0 until M) {
val m = ms(i)//取出电影的特征向量
// Add m * m^t to XtX
// outProduct即为矩阵叉乘另一个矩阵的转置矩阵
// add方法会把两个矩阵每个对应位置相加
XtX = XtX.add(m.outerProduct(m))
// Add m * rating to Xty
// mapMultiply方法会用将原向量中的每个值都乘以入参(此处即用户j对电影i的评分)
// add方法会把每个向量的对应位置相加
Xty = Xty.add(m.mapMultiply(R.getEntry(i, j)))
}
// Add regularization coefficients to diagonal terms
for (d <- 0 until F) {
// addToEntry方法将2维数组中当前参数1行和参数2列确定的对应元素加上参数3形成新值
XtX.addToEntry(d, d, LAMBDA * M)
}
// Solve it with Cholesky
// 使用乔里斯基分解求解,mllib中还提供非负最小二乘分解(NNLS)
new CholeskyDecomposition(XtX).getSolver.solve(Xty)
}

结合公式4,$(M_{I_i}M_{I_i}^T+\lambda n_{u_i}E)u_i=M_{I_i}R^T(i,I_i)$,其中XtX即为公式中$u_i$前的计算结果,Xty为等号后的计算结果。

BLAS(Basic Linear Algebra Subprograms 基础线性代数程序集)相关算法中文的介绍比较少,所以写一篇简单记录下,难免有误,欢迎拍砖。

BLAS按照功能被分为三个级别:
Level 1:矢量-矢量运算
$y \leftarrow \alpha x + y$
Level 2:矩阵-矢量运算
$y \leftarrow \alpha Ax + \beta y$
Level 3:矩阵-矩阵运算
$C \leftarrow \alpha AB + \beta C$
官方参考实现:http://www.netlib.org/blas/
代码示例选用com.github.fommil.netlib.BLAS(spark官方亦是这个)

本文介绍下Level1

srot

实现数组a,b间数据的处理,处理公式为:
$a_i = c \ast a_i + s \ast b_i$
$b_i = c \ast b_i - s \ast a_i$

1
2
3
4
5
6
7
public void srot(int n, float[] sx, int _sx_offset, int incx, float[] sy, int _sy_offset, int incy, float c, float s);
参数解析:
n :需要处理的数据总数
sx|sy :互相处理数据的数组
_sx_offset|_sy_offset :sx|sy数组数据处理起始的位置
incx|incy :步长,取数据的间隔
c|s :数据处理的系数

1
2
3
4
5
6
7
8
9
val a = Array(2.0f,3.0f,5.0f,4.0f,1.0f)
val b = Array(2.0f,3.0f,5.0f,4.0f,1.0f)

val e = blas.srot(2,a,1,1,b,0,2,3.0f,2.0f)

println(a.mkString(","))
println(b.mkString(","))
//2.0,13.0,25.0,4.0,1.0
//0.0,3.0,5.0,4.0,1.0

本例中需要处理的数据数量为2,a数组从索引1开始,以步长为1取数据进行处理,也就是取到(3.0,5.0),b数组从索引0开始,以步长为2取数据进行处理,也就是取到(2.0,5.0),那么也就是对这四个数据进行处理,怎么处理呢?
按照公式$a_i = c \ast a_i + s \ast b_i$,$b_i = c \ast b_i - s \ast a_i$
$a_1$ = 3.0 $\ast$ 3+2.0 $\ast$ 2 = 13.0
$a_2$ = 5.0 $\ast$ 3+5.0 $\ast$ 2 = 25.0
$b_0$ = 2.0$\ast$3-3.0$\ast$2 = 0.0
$b_2$ = 5.0$\ast$3-5.0$\ast$2 = 5.0

sscal

实现数组内数据的乘法
$sx_i = sx_i \ast sa$

1
2
3
4
5
6
public void sscal(int n, float sa, float[] sx, int _sx_offset, int incx);
参数解析:
n :需要处理的数据总数
sx :需要处理数据的数组
_sx_offset :sx数组数据处理起始的位置
incx :步长,取数据的间隔

1
2
3
4
5
val a = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
blas.sscal(2,2.0f,a,0,1)

println(a.mkString(","))
//2.0,4.0,3.0,4.0,5.0

以步长1从索引0开始取两个数据,也即(1,2),进行乘法,乘法的系数是2.0,所以结果就是(2,4)

saxpy

数组处理,处理公式为:$y=a \ast x+y$

1
2
3
4
5
6
7
public void saxpy(int n, float sa, float[] sx, int _sx_offset, int incx, float[] sy, int _sy_offset, int incy);
参数解析:
n :需要处理的数据总数
sa :数据处理的系数
sx|sy :互相处理数据的数组
_sx_offset|_sy_offset :sx|sy数组数据处理起始的位置
incx|incy :步长,取数据的间隔

1
2
3
4
5
6
7
8
9
val a = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
val b = Array(1.0f,2.0f,3.0f,4.0f,5.0f)

blas.saxpy(2,2.0f,a,0,1,b,1,1)

println(a.mkString(","))
println(b.mkString(","))
//1.0,2.0,3.0,4.0,5.0
//1.0,4.0,7.0,4.0,5.0

从索引0,以步长1从a取2个数字,也即(1.0,2.0),从索引1,以步长1从b取2个数字,也即(2.0,3.0),执行计算(1.0$\ast$2.0+2.0=4.0,2.0$\ast$2.0+3.0=7.0),放置到刚才b取出数据的位置即为结果

sswap

实现数组内数据的交换

1
2
3
4
5
6
public void sswap(int n, float[] sx, int _sx_offset, int incx, float[] sy, int _sy_offset, int incy);
参数解析:
n :需要交换的数据总数
sx|sy :互相交换数据的数组
_sx_offset|_sy_offset :sx|sy数组数据交换起始的位置
incx|incy :步长,取数据的间隔

1
2
3
4
5
6
7
8
val a =  Array(1.0f,3.0f,5.0f,2.0f,4.0f)
val b = Array(1.0f,3.0f,5.0f,2.0f,4.0f)
blas.sswap(2,a,1,1,b,2,2)

println(a.mkString(","))
println(b.mkString(","))
//1.0,5.0,4.0,2.0,4.0
//1.0,3.0,3.0,2.0,5.0

a数组从索引1开始取,步长为1,取俩,也就是(3.0,5.0)
b数组从索引2开始取,步长为2,取俩,也就是(5.0,4.0)
ab取出来的数据互相交换位置即为结果

dasum

实现数组内数据的累加

1
2
3
4
5
6
public double dasum(int n, double[] dx, int _dx_offset, int incx)
参数解析:
n :表示总共取多少数据累加,取0的话结果为0
dx :执行累加的数据集
_dx_offset :累加第一个数据在数组dx中的index
incx :步长,每次累加的数据的间隔

1
2
3
4
val d = blas.dasum(2,Array(1.0,2.0,3.0,4.0,5.0),2,2)

println(d)
//8.0

以步长2从索引2开始取2个数据,也即(3.0,5.0),执行加法,结果即8.0。

scopy

实现前一个数组复制到后一个数组中

1
2
3
4
5
6
public void scopy(int n, float[] sx, int _sx_offset, int incx, float[] sy, int _sy_offset, int incy);
参数解析:
n :需要复制的数据总数
sx|sy :sx为源数据,复制到sy
_sx_offset|_sy_offset :sx|sy数组数据复制|插入的起始的位置
incx|incy :步长,取数据的间隔

1
2
3
4
5
6
7
8
9
val a = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
val b = Array(1.0f,2.0f,3.0f,4.0f,5.0f)

blas.scopy(2,a,0,1,b,2,2)

println(a.mkString(","))
println(b.mkString(","))
//1.0,2.0,3.0,4.0,5.0
//1.0,2.0,1.0,4.0,2.0

从索引0,以步长1从a取2个数字,也即(1.0,2.0),放到b的从索引2开始,步长2的位置,原数据是(3.0,5.0),替换原数据,即得结果。

sdot

矢量的点积
$x = (a_1,a_2,…,a_n)$,$y = (b_1,b_2,…,b_n)$
$sdot(x,y) = a_1 \ast b_1+a_2 \ast b_2+\cdots+a_n \ast b_n$

1
2
3
4
5
6
public float sdot(int n, float[] sx, int _sx_offset, int incx, float[] sy, int _sy_offset, int incy);
参数解析:
n :需要计算的数据总数
sx|sy :sx|sy为数据源
_sx_offset|_sy_offset :sx|sy数组开始计算的位置
incx|incy :步长,取数据的间隔

1
2
3
4
5
6
7
val a = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
val b = Array(1.0f,2.0f,3.0f,4.0f,5.0f)

val c = blas.sdot(2,a,2,1,b,1,1)

println(c)
//18.0

从索引2,以步长1从a取2个数字,也即(3.0,4.0),从索引1,以步长1从b取2个数字,也即(2.0,3.0),结果=3.0$\ast$2.0+4.0$\ast$3.0=18.0

sdsdot

同sdot,不过多加一个额外的系数
$x = (a_1,a_2,…,a_n)$,$y = (b_1,b_2,…,b_n)$
$sdot(x,y) = a_1 \ast b_1+a_2 \ast b_2+\cdots+a_n \ast b_n+c$
其中c为系数

1
2
3
4
5
6
7
public float sdsdot(int n, float sb, float[] sx, int _sx_offset, int incx, float[] sy, int _sy_offset, int incy);
参数解析:
n :需要计算的数据总数
sb :额外需要加的系数
sx|sy :sx|sy为数据源
_sx_offset|_sy_offset :sx|sy数组开始计算的位置
incx|incy :步长,取数据的间隔

1
2
3
4
5
6
val a = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
val b = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
val c = blas.sdsdot(2,11.0f,a,1,2,b,3,1)

println(c)
//39.0

从索引1,以步长2从a取2个数字,也即(2.0,4.0),从索引3,以步长1从b取2个数字,也即(4.0,5.0),结果=2.0$\ast$4.0+4.0$\ast$5.0+11.0=39.0

snrm2

欧几里得范数
$x = (a_1,a_2,…,a_n)$
$snrm2(x) = \sqrt{a_1^2+a_2^2+\cdots+a_n^2}$
其中c为系数

1
2
3
4
5
6
public float snrm2(int n, float[] x, int _x_offset, int incx);
参数解析:
n :需要计算的数据总数
x :x为数据源
_x_offset :x数组开始计算的位置
incx :步长,取数据的间隔

1
2
3
4
5
val a = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
val c = blas.snrm2(2,a,2,1)

println(c)
//5.0

从索引2,以步长1从a取2个数字,也即(3.0,4.0),勾三股四弦五结果=5.0

isamax

求最大值的位置

1
2
3
4
5
6
public int isamax(int n, float[] sx, int _sx_offset, int incx);
参数解析:
n :需要计算的数据总数
sx :sx为数据源
_x_offset :sx数组开始计算的位置
incx :步长,取数据的间隔

1
2
3
4
5
val a = Array(1.0f,2.0f,3.0f,4.0f,5.0f)
val c = blas.isamax(4,a,0,1)

println(c)
//4

从索引0开始以步长1取出4个数,也即(1.0f,2.0f,3.0f,4.0f),计算最大值的位置(从1开始)

为了加深对sparkRDD的理解,写一篇文章讲解下RDD各API的用法。

Actions 解释 简介
map(f: T=>U) RDD[T] => RDD[U] 一对一转换
flatMap(f: T=>Seq[U]) RDD[T] => RDD[U] 一对多转换
filter(f: T=>Bool) RDD[T] => RDD[T] 过滤
disdict() RDD[T] => RDD[T] 去重
sample(fract:Float) RDD[T] => RDD[T] 取样
groupByKey() RDD[(K,V)] => RDD[(K,Seq[V])] 分组
combineByKey() RDD[(K,V)] => RDD[(K,Seq[V])] 合并
reduceByKey(f: (V,V)=>V) RDD[(K,V)] => RDD[(K,V)] 合并
union() (RDD[T],RDD[T]) => RDD[T] 合并俩RDD
join() (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(V,W))] 俩RDD按KEY合并
cogroup() (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(Seq[V],Seq[W]))] 合并
mapValue(f: V=>W) RDD[(K,V)] => RDD[(K,W)] 转换Value
sort(c:Comparator[K]) RDD[(K,V)] => RDD[(K,V)] 排序
cartesian() (RDD[(K,V)],RDD[(X,W)]) => RDD[((K,X),(V,W))] 笛卡尔积
count() RDD[(K,V)] => Long 统计长度
collect() RDD[(K,V)] => Seq[T] 执行计算
reduce(f:(T,T)=>T) RDD[T] => T 聚合
lookup(k:K) RDD[(K,V)] => Seq[V] 查找
aggregate() RDD[T]=>RDD[U] 聚合
fold() RDD[T]=>RDD[U] 聚合
zip() (RDD[V],RDD[W]) => RDD[(V,W)] 集合依次合并
zipWithIndex() RDD[T] => RDD[(T,Long)] 生成索引
zipWithUniqueId() RDD[T] => RDD[(T,Long)] 生成唯一ID
intersection() RDD[T] => RDD[T] 计算交集
subtract() RDD[T] => RDD[T] 计算余集
coalesce()/repartition() RDD[T] => RDD[T] 重新分区

各个方法的实例如下

map

map执行数据集的转换操作,将RDD中每个元素执行f函数生成新的数据

1
def map[U: ClassTag](f: T => U): RDD[U]

1
2
3
4
5
val arr = Array((1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f"),(7,"g"))
val rdd = sc.parallelize(arr).map(f=>("A"+f._1*10,f._2+"#"))

println(rdd.collect().mkString(","))
//(A10,a#),(A20,b#),(A30,c#),(A40,d#),(A50,e#),(A60,f#),(A70,g#)

flatMap

作用同map,不过一个元素可能生成多个结果数据。

1
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

1
2
3
4
5
val arr = Array("1#2#3","4#5","6")
val rdd = sc.parallelize(arr).flatMap(f=>f.split("#"))

println(rdd.collect().mkString(","))
//1,2,3,4,5,6

filter

过滤操作,返回函数f为true的数据

1
def filter(f: T => Boolean): RDD[T]

1
2
3
4
5
val arr = Array("1#2#3","4#5","6")
val rdd = sc.parallelize(arr).filter(f=>f.length>=3)

println(rdd.collect().mkString(","))
//1#2#3,4#5

distinct

去重操作。

1
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

1
2
3
4
5
val arr = Array(1,2,3,2,3,4,5,4,6)
val rdd = sc.parallelize(arr).distinct()

println(rdd.collect().mkString(","))
//4,1,6,3,5,2

sample

随机取样本。

1
2
3
4
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

1
2
3
4
5
6
7
8
9
val arr = 1 to 20
val rdd = sc.parallelize(arr,3)
val a = rdd.sample(true,0.5,10)
val b = rdd.sample(false,0.5,10)

println("a:"+a.collect().mkString(","))
println("b:"+b.collect().mkString(","))
//a:2,7,11,12,12,15,15,18
//b:1,4,7,8,10,11,15,16,17,18,20

第一个参数如果为true,可能会有重复的元素,如果为false,不会有重复的元素;
第二个参数取值为[0,1],最后的数据个数大约等于第二个参数乘总数;
第三个参数为随机因子。

groupByKey

相同key值集合到一起

1
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

1
2
3
4
5
6
val arr = Array((1,"a"),(2,"b"),(2,"c"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd = sc.parallelize(arr,3)
val a = rdd.groupByKey()

println(a.collect().mkString(","))
//(3,CompactBuffer(c)),(4,CompactBuffer(d)),(1,CompactBuffer(a, b)),(2,CompactBuffer(b, c, d))

combineByKey

对相同的key值的元素执行处理和聚合操作

1
2
3
4
5
6
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]

1
2
3
4
5
6
7
8
val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd = sc.parallelize(arr,3)
val a = rdd.combineByKey(f=>new StringBuffer(f),
(a:StringBuffer,b:String)=>(a.append(b)),
(c:StringBuffer,d:StringBuffer)=>(c.append(d)))

println(a.collect().mkString(","))
//(3,c),(4,d),(1,ab),(2,cbd)

combineByKey三个参数
createCombiner: V => C,相同key的第一个元素的进入的时候调用
mergeValue: (C, V) => C,相同key的第N个元素(N$\geq$2)进入的时候与第一个函数结果进行合并的函数
mergeCombiners: (C, C) => C,相同key不同分区生成的第二个函数结果的合并函数

reduceByKey

相同的key执行reduce操作

1
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

1
2
3
4
5
6
val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd = sc.parallelize(arr,3)
val a = rdd.reduceByKey((a,b)=>a+"|"+b)

println(a.collect().mkString(","))
//(3,c),(4,d),(1,a|b),(2,c|b|d)

union

两个RDD聚合

1
def union(other: RDD[T]): RDD[T]

1
2
3
4
5
6
7
8
val arr = Array((1,"a"),(2,"c"),(2,"b"))
val arr2 = Array((1,"b"),(3,"c"),(4,"d"),(2,"d"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)
val a = rdd1.union(rdd2)

println(a.collect().mkString(","))
//(1,a),(2,c),(2,b),(1,b),(3,c),(4,d),(2,d)

join

两个RDD以key聚合
join 包含 join , fullOuterJoin , leftOuterJoin , rightOuterJoin 等多个方法,用法类似于SQL操作。

1
2
3
4
5
6
7
8
9
10
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def fullOuterJoin[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def leftOuterJoin[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Option[V], W))]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
val arr2 = Array((1,"B"),(3,"C"),(4,"D"),(2,"D"),(2,"E"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)

val a = rdd1.join(rdd2)
val b = rdd1.fullOuterJoin(rdd2)
val c = rdd1.leftOuterJoin(rdd2)
val d = rdd1.rightOuterJoin(rdd2)

println("join:"+a.collect().mkString(","))
println("fullOuterJoin:"+b.collect().mkString(","))
println("leftOuterJoin:"+c.collect().mkString(","))
println("rightOuterJoin:"+d.collect().mkString(","))
//join:(1,(a,B)),(2,(c,D)),(2,(c,E)),(2,(b,D)),(2,(b,E))

//fullOuterJoin:(3,(None,Some(C))),(4,(None,Some(D))),(1,(Some(a),Some(B))),(5,(Some(g),None)),(2,(Some(c),Some(D))),(2,(Some(c),Some(E))),(2,(Some(b),Some(D))),(2,(Some(b),Some(E)))

//leftOuterJoin:(1,(a,Some(B))),(5,(g,None)),(2,(c,Some(D))),(2,(c,Some(E))),(2,(b,Some(D))),(2,(b,Some(E)))

//rightOuterJoin:(3,(None,C)),(4,(None,D)),(1,(Some(a),B)),(2,(Some(c),D)),(2,(Some(c),E)),(2,(Some(b),D)),(2,(Some(b),E))

cogroup

多个RDD聚合

1
2
3
4
5
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

1
2
3
4
5
6
7
8
9
10
11
val arr = Array((1,"a"),(2,"c"))
val arr2 = Array((1,"B"),(3,"C"))
val arr3 = Array((1,"C"),(3,"D"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)
val rdd3 = sc.parallelize(arr3)

val a = rdd1.cogroup(rdd2,rdd3)//支持1-3个RDD参数

println(a.collect().mkString(","))
//(3,(CompactBuffer(),CompactBuffer(C),CompactBuffer(D))),(1,(CompactBuffer(a),CompactBuffer(B),CompactBuffer(C))),(2,(CompactBuffer(c),CompactBuffer(),CompactBuffer()))

demo结果显示,如果key不匹配会默认给一个空值

mapValue

对key,value结构数据(二元组)的value执行map

1
def mapValues[U](f: V => U): RDD[(K, U)]

1
2
3
4
5
6
val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.mapValues(f=>f.toUpperCase())

println(a.collect().mkString(","))
//(1,A),(2,C),(2,B),(5,G)

sort

RDD排序,sort 提供 sortBy 和sortByKey 等多个方法。

1
2
3
4
5
6
7
8
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
def sortByKey(ascending: Boolean = true,
numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val arr = Array((1,"a"),(2,"c"),(3,"b"),(5,"g"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.sortByKey(true)
val b = rdd1.sortByKey(false)
val c = rdd1.sortBy(f=>f._2,true)
val d = rdd1.sortBy(f=>f._2,false)

println(a.collect().mkString(","))
println(b.collect().mkString(","))
println(c.collect().mkString(","))
println(d.collect().mkString(","))
//(1,a),(2,c),(3,b),(5,g)
//(5,g),(3,b),(2,c),(1,a)
//(1,a),(3,b),(2,c),(5,g)
//(5,g),(2,c),(3,b),(1,a)

第二个参数为倒序控制

cartesian

cartesian进行笛卡尔积运算

1
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

1
2
3
4
5
6
7
8
9
10
11
12
13
val arr = Array((1,"a"),(2,"c"))
val arr2 = Array((3,"C"),(4,"D"),(5,"E"))
val rdd1 = sc.parallelize(arr,3)
val rdd2 = sc.parallelize(arr2)
val a = rdd1.cartesian(rdd2)

println(a.collect().mkString("\n"))
//((1,a),(3,C))
//((1,a),(4,D))
//((1,a),(5,E))
//((2,c),(3,C))
//((2,c),(4,D))
//((2,c),(5,E))

count

count 返回RDD的长度

1
def count(): Long

1
2
3
4
5
6
val arr = Array((1,"a"),(2,"c"),(2,"c"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.count()

println(a)
//3

collect

collect会触发数据的执行,生成结果集

1
def collect(): Array[T]

reduce

1
def reduce(f: (T, T) => T): T
1
2
3
4
5
6
val arr = Array((1,"a"),(2,"c"),(2,"D"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.reduce((a,b)=>(a._1+b._1,a._2+b._2))

println(a)
//(5,acD)

lookup

lookup查询key为入参的所有数据集

1
def lookup(key: K): Seq[V]

1
2
3
4
5
6
val arr = Array((1,"a"),(2,"c"),(2,"D"))
val rdd1 = sc.parallelize(arr,3)
val a = rdd1.lookup(2)

println(a.mkString(","))
//c,D

aggregate

aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。

1
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U={...}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val a = Array(1,2,3,4,5,6,7,8,9)
val m1 = sc.parallelize(a,3)
m1.foreachPartition(f=>println(f.toList))

val res = m1.aggregate[StringBuffer](new StringBuffer("a"))
((a,b)=>a.append(b),(a,b)=>a.append(b))
val res2 = m1.aggregate(3)((a,b)=>a+b,(a,b)=>a*b)

println(s"res:$res")
println(s"res2:$res2")
//List(1, 2, 3)
//List(4, 5, 6)
//List(7, 8, 9)
//res:aa456a123a789
//res2:13122

三个分区数据分别为(1,2,3),(4,5,6),(7,8,9)
res:第一个aggregate是把所有的数据拼成字符串,运行过程如下:
第一步:每个分区先进行第一个函数运算,zeroValue是“a”,所以依次执行append结果就是a123,a456,a789
第二步:继续执行append,初始值也是a,分区来计算的顺序是随机的(可以通过多运行几次看结果来验证),所有最后的结果是a开头,然后依次拼上第一步的结果。

res2:第二个aggregate是执行乘法运算,运行过程如下:
第一步:
3+1+2+3 = 9
3+4+5+6 = 18
3+7+8+9 = 27
第二步:
$3\times9\times18\times27=13122$

fold

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

1
def fold(zeroValue: T)(op: (T, T) => T): T

1
2
3
4
5
6
7
8
9
10
11
12
val a = Array(1,2,3,4,5,6,7,8,9)
val m1 = sc.parallelize(a,3)
m1.foreachPartition(f=>println(f.toList))


val res = m1.fold(3)((a,b)=>a+b)

println(s"res:$res")
//List(1, 2, 3)
//List(4, 5, 6)
//List(7, 8, 9)
//res:57

三个分区依次执行加法
3+1+2+3 = 9
3+4+5+6 = 18
3+7+8+9 = 27
最后累加3+9+18+27 = 57

zip

1
2
3
4
5
6
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
将两个RDD拼到一起
def zipWithIndex(): RDD[(T, Long)]
依次给每个数据一个从0开始的索引
def zipWithUniqueId(): RDD[(T, Long)]
依次给每个数据一个不重复的索引
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val a = Array(1,2,3,4,5,6,7,8,9)
val b = Array("a","b","c","d","e","f","g","h","i")
val m1 = sc.parallelize(a,3)
val m2 = sc.parallelize(b,3)

val m3 = m1.zip(m2)
val m4 = m1.zipWithIndex()
val m5 = m1.zipWithUniqueId()
println("m3:"+m3.collect().mkString(","))
println("m4:"+m4.collect().mkString(","))
println("m5:"+m5.collect().mkString(","))
//m3:(1,a),(2,b),(3,c),(4,d),(5,e),(6,f),(7,g),(8,h),(9,i)
//m4:(1,0),(2,1),(3,2),(4,3),(5,4),(6,5),(7,6),(8,7),(9,8)
//m5:(1,0),(2,3),(3,6),(4,1),(5,4),(6,7),(7,2),(8,5),(9,8)

注意:
zip时两个RDD的分区数必须一样,长度也必须一样。

intersection

计算两个RDD的交集

1
def intersection(other: RDD[T]): RDD[T]

1
2
3
4
5
6
7
8
val a = Array(1,2,3,4,5,6,7,8,9)
val b = Array(3,4)
val m1 = sc.parallelize(a,3)
val m2 = sc.parallelize(b,3)

val m3 = m1.intersection(m2)
println("m3:"+m3.collect().mkString(","))
//m3:3,4

注意:两个RDD类型须一致

subtract

计算两个RDD的余集

1
def subtract(other: RDD[T], numPartitions: Int): RDD[T]

1
2
3
4
5
6
7
8
val a = Array(1,2,3,4,5,6,7,8,9)
val b = Array(3,4)
val m1 = sc.parallelize(a,3)
val m2 = sc.parallelize(b,2)

val m3 = m1.subtract(m2)
println("m3:"+m3.collect().mkString(","))
//m3:6,9,1,7,2,5,8

coalesce/repartition

重新分区

1
2
3
4
def coalesce(numPartitions: Int, shuffle: Boolean = false)
(implicit ord: Ordering[T] = null) : RDD[T]
def repartition(numPartitions: Int)
(implicit ord: Ordering[T] = null): RDD[T]

1
2
3
4
5
6
7
8
9
10
val rdd = sc.parallelize(Array(1,2,3,4,5,6),3)
val rdd1 = rdd.coalesce(1,false)
val rdd2 = rdd.repartition(1)

println("rdd:"+rdd.partitions.length)
println("rdd1:"+rdd1.partitions.length)
println("rdd2:"+rdd2.partitions.length)
//rdd:3
//rdd1:1
//rdd2:1

本文介绍Mathjax的一些写法。


MathJax简介

MathJax是一款运行在浏览器中的开源的数学符号渲染引擎,使用MathJax可以方便的在浏览器中显示数学公式,不需要使用图片。目前,MathJax可以解析Latex、MathML和ASCIIMathML的标记语言。

基础

公式标记与查看公式

MathJax中的公式排版有两种方式,inline和displayed。inline表示公式嵌入到文本段中,displayed表示公式独自成为一个段落。例如,$f(x)=a+b$这是一个inline公式,而下面$$f(x)=a+b$$是一个displayed公式。
在MathJax中,默认的displayed公式分隔符有\$\$…\$\$,而默认的inline公式分隔符为 (…) ,当然这些都是可以自定义的,具体配置请参考文档。下文中,使用\$\$…\$\$作为displayed分隔符,\$…\$作为inline分隔符(hexo默认也是这两种方式)。

希腊字母

名称 大写 Tex 小写 Tex
alpha A A α \alpha
beta B B β \beta
gamma $\Gamma$ \Gamma $\gamma$ \gamma
delta $\Delta$ \Delta $\delta$ \delta
epsilon E E $\epsilon$ \epsilon
zeta Z Z $\zeta$ \zeta
eta H H $\eta$ \eta
theta $\Theta$ \Theta $\theta$ \theta
iota I I $\iota$ \iota
kappa K K $\kappa$ \kappa
lambda $\Lambda$ \Lambda $\lambda$ \lambda
mu M M $\mu$ \mu
nu N N $\nu$ \nu
xi $\Xi$ \Xi $\xi$ \xi
omicron O O $\omicron$ \omicron
pi $\Pi$ \Pi $\pi$ \pi
rho P P $\rho$ \rho
sigma $\Sigma$ \Sigma $\sigma$ \sigma
tau T T $\tau$ \tau
upsilon $\Upsilon$ \Upsilon $\upsilon$ \upsilon
phi $\Phi$ \Phi $\phi$ \phi
chi X X $\chi$ \chi
psi $\Psi$ \Psi $\psi$ \psi
omega $\Omega$ \Omega $\omega$ \omega

上标与下标

上标和下标分别使用^与_。例如x_i^2:$x_i^2$。
默认情况下,上下标符号仅仅对下一个组起作用。一个组即单个字符或者使用{..}包裹起来的内容。
也就是说,如果使用10^10,会得到$10^10$,而10^{10}才是$10^{10}$。同时,大括号还能消除二义性,如x^5^6将得到一个错误,必须使用大括号来界定^的结合性,如{x^5}^6:${x^5}^6$ 或者 x^{5^6}:$x^{5^6}$。
另外,如果要在左右两边都有上下标,可以用\sideset命令。

例子:\sideset{^1_2}{^3_4}\bigotimes

显示:$\sideset{^1_2}{^3_4}\bigotimes$

括号

  1. 小括号与方括号:使用原始的( )、\left、\right,[ ]即可,如(2+3)[4+4]:(2+3)[4+4]
  2. 大括号:时由于大括号{}被用来分组,因此需要使用{和}表示大括号,也可以
    使用\lbrace 和\rbrace来表示。如{ab}:${ab}$,\lbrace ab \rbrace:$\lbrace ab \rbrace$。
  3. 尖括号:使用\langle 和 \rangle表示左尖括号和右尖括号。如\langle x \rangle:$\langle x \rangle$。
  4. 上取整:使用\lceil 和 \rceil 表示。 如,\lceil x \rceil:$\lceil x \rceil$。
  5. 下取整:使用\lfloor 和 \rfloor 表示。如,\lfloor x \rfloor:$\lfloor x \rfloor$。
  6. 不可见括号:使用.表示。

当要显示大号的括号或分隔符时,要用\left、\right和\lbrace 、\rbrace命令。
例子:f(x,y,z) = 3y^2z \left( 3+\frac{7x+5}{1+y^2} \right)
显示:$f(x,y,z) = 3y^2z \left( 3+\frac{7x+5}{1+y^2} \right)$

求和与积分

\sum用来表示求和符号,其下标表示求和下限,上标表示上限。如,\sum_1^n:$\sum_1^n$。
\int用来表示积分符号,同样地,其上下标表示积分的上下限。如,\int_1^\infty:$\int_1^\infty$。
与此类似的符号还有,\prod:$\prod$,\bigcup:$\bigcup$,\bigcap:$\bigcap$,\iint:$\iint$。

分式与根式

分式的表示。第一种,使用\frac ab,\frac作用于其后的两个组a,b,结果为$\frac ab$。如果你的分子或分母不是单个字符,请使用{..}来分组。第二种,使用\over来分隔一个组的前后两部分,如{a+1\over b+1}:${a+1\over b+1}$。

根式使用\sqrt来表示。如,\sqrt[4]{\frac xy} :$\sqrt[4]{\frac xy}$

字体

  • 使用\mathbb或\Bbb显示黑板粗体字,此字体经常用来表示代表实数、整数、有理数、复数的大写字母。如,
    $\Bbb{CHNQRZ}$。
  • 使用\mathbf显示黑体字,如,
    $\mathbf{ABCDEFGHIJKLMNOPQRSTUVWXYZ}$,$\mathbf{abcdefghijklmnopqrstuvwxyz}$。
  • 使用\mathtt显示打印机字体,如,
    $\mathtt{ABCDEFGHIJKLMNOPQRSTUVWXYZ}$,$\mathtt{abcdefghijklmnopqrstuvwxyz}$。
  • 使用\mathrm显示罗马字体,如,
    $\mathrm{ABCDEFGHIJKLMNOPQRSTUVWXYZ}$,$\mathrm{abcdefghijklmnopqrstuvwxyz}$。
  • 使用\mathscr显示手写体,如,
    $\mathscr{ABCDEFGHIJKLMNOPQRSTUVWXYZ}$。
  • 使用\mathfrak显示Fraktur字母(一种德国字体),如
    $\mathfrak{ABCDEFGHIJKLMNOPQRSTUVWXYZ}$,$\mathfrak{abcdefghijklmnopqrstuvwxyz}$。

要对公式的某一部分字符进行字体转换,可以用{\rm 需转换的部分字符}命令,其中\rm可以参照下表选择合适的字体。一般情况下,公式默认为意大利体。
\rm  罗马体       \it  意大利体
\bf  黑体        \cal  花体
\sl  倾斜体       \sf  等线体
\mit  数学斜体      \tt  打字机字体
\sc  小体大写字母

特殊函数与符号

  1. 常见的三角函数,求极限符号可直接使用+缩写即可。如$sinx$,$arctanx$,$lim_{1→\infty}$。
  2. 比较运算符:\lt \gt \le \ge \neq : $\lt \gt \le \ge \neq$。可以在这些运算符前面加上\not,如\not\lt:$\not\lt$。
  3. \times \div \pm \mp表示:$\times \div \pm \mp$,\cdot表示居中的点,x \cdot y : $x \cdot y$。
  4. 集合关系与运算:\cup \cap \setminus \subset \subseteq \subsetneq \supset \in \notin \emptyset \varnothing :$\cup \cap \setminus \subset \subseteq \subsetneq \supset \in \notin \emptyset \varnothing$
  5. 表示排列使用{n+1 \choose 2k} 或 \binom{n+1}{2k},$\binom{n+1}{2k}$。
  6. 箭头:\to \rightarrow \leftarrow \Rightarrow \Leftarrow \mapsto : $\to \rightarrow \leftarrow \Rightarrow \Leftarrow \mapsto$
  7. 逻辑运算符:\land \lor \lnot \forall \exists \top \bot \vdash \vDash : $\land \lor \lnot \forall \exists \top \bot \vdash \vDash$
  8. \star \ast \oplus \circ \bullet : $\star \ast \oplus \circ \bullet$。
  9. \approx \sim \cong \equiv \prec : $\approx \sim \cong \equiv \prec$。
  10. \infty \aleph_0 $\infty \aleph_0$
    \nabla \partial $\nabla \partial$
    \Im \Re $\Im \Re$。
  11. 模运算 \pmode, 如,a\equiv b\pmod n:$a\equiv b\pmod n$。
  12. \ldots与\cdots,其区别是dots的位置不同,ldots位置稍低,cdots位置居中。a1+a2+⋯+an,a1,a2,…,an。
  13. 一些希腊字母具有变体形式,如 \epsilon \varepsilon : $\epsilon \varepsilon$, \phi \varphi : $\phi \varphi$。

空间

通常MathJax通过内部策略自己管理公式内部的空间,因此a…b与a…….b(.表示空格)都会显示为ab。可以通过在ab间加入\,增加些许间隙,\;增加较宽的间隙,\quad 与 \qquad 会增加更大的间隙,如,$a\quad{b}$。

顶部符号

对于单字符,\hat:$\hat{x}$,多字符可以使用\widehat,$\widehat{xy}$.类似的还有\hat,\overline,\vec,\overrightarrow, \dot \ddot : $\hat{x}\overline{xyz}\vec{a}\overrightarrow{x}\dot{x} \ddot{x} $。

表格

使用\begin {array}{列样式}…\end{array}这样的形式来创建表格,列样式可以是clr表示居中,左,右对齐,还可以使用|表示一条竖线。表格中 各行使用\\分隔,各列使用&分隔。使用\hline在本行前加入一条直线。 例如, \begin {array}{c|lcr} n & \text{Left} & \text{Center} & \text{Right} \\ \hline 1 & 0.24 & 1 & 125 \\ 2 & -1 & 189 & -8 \\ 3 & -20 & 2000 & 1+10i \\ \end{array} 结果:

$$ \begin{array}{c|lcr} n & \text{Left} & \text{Center} & \text{Right} \\ \hline 1 & 0.24 & 1 & 125 \\ 2 & -1 & 189 & -8 \\ 3 & -20 & 2000 & 1+10i \\ \end{array} $$

矩阵

基本用法

使用\$\$\begin {matrix}…\end{matrix}\$\$这样的形式来表示矩阵,在\begin与\end之间加入矩阵中的元素即可。矩阵的行之间使用\分隔,列之间使用&分隔。

例如 \$\$ \begin {matrix} 1 & x & x^2 \\ 1 & y & y^2 \\ 1 & z & z^2 \\ \end{matrix} \$\$ 结果:

$$\begin{matrix} 1 & x & x^2 \\ 1 & y & y^2 \\ 1 & z & z^2 \\ \end{matrix}$$

加括号

如果要对矩阵加括号,可以像上文中提到的一样,使用\left与\right配合表示括号符号。也可以使用特殊的matrix。即替换\begin {matrix}…\end{matrix}中的matrix为pmatrix,bmatrix,Bmatrix,vmatrix,Vmatrix.

如pmatrix:$\begin{pmatrix}1&2\\3&4\end{pmatrix}$
bmatrix:$\begin{bmatrix}1&2\\3&4\end{bmatrix}$
Bmatrix:$\begin{Bmatrix}1&2\\3&4\end{Bmatrix}$
vmatrix:$\begin{vmatrix}1&2\\3&4\end{vmatrix}$
Vmatrix:$\begin{Vmatrix}1&2\\3&4\end{Vmatrix}$

省略元素

可以使用\cdots $\cdots$ \ddots $\ddots$\vdots $\vdots$来省略矩阵中的元素,如:
$$\begin{pmatrix}1&2&\cdots&n\\3&4&\cdots&n\\\vdots&\vdots&\ddots&\vdots\\m&m+1&\cdots&n\end{pmatrix}$$

增广矩阵

增广矩阵需要使用前面的array来实现,如 \$\$ \left[ \begin {array}{cc|c} 1&2&3\ 4&5&6 \end{array} \right] \$\$ 结果:

$$\left[ \begin {array}{cc|c} 1&2&3\\ 4&5&6 \end{array} \right]$$

对齐的公式

有时候可能需要一系列的公式中等号对齐,如:

$$\begin{align}
\sqrt{37} & = \sqrt{\frac{73^2-1}{12^2}} \\
& = \sqrt{\frac{73^2}{12^2}\cdot\frac{73^2-1}{73^2}} \\
& = \sqrt{\frac{73^2}{12^2}}\sqrt{\frac{73^2-1}{73^2}} \\
& = \frac{73}{12}\sqrt{1 - \frac{1}{73^2}} \\
& \approx \frac{73}{12}\left(1 - \frac{1}{2\cdot73^2}\right)
\end{align}$$
这需要使用形如\begin {align}…\end{align}的格式,其中需要使用&来指示需要对齐的位置。请使用右键查看上述公式的代码。

分类表达式

定义函数的时候经常需要分情况给出表达式,可使用\begin {cases}…\end{cases}。其中,使用\来分类,使用&指示需要对齐的位置。如:

$$f(n) =
\begin{cases}
n/2, & \text{if $n$ is even} \\
3n+1, & \text{if $n$ is odd} \\
\end{cases}$$
上述公式的括号也可以移动到右侧,不过需要使用array来实现,如下:

$$\left.
\begin{array}{l}
\text{if $n$ is even:}&n/2\\
\text{if $n$ is odd:}&3n+1
\end{array}
\right\}
=f(n)$$
最后,如果想分类之间的垂直间隔变大,可以使用[2ex]代替\来分隔不同的情况。(3ex,4ex也可以用,1ex相当于原始距离)。

数学符号查询

一般而言,从一个巨大的符号表中查询所需要的特定符号是一件令人沮丧的事情。在此向大家介绍一个LATEX手写符号识别系统,尽情享用吧~ Detexify²

空间问题

在使用Latex公式时,有一些不会影响公式正确性,但却会使其看上去很槽糕的问题。

不要在再指数或者积分中使用 \frac

在指数或者积分表达式中使用\frac会使表达式看起来不清晰,因此在专业的数学排版中很少被使用。应该使用一个水平的/来代替,效果如下:
$$\begin{array}{cc}
\mathrm{Bad} \\
\hline \\
e^{i\frac{\pi}2} \quad e^{\frac{i\pi}2}\\
\int_{-\frac\pi2}^\frac\pi2 \sin x\,dx \\
\end{array}$$

$$\begin{array}{cc}
\mathrm{Better} \\
\hline \\
e^{i\pi/2} \\
\ \int_{-\pi/2}^{\pi/2}\sin x\,dx \\
\end{array}$$

使用 \mid 代替 | 作为分隔符

符号|作为分隔符时有排版空间大小的问题,应该使用\mid代替。效果如下:

$$\begin{array}{cc}
\mathrm{Bad} & \mathrm{Better} \\
\hline \\
{x|x^2\in\Bbb Z} & {x\mid x^2\in\Bbb Z} \\
\end{array}$$

多重积分

对于多重积分,不要使用\int\int此类的表达,应该使用\iint \iiint等特殊形式。效果如下:

$$\begin{array}{cc}
\mathrm{Bad} & \mathrm{Better} \\
\hline \\
\int\int_S f(x)\,dy\,dx & \iint_S f(x)\,dy\,dx \\
\int\int\int_V f(x)\,dz\,dy\,dx & \iiint_V f(x)\,dz\,dy\,dx
\end{array}$$
此外,在微分前应该使用\,来增加些许空间,否则TEX会将微分紧凑地排列在一起。如下:

$$\begin{array}{cc}
\mathrm{Bad} & \mathrm{Better} \\
\hline \\
\iiint_V f(x)dz dy dx & \iiint_V f(x)\,dz\,dy\,dx
\end{array}$$

连分数

书写连分数表达式时,请使用\cfrac代替\frac或者\over两者效果对比如下:

$$x = a_0 + \cfrac{1^2}{a_1+ \cfrac{2^2}{a_2+ \cfrac{3^2}{a_3 + \cfrac{4^4}{a_4 + \cdots}}}} \tag{\cfrac}$$
$$x = a_0 + \frac{1^2}{a_1+ \frac{2^2}{a_2+ \frac{3^2}{a_3 + \frac{4^4}{a_4 + \cdots}}}} \tag{\frac}$$

方程组

(\left{…\right.在hexo中目前不支持)
使用\begin {cases} … \end{cases}表达方程组,如:
$$\begin{cases}
a_1x+b_1y+c_1z=d_1 \\
a_2x+b_2y+c_2z=d_2 \\
a_3x+b_3y+c_3z=d_3
\end{cases}$$

公式标记与引用

使用\tag{yourtag}来标记公式,如果想在之后引用该公式,则还需要加上\label{yourlabel}在\tag之后,如:

$$a:= x^2-y^3\tag{1-1}\label{1-1}$$

为了引用公式,可以使用\eqref {rlabel},如:
$$a+y^3 \stackrel{\eqref{1-1}}= x^2$$
可以看到,通过超链接可以跳转到被引用公式位置。

如何输入其它特殊字符

关系运算符:

$\pm$:\pm
$\times$:\times
$\div$:\div
$\mid$:\mid
$\nmid$:\nmid
$\cdot$:\cdot
$\circ$:\circ
$\ast$:\ast
$\bigodot$:\bigodot
$\bigotimes$:\bigotimes
$\bigoplus$:\bigoplus
$\leq$:\leq
$\geq$:\geq
$\neq$:\neq
$\approx$:\approx
$\equiv$:\equiv
$\sum$:\sum
$\prod$:\prod
$\coprod$:\coprod

集合运算符:

$\emptyset$:\emptyset
$\in$:\in
$\notin$:\notin
$\subset$:\subset
$\supset$:\supset
$\subseteq$:\subseteq
$\supseteq$:\supseteq
$\bigcap$:\bigcap
$\bigcup$:\bigcup
$\bigvee$:\bigvee
$\bigwedge$:\bigwedge
$\biguplus$:\biguplus
$\bigsqcup$:\bigsqcup

对数运算符:

$\log$:\log
$\lg$:\lg
$\ln$:\ln

三角运算符:

$\bot$:\bot
$\angle$:\angle
$30^\circ$:30^\circ
sin:\sin
cos:\cos
tan:\tan
cot:\cot
sec:\sec
csc:\csc

微积分运算符:

$\prime$:\prime
$\int$:\int
$\iint$:\iint
$\iiint$:\iiint
$\iiiint$:\iiiint
$\oint$:\oint
$\lim$:\lim
$\infty$:\infty
$\nabla$:\nabla

逻辑运算符:

$\because$:\because
$\therefore$:\therefore
$\forall$:\forall
$\exists$:\exists
$\not=$:\not=
$\not>$:\not>
$\not\subset$:\not\subset

戴帽符号:

$\hat{y}$:\hat{y}
$\check{y}$:\check{y}
$\breve{y}$:\breve{y}

连线符号:

$\overline{a+b+c+d}$:\overline{a+b+c+d}
$\underline{a+b+c+d}$:\underline{a+b+c+d}
$\overbrace{a+\underbrace{b+c}_{1.0}+d}^{2.0}$:\overbrace{a+\underbrace{b+c}_{1.0}+d}^{2.0}

箭头符号:

$\uparrow$:\uparrow
$\downarrow$:\downarrow
$\Uparrow$:\Uparrow
$\Downarrow$:\Downarrow
$\rightarrow$:\rightarrow
$\leftarrow$:\leftarrow
$\Rightarrow$:\Rightarrow
$\Leftarrow$:\Leftarrow
$\longrightarrow$:\longrightarrow
$\longleftarrow$:\longleftarrow
$\Longrightarrow$:\Longrightarrow
$\Longleftarrow$:\Longleftarrow

要输出字符 空格 # $ % &  { } ,用命令: \空格 # \$ \% \& \ { }

参考文章:
Mathjax与LaTex公式简介
MathJax使用LaTeX语法编写数学公式教程

本文分享一下基于共现矩阵的推荐算法实现


算法讲解

本文采用的是Jaccard相似系数:
$$w_{ij}=\frac {|N_i\cap N_j|}{|N_i|}$$
分母 $|N_i|$ 表示喜欢物品i的用户数,
分子 $|N_i\cap N_j|$ 表示同时喜欢物品i和物品j的用户数。
但是由于物品j可能是热门物品,很多人都喜欢,可能会导致最后结果接近于1,为了避免热门物品的影响,采用以下公式:
$$w_{ij}=\frac {|N_i\cap N_j|}{\sqrt{|N_i||N_j|}}$$

数据准备

本次仅需要用户偏好集
数据准备详细见[用spark实现基于物品属性相似度的推荐算法]

生成物品相似度矩阵

初始化spark运行环境

1
2
val conf = new SparkConf().setAppName("MatrixCal")
val sc = new SparkContext(conf)

不推荐在conf是设置master值,硬编码…

读取用户偏好集文件

1
val src = sc.textFile("hdfs://master:9000/user/hive/warehouse/pdata.db/user_weight")

处理物品属性文件

将物品属性集处理成(索引,(物品ID,属性数组))

1
2
3
4
5
//itemid1:userid1:rating1_1
val rdd = src.map{
line => val lines = line.split(":")
(lines(1),lines(0).toInt,lines(2).toDouble)
}.sortByKey().cache()

获取物品与物品的属性集

1
2
//(用户ID,物品ID)
val user_rdd2 = rdd.map(f=>(f._1,f._2))

计算物品间的共现次数

1
2
3
4
//((物品AID,物品BID),1)
val rdd2 = user_rdd2.join(user_rdd2).map(f => (data._2,1))
//统计相同的物品对出现的次数(((物品AID,物品BID),同时出现的总数))
val rdd3 = rdd2.reduceByKey(_+_)

计算物品间的相似度

采用Jaccard相似系数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//对角线上((4,4),4) ((10,10),6)
val rdd4 = rdd3.filter(f => f._1._1 == f._1._2)
//非对角线上的矩阵
val rdd5 = rdd3.filter(f => f._1._1 != f._1._2)
//(4,((4,10,2),4))
val rdd6 = rdd5.map(f => (f._1._1,(f._1._1,f._1._2,f._2))).join(rdd4.map(f => (f._1._1,f._2) ))
//(10,(4,10,2,5))
val rdd7 = rdd6.map(f => (f._2._1._2,(f._2._1._1,f._2._1._2,f._2._1._3,f._2._2)))
//(10,((4,10,3,4),6))
val rdd8 = rdd7.join(rdd4.map(f => (f._1._1,f._2)))
//(4,10,3,4,6) f._3为同时喜欢f._1,f._2的用户数,f._4,f._5为喜欢f._1,f._2的用户数
val rdd9 = rdd8.map(f => (f._2._1._1,f._2._1._2,f._2._1._3,f._2._1._4,f._2._2))
//
val rdd10 = rdd9.map(f=> (f._1, f._2, (f._3 / sqrt(f._4 * f._5)) ))
rdd10.foreach(println)

本文分享一下老虎最近做的项目学到的一些东西,由于系统的局限性,后采用物品属性来计算相似度,实现推荐算法。


简单说明

item : 物品
user : 用户
rating : 用户对物品的评分结果

数据准备

需要准备两份数据:物品的属性向量集和用户对物品的偏好集。
本文数据均存储于hdfs,其他的数据来源自行修改RDD来源就好。

物品的属性向量集

物品的属性向量集大致如下:

1
2
3
4
#每一行表示一个物品,物品的属性值按照一个固定的设置好的顺序依次排列(分隔符可自定义,本例使用“:”和空格)
itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
itemid2:attr2_1 attr2_2 attr2_3 attr2_4 ...
...

注:

属性的提取取决于需求,本文不设置属性的权值,也即所有属性权值为1,需要权值的同学可以采用多元回归或者其他算法去提取(以后有机会再起一文)

用户对物品的偏好集

格式为:

1
2
3
4
5
#用户id:物品id:评分(分隔符可自定义,本例为“:”)
itemid1:userid1:rating1_1
itemid1:userid2:rating1_2
itemid2:userid1:rating2_1
...

注:

评分等于用户对物品的行为乘权重的总和,各行为的权值可自行拿捏,也可用算法去推导,行为应包含显性(用户购买物品等)和隐形(用户查看物品等)等方面。

生成物品相似度矩阵

IndexedRowMatrix矩阵默认是从0开始的连续索引,故物品属性集实际还多一列:

1
2
3
4
# index为从0开始的连续索引
index1:itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
index2:itemid2:attr2_1 attr2_2 attr2_3 attr2_4 ...
...

用户偏好集也多一列:

1
2
3
4
5
#indexitem的对应关系必须和物品属性集
index1:itemid1:userid1:rating1_1
index1:itemid1:userid2:rating1_2
index2:itemid2:userid1:rating2_1
...

初始化spark运行环境

1
2
val conf = new SparkConf().setAppName("MatrixCal")
val sc = new SparkContext(conf)

不推荐在conf是设置master值,硬编码…

读取物品属性文件

1
val rdd = sc.textFile("hdfs://master:9000/user/root/advertising")

处理物品属性文件

将物品属性集处理成(索引,(物品ID,属性数组))

1
2
3
4
5
6
//index1:itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
val itemsdata = rdd.map{
line => val lines = line.split(":")
val attrs = lines(2).split(" ").map(_.toInt)
(lines(0).toInt,(lines(1).toInt,attrs))
}.sortByKey().cache()

索引与itemid的隐射关系与物品索引集

保存索引与itemid的对应关系和物品索引集,以备后用

1
2
val idxs = itemsdata.map(f=>(f._1,f._2._1)).collectAsMap()
val items = itemsdata.map(f=>(f._1)).distinct().sortBy(f =>f )

获取物品与物品的属性集

1
2
3
4
//(物品索引,物品属性数组)
val itemattrs = itemsdata.map(f=>(f._1,f._2._2))
//((物品A索引,物品B索引),(物品A属性数组,物品B属性数组))
val rdd2 = itemattrs.cartesian(itemattrs).map(f => ((f._1._1,f._2._1),(f._1._2,f._2._2)))

此处有一个笛卡尔积运算[cartesian],可实现前一个RDD与后一个RDD依次进行组合,形成一个新的RDD,KEY(此处KEY为二元组的前一个属性,VALUE指后一个,下同。)为两个RDD中的各一个KEY组成的元组,VALUE为RDD中对应KEY的VALUE组成的元组。
例:rdd1=[(a,1),(b,2)],rdd2=[(c,3),(d,4)]
rdd1 cartesian rdd2 =[((a,c)(1,3)),((a,d)(1,4)),((b,c)(2,3)),((b,d)(2,4))]
注意[]不是数组,只是为了形象的表示RDD

定义相似度算法

本例采用欧氏距离计算相似度

1
2
3
4
5
6
7
def cal(attr1:Array[Int],attr2:Array[Int]) :Double={
var sum = 0
for(i <- 0 to attr1.length-1){
sum += (attr1(i) - attr2(i))*(attr1(i) - attr2(i))
}
1/(1+sqrt(sum))
}

计算物品间的相似度

1
2
3
4
//((物品A索引,物品B索引),物品A与B的相似度)
val rdd4 = rdd2.map(f => (f._1,cal(f._2._1,f._2._2)))
//排序
val rdd5 = rdd4.map(f=>(f._1._1,(f._1._2,f._2))).sortByKey().map(f=>(f._2._1,(f._1,f._2._2))).sortByKey()

获取相似度矩阵

1
2
3
4
5
//(物品索引,该物品与其他物品(按索引从小到大排列)相似度的数组)
val rdd6 = rdd5.map(f=>(f._1,f._2._2)).groupByKey().sortByKey().map(f=>(f._1,f._2.toArray))
val rdd7 = rdd6.map(f => IndexedRow(f._1,Vectors.dense(f._2)) )
//相似度矩阵
val rdd8 = new IndexedRowMatrix(rdd7)

读取用户偏好文件

1
val data = sc.textFile("hdfs://master:9000/user/hive/warehouse/pdata.db/user_weight")

处理用户偏好文件

1
2
3
4
5
6
7
//(用户ID,(物品索引,评分))
val user1 = data.map{
line => val lines = line.split(":")
(lines(2),(lines(0).toInt,lines(3).toDouble))
}.sortByKey().cache()
//((物品ID,用户ID),评分)
val user2 = user1.map(f=> ((f._2._1,f._1),f._2._2))

获取所有的用户集

1
val users = user1.map(f=>f._1).distinct().sortBy(f =>f )

生成用户对所有物品的评分集

由于user2是稀疏的,所以需要补上其他位置的0

1
2
3
4
5
6
7
8
val mm = items cartesian users
//给所有的组合赋值0
val mm1 = mm.map(f=>(f,0.0))
//将零集与真实评分集合并
val mm2 = mm1 union user2
//获取完整的评分集
val mm3 = mm2.groupByKey().map(f=>(f._1,f._2.reduce(_ + _))).sortByKey()
val mm4 = mm3.map(f=>(f._1._1,(f._1._2,f._2)))

生成用户对物品的评分矩阵

1
2
3
4
//(物品索引,用户(用户ID从小到大)对该物品的评分数组)
val mm5 = mm4.map(f=>(f._1,f._2._2)).groupByKey().sortByKey().map(f=>(f._1,f._2.toArray))
val mm6 = mm5.map(f => IndexedRow(f._1,(Vectors.dense(f._2))))
val mm7 = new IndexedRowMatrix(mm6)

生成推荐结果

矩阵乘法采用分块法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//10为分块数
val mm8 = rdd8.toBlockMatrix().multiply(mm7.toBlockMatrix())
//矩阵转置
val mm9 = mm8.transpose.toIndexedRowMatrix()
//(Long,String),用户索引(从0开始的连续整数)与用户ID
val users1 = users.zipWithIndex().map(f=>(f._2,f._1)).sortByKey()
//把索引都换成ID
val mm10 = mm9.rows.map(f=>(f.index,f.vector)).sortByKey().join(users1)
.map(f=>(f._2._2,f._2._1)).map{
f=>
val temp = f._2.toArray.zipWithIndex.map(m=>(m._2,m._1))
.map(f=>(adIdxs(f._1),f._2)).sortWith(_._2 > _._2).toList
(f._1,temp)
}
//保存运算结果
mm10.saveAsTextFile("hdfs://master:9000/addata/recres.data")

  1. 生成的结果集格式为(userid,[(itemid1,score1),(itemid2,score2),…,(itemidn,scoren)])
    且score按照倒序排列
  2. 当前结果没有去重(即用户已评分物品)

后话

这只是一个demo版本,还存在不少问题。