spark读写Hbase

写HBase

方式一

// 方式一: bulk write to HBase via TableOutputFormat + saveAsNewAPIHadoopDataset.
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)

val data = ... // placeholder: RDD[(String, String, String)] of (rowkey, qualifier, value) to write
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum","10.10.40.112")
hConf.set("hbase.zookeeper.property.clientPort","2181")
//hConf.set("hbase.rootdir","hdfs://10.10.40.111:9000/hbase113")
//hConf.setBoolean("hbase.cluster.distributed", true)
//hConf.setInt("hbase.client.scanner.caching", 2000)
//hConf.set("zookeeper.znode.parent","/hbase")

hConf.set("hbase.defaults.for.version.skip","true")
hConf.set(TableOutputFormat.OUTPUT_TABLE,"user_m_info") // target table name
val job = Job.getInstance(hConf) // `new Job(conf)` is deprecated in the mapreduce API
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
// BUG fix: the output VALUE of a write job is Put, not Result (Result is what reads produce).
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// BUG fix: original read `dData..map` — undefined identifier plus a double dot; the RDD is `data`.
data.map {
  case (mid, tag, value) =>
    val put = new Put(Bytes.toBytes(mid))
    // Column family "m"; addColumn replaces the deprecated Put.add.
    put.addColumn(Bytes.toBytes("m"), Bytes.toBytes(tag), Bytes.toBytes(value))
    (new ImmutableBytesWritable(), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)

方式二

// 方式二: write to HBase with one buffered HTable client per partition.
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)

val data = ... // placeholder: RDD[((rowkey, qualifier), value)] to write
data.foreachPartition { part =>
  val hConf = HBaseConfiguration.create()
  hConf.set("hbase.zookeeper.quorum","192.168.0.180")
  hConf.set("hbase.zookeeper.property.clientPort","2181")
  hConf.set("hbase.defaults.for.version.skip","true")
  val table = new HTable(hConf, TableName.valueOf("user_m_info")) // target table name
  try {
    // Buffer puts client-side (no per-put RPC); flushed explicitly below.
    table.setAutoFlush(false, false)
    table.setWriteBufferSize(3 * 1024 * 1024)
    part.foreach { y =>
      val put = new Put(Bytes.toBytes(y._1._1))
      put.addColumn("m".getBytes, (y._1._2.toString).getBytes, Bytes.toBytes(y._2.toString))
      table.put(put)
    }
    table.flushCommits()
  } finally {
    // BUG fix: the table (and its underlying connection) was never closed,
    // leaking one client per partition per run.
    table.close()
  }
}

读HBase

// 读HBase: load a whole HBase table into an RDD of (row key, Result) pairs
// through TableInputFormat.
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(sparkConf)

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","10.10.40.112")
hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
//hbaseConf.set("hbase.rootdir","hdfs://10.10.40.111:9000/hbase113")
//hbaseConf.setBoolean("hbase.cluster.distributed", true)
//hbaseConf.setInt("hbase.client.scanner.caching", 2000)
//hbaseConf.set("zookeeper.znode.parent","/hbase")
hbaseConf.set("hbase.defaults.for.version.skip","true")
hbaseConf.set(TableInputFormat.INPUT_TABLE,"user_m_info")

// Each element is (ImmutableBytesWritable row key, full row Result).
val data = sc.newAPIHadoopRDD(
  hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

// NOTE(review): in cluster mode this println runs on the executors, not the
// driver console — fine for a demo, use take(n) on the driver to inspect.
data.foreach(println)

热评文章