本文分享一下老虎最近做的项目学到的一些东西,由于系统的局限性,后采用物品属性来计算相似度,实现推荐算法。
简单说明
item : 物品
user : 用户
rating : 用户对物品的评分结果
数据准备
需要准备两份数据:物品的属性向量集和用户对物品的偏好集。
本文数据均存储于hdfs,其他的数据来源自行修改RDD来源就好。
物品的属性向量集
物品的属性向量集大致如下:
1 | #每一行表示一个物品,物品的属性值按照一个固定的设置好的顺序依次排列(分隔符可自定义,本例使用“:”和空格) |
注:
属性的提取取决于需求,本文不设置属性的权值,也即所有属性权值为1,需要权值的同学可以采用多元回归或者其他算法去提取(以后有机会再起一文)
用户对物品的偏好集
格式为:
1
2
3
4
5 #用户id:物品id:评分(分隔符可自定义,本例为“:”)
itemid1:userid1:rating1_1
itemid1:userid2:rating1_2
itemid2:userid1:rating2_1
...
注:
评分等于用户对物品的行为乘权重的总和,各行为的权值可自行拿捏,也可用算法去推导,行为应包含显性(用户购买物品等)和隐形(用户查看物品等)等方面。
生成物品相似度矩阵
IndexedRowMatrix矩阵默认是从0开始的连续索引,故物品属性集实际还多一列:
1
2
3
4 # index为从0开始的连续索引
index1:itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
index2:itemid2:attr2_1 attr2_2 attr2_3 attr2_4 ...
...
用户偏好集也多一列:1
2
3
4
5#index与item的对应关系必须和物品属性集
index1:itemid1:userid1:rating1_1
index1:itemid1:userid2:rating1_2
index2:itemid2:userid1:rating2_1
...
初始化spark运行环境
1 | val conf = new SparkConf().setAppName("MatrixCal") |
不推荐在conf是设置master值,硬编码…
读取物品属性文件
1 | val rdd = sc.textFile("hdfs://master:9000/user/root/advertising") |
处理物品属性文件
将物品属性集处理成(索引,(物品ID,属性数组))1
2
3
4
5
6//index1:itemid1:attr1_1 attr1_2 attr1_3 attr1_4 ...
val itemsdata = rdd.map{
line => val lines = line.split(":")
val attrs = lines(2).split(" ").map(_.toInt)
(lines(0).toInt,(lines(1).toInt,attrs))
}.sortByKey().cache()
索引与itemid的隐射关系与物品索引集
保存索引与itemid的对应关系和物品索引集,以备后用1
2val idxs = itemsdata.map(f=>(f._1,f._2._1)).collectAsMap()
val items = itemsdata.map(f=>(f._1)).distinct().sortBy(f =>f )
获取物品与物品的属性集
1 | //(物品索引,物品属性数组) |
此处有一个笛卡尔积运算[cartesian],可实现前一个RDD与后一个RDD依次进行组合,形成一个新的RDD,KEY(此处KEY为二元组的前一个属性,VALUE指后一个,下同。)为两个RDD中的各一个KEY组成的元组,VALUE为RDD中对应KEY的VALUE组成的元组。
例:rdd1=[(a,1),(b,2)],rdd2=[(c,3),(d,4)]
rdd1 cartesian rdd2 =[((a,c)(1,3)),((a,d)(1,4)),((b,c)(2,3)),((b,d)(2,4))]
注意[]不是数组,只是为了形象的表示RDD
定义相似度算法
本例采用欧氏距离计算相似度1
2
3
4
5
6
7def cal(attr1:Array[Int],attr2:Array[Int]) :Double={
var sum = 0
for(i <- 0 to attr1.length-1){
sum += (attr1(i) - attr2(i))*(attr1(i) - attr2(i))
}
1/(1+sqrt(sum))
}
计算物品间的相似度
1 | //((物品A索引,物品B索引),物品A与B的相似度) |
获取相似度矩阵
1 | //(物品索引,该物品与其他物品(按索引从小到大排列)相似度的数组) |
读取用户偏好文件
1 | val data = sc.textFile("hdfs://master:9000/user/hive/warehouse/pdata.db/user_weight") |
处理用户偏好文件
1 | //(用户ID,(物品索引,评分)) |
获取所有的用户集
1 | val users = user1.map(f=>f._1).distinct().sortBy(f =>f ) |
生成用户对所有物品的评分集
由于user2是稀疏的,所以需要补上其他位置的01
2
3
4
5
6
7
8val mm = items cartesian users
//给所有的组合赋值0
val mm1 = mm.map(f=>(f,0.0))
//将零集与真实评分集合并
val mm2 = mm1 union user2
//获取完整的评分集
val mm3 = mm2.groupByKey().map(f=>(f._1,f._2.reduce(_ + _))).sortByKey()
val mm4 = mm3.map(f=>(f._1._1,(f._1._2,f._2)))
生成用户对物品的评分矩阵
1 | //(物品索引,用户(用户ID从小到大)对该物品的评分数组) |
生成推荐结果
矩阵乘法采用分块法实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//10为分块数
val mm8 = rdd8.toBlockMatrix().multiply(mm7.toBlockMatrix())
//矩阵转置
val mm9 = mm8.transpose.toIndexedRowMatrix()
//(Long,String),用户索引(从0开始的连续整数)与用户ID
val users1 = users.zipWithIndex().map(f=>(f._2,f._1)).sortByKey()
//把索引都换成ID
val mm10 = mm9.rows.map(f=>(f.index,f.vector)).sortByKey().join(users1)
.map(f=>(f._2._2,f._2._1)).map{
f=>
val temp = f._2.toArray.zipWithIndex.map(m=>(m._2,m._1))
.map(f=>(adIdxs(f._1),f._2)).sortWith(_._2 > _._2).toList
(f._1,temp)
}
//保存运算结果
mm10.saveAsTextFile("hdfs://master:9000/addata/recres.data")
- 生成的结果集格式为(userid,[(itemid1,score1),(itemid2,score2),…,(itemidn,scoren)])
且score按照倒序排列- 当前结果没有去重(即用户已评分物品)
后话
这只是一个demo版本,还存在不少问题。