用spark实现基于物品属性相似度的推荐算法

本文分享一下老虎最近做的项目学到的一些东西,由于系统的局限性,后采用物品属性来计算相似度,实现推荐算法。


简单说明

item : 物品
user : 用户
rating : 用户对物品的评分结果

数据准备

需要准备两份数据:物品的属性向量集和用户对物品的偏好集。
本文数据均存储于hdfs,其他的数据来源自行修改RDD来源就好。

物品的属性向量集

物品的属性向量集大致如下:

1
2
3
4
#每一行表示一个物品,物品的属性值按照一个固定的设置好的顺序依次排列(分隔符可自定义,本例使用“:”和空格)
itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
itemid2:attr2_1 attr2_2 attr2_3 attr2_4 ...
...

注:

属性的提取取决于需求,本文不设置属性的权值,也即所有属性权值为1,需要权值的同学可以采用多元回归或者其他算法去提取(以后有机会再起一文)

用户对物品的偏好集

格式为:

1
2
3
4
5
#用户id:物品id:评分(分隔符可自定义,本例为“:”)
itemid1:userid1:rating1_1
itemid1:userid2:rating1_2
itemid2:userid1:rating2_1
...

注:

评分等于用户对物品的行为乘权重的总和,各行为的权值可自行拿捏,也可用算法去推导,行为应包含显性(用户购买物品等)和隐形(用户查看物品等)等方面。

生成物品相似度矩阵

IndexedRowMatrix矩阵默认是从0开始的连续索引,故物品属性集实际还多一列:

1
2
3
4
# index为从0开始的连续索引
index1:itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
index2:itemid2:attr2_1 attr2_2 attr2_3 attr2_4 ...
...

用户偏好集也多一列:

1
2
3
4
5
#indexitem的对应关系必须和物品属性集
index1:itemid1:userid1:rating1_1
index1:itemid1:userid2:rating1_2
index2:itemid2:userid1:rating2_1
...

初始化spark运行环境

1
2
val conf = new SparkConf().setAppName("MatrixCal")
val sc = new SparkContext(conf)

不推荐在conf是设置master值,硬编码…

读取物品属性文件

1
val rdd = sc.textFile("hdfs://master:9000/user/root/advertising")

处理物品属性文件

将物品属性集处理成(索引,(物品ID,属性数组))

1
2
3
4
5
6
//index1:itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
val itemsdata = rdd.map{
line => val lines = line.split(":")
val attrs = lines(2).split(" ").map(_.toInt)
(lines(0).toInt,(lines(1).toInt,attrs))
}.sortByKey().cache()

索引与itemid的隐射关系与物品索引集

保存索引与itemid的对应关系和物品索引集,以备后用

1
2
val idxs = itemsdata.map(f=>(f._1,f._2._1)).collectAsMap()
val items = itemsdata.map(f=>(f._1)).distinct().sortBy(f =>f )

获取物品与物品的属性集

1
2
3
4
//(物品索引,物品属性数组)
val itemattrs = itemsdata.map(f=>(f._1,f._2._2))
//((物品A索引,物品B索引),(物品A属性数组,物品B属性数组))
val rdd2 = itemattrs.cartesian(itemattrs).map(f => ((f._1._1,f._2._1),(f._1._2,f._2._2)))

此处有一个笛卡尔积运算[cartesian],可实现前一个RDD与后一个RDD依次进行组合,形成一个新的RDD,KEY(此处KEY为二元组的前一个属性,VALUE指后一个,下同。)为两个RDD中的各一个KEY组成的元组,VALUE为RDD中对应KEY的VALUE组成的元组。
例:rdd1=[(a,1),(b,2)],rdd2=[(c,3),(d,4)]
rdd1 cartesian rdd2 =[((a,c)(1,3)),((a,d)(1,4)),((b,c)(2,3)),((b,d)(2,4))]
注意[]不是数组,只是为了形象的表示RDD

定义相似度算法

本例采用欧氏距离计算相似度

1
2
3
4
5
6
7
def cal(attr1:Array[Int],attr2:Array[Int]) :Double={
var sum = 0
for(i <- 0 to attr1.length-1){
sum += (attr1(i) - attr2(i))*(attr1(i) - attr2(i))
}
1/(1+sqrt(sum))
}

计算物品间的相似度

1
2
3
4
//((物品A索引,物品B索引),物品A与B的相似度)
val rdd4 = rdd2.map(f => (f._1,cal(f._2._1,f._2._2)))
//排序
val rdd5 = rdd4.map(f=>(f._1._1,(f._1._2,f._2))).sortByKey().map(f=>(f._2._1,(f._1,f._2._2))).sortByKey()

获取相似度矩阵

1
2
3
4
5
//(物品索引,该物品与其他物品(按索引从小到大排列)相似度的数组)
val rdd6 = rdd5.map(f=>(f._1,f._2._2)).groupByKey().sortByKey().map(f=>(f._1,f._2.toArray))
val rdd7 = rdd6.map(f => IndexedRow(f._1,Vectors.dense(f._2)) )
//相似度矩阵
val rdd8 = new IndexedRowMatrix(rdd7)

读取用户偏好文件

1
val data = sc.textFile("hdfs://master:9000/user/hive/warehouse/pdata.db/user_weight")

处理用户偏好文件

1
2
3
4
5
6
7
//(用户ID,(物品索引,评分))
val user1 = data.map{
line => val lines = line.split(":")
(lines(2),(lines(0).toInt,lines(3).toDouble))
}.sortByKey().cache()
//((物品ID,用户ID),评分)
val user2 = user1.map(f=> ((f._2._1,f._1),f._2._2))

获取所有的用户集

1
val users = user1.map(f=>f._1).distinct().sortBy(f =>f )

生成用户对所有物品的评分集

由于user2是稀疏的,所以需要补上其他位置的0

1
2
3
4
5
6
7
8
val mm = items cartesian users
//给所有的组合赋值0
val mm1 = mm.map(f=>(f,0.0))
//将零集与真实评分集合并
val mm2 = mm1 union user2
//获取完整的评分集
val mm3 = mm2.groupByKey().map(f=>(f._1,f._2.reduce(_ + _))).sortByKey()
val mm4 = mm3.map(f=>(f._1._1,(f._1._2,f._2)))

生成用户对物品的评分矩阵

1
2
3
4
//(物品索引,用户(用户ID从小到大)对该物品的评分数组)
val mm5 = mm4.map(f=>(f._1,f._2._2)).groupByKey().sortByKey().map(f=>(f._1,f._2.toArray))
val mm6 = mm5.map(f => IndexedRow(f._1,(Vectors.dense(f._2))))
val mm7 = new IndexedRowMatrix(mm6)

生成推荐结果

矩阵乘法采用分块法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//10为分块数
val mm8 = rdd8.toBlockMatrix().multiply(mm7.toBlockMatrix())
//矩阵转置
val mm9 = mm8.transpose.toIndexedRowMatrix()
//(Long,String),用户索引(从0开始的连续整数)与用户ID
val users1 = users.zipWithIndex().map(f=>(f._2,f._1)).sortByKey()
//把索引都换成ID
val mm10 = mm9.rows.map(f=>(f.index,f.vector)).sortByKey().join(users1)
.map(f=>(f._2._2,f._2._1)).map{
f=>
val temp = f._2.toArray.zipWithIndex.map(m=>(m._2,m._1))
.map(f=>(adIdxs(f._1),f._2)).sortWith(_._2 > _._2).toList
(f._1,temp)
}
//保存运算结果
mm10.saveAsTextFile("hdfs://master:9000/addata/recres.data")

  1. 生成的结果集格式为(userid,[(itemid1,score1),(itemid2,score2),…,(itemidn,scoren)])
    且score按照倒序排列
  2. 当前结果没有去重(即用户已评分物品)

后话

这只是一个demo版本,还存在不少问题。

热评文章