import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val data = ... // an RDD of (rowkey, qualifier, value) triples, produced upstream

// HBase client settings: point at the ZooKeeper quorum and name the target table.
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", "10.10.40.112")
hConf.set("hbase.zookeeper.property.clientPort", "2181")
//hConf.set("hbase.rootdir", "hdfs://10.10.40.111:9000/hbase113")
//hConf.setBoolean("hbase.cluster.distributed", true)
//hConf.setInt("hbase.client.scanner.caching", 2000)
//hConf.set("zookeeper.znode.parent", "/hbase")
hConf.set("hbase.defaults.for.version.skip", "true")
hConf.set(TableOutputFormat.OUTPUT_TABLE, "user_m_info")

// Configure a MapReduce job whose output format writes directly into the HBase table.
// The values emitted below are Put mutations, so the output value class is Put.
val job = Job.getInstance(hConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

// Turn each (rowkey, qualifier, value) triple into a Put against column family "m",
// then write the whole RDD out through TableOutputFormat.
data.map { case (mid, tag, value) =>
  val put = new Put(Bytes.toBytes(mid))
  put.addColumn(Bytes.toBytes("m"), Bytes.toBytes(tag), Bytes.toBytes(value))
  (new ImmutableBytesWritable(), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
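For reference, a minimal sketch of what data might look like, assuming an RDD[(String, String, String)] of (rowkey, column qualifier, value) triples as implied by the pattern match above; the row keys, qualifiers, and values here are hypothetical:

// Hypothetical sample input; any RDD of string triples with this shape would work.
val data = sc.parallelize(Seq(
  ("user_001", "age",    "25"),
  ("user_001", "gender", "f"),
  ("user_002", "age",    "31")
))

Each triple becomes one cell under column family "m" in the user_m_info table, keyed by the first element of the tuple.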