Writing to Multiple HBase Tables from MapReduce and Spark: A Summary
Author: Syn良子. Source: http://www.cnblogs.com/cssdongl. Please credit the source when reposting.
As most readers know, when a MapReduce or Spark job writes to a known HBase table, it is enough to declare the following in the MapReduce or Spark driver class:
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tablename);
After that, a MapReduce job simply writes through the context in the mapper or reducer, while a Spark job builds an RDD of Put-bearing pairs (a PairRDDFunctions) and calls saveAsHadoopDataset.
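For reference, here is a minimal sketch of that familiar single-table pattern on the Spark side. The table name "user_clicks", the "info"/"count" column, and the sample data are placeholders of mine; it assumes the old-API org.apache.hadoop.hbase.mapred.TableOutputFormat, which is what saveAsHadoopDataset works with.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SingleTableWrite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Write to one Hbase table"))

    // The table is known up front, so it goes straight into the JobConf
    val jobConf = new JobConf(HBaseConfiguration.create(), this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user_clicks") // hypothetical table name

    // Build (ImmutableBytesWritable, Put) pairs; the key here is just the row key
    val puts = sc.parallelize(Seq(("row1", "10"), ("row2", "20"))).map { case (rowKey, count) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count))
      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
    }

    // PairRDDFunctions.saveAsHadoopDataset writes through the configured TableOutputFormat
    puts.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}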
You will often run into requirements, however, where the processed input data has to be written to several HBase tables, or where the table names are not known in advance and must be built from some field in the data.
Since the table name is unknown, TableOutputFormat.OUTPUT_TABLE cannot be set up front. This is still easy to achieve; below I summarize the MapReduce and Spark approaches separately (as you will see, they end up in the same place).
1. Writing to Multiple HBase Tables from MapReduce
Just add the following line to the MR job's main method:
job.setOutputFormatClass(MultiTableOutputFormat.class);
After that, the mapper or reducer can construct the table name and the Put from the relevant fields and write them through the context, reaching multiple HBase tables.
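To make that concrete, here is a minimal mapper sketch, written in Scala to match the Spark code later in this post; the comma-separated input layout and the "info"/"count" column are my assumptions. With MultiTableOutputFormat, the output key is the destination table name wrapped in an ImmutableBytesWritable and the value is the Put, so every record can pick its own table.
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Mapper

// Assumed input: text lines of the form "tableName,rowKey,count" (layout is hypothetical)
class MultiTableMapper extends Mapper[LongWritable, Text, ImmutableBytesWritable, Put] {
  override def map(key: LongWritable, value: Text,
                   context: Mapper[LongWritable, Text, ImmutableBytesWritable, Put]#Context): Unit = {
    val fields = value.toString.split(",")
    val tableName = fields(0) // the destination table comes from the data itself
    val put = new Put(Bytes.toBytes(fields(1)))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(fields(2)))
    // With MultiTableOutputFormat the output key carries the table name, not the row key
    context.write(new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
  }
}
On the driver side, the only HBase-specific setup is the setOutputFormatClass line shown above.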
2. Writing to Multiple HBase Tables from Spark
Here I use a Spark Streaming program I have tested that writes to multiple HBase tables. The code:
import kafka.serializer.StringDecoder
import net.sf.json.JSONObject // JSONObject.fromObject below comes from json-lib
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingWriteToHbase {
  def main(args: Array[String]): Unit = {
    var masterUrl = "yarn-client"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    val conf = new SparkConf().setAppName("Write to several tables of Hbase").setMaster(masterUrl)
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka source; PropertiesUtil is a project-local helper for reading config values
    val topics = Set("app_events")
    val brokers = PropertiesUtil.getValue("BROKER_ADDRESS")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")

    // HBase configuration shared by all output tables
    val hbaseTableSuffix = "_clickcounts"
    val hConf = HBaseConfiguration.create()
    val zookeeper = PropertiesUtil.getValue("ZOOKEEPER_ADDRESS")
    hConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeper)
    val jobConf = new JobConf(hConf, this.getClass)

    val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Parse the JSON messages and build (appId_uid, click_count) pairs
    val appUserClicks = kafkaDStreams.flatMap(rdd => {
      val data = JSONObject.fromObject(rdd._2)
      Some(data)
    }).map { jsonLine =>
      val key = jsonLine.getString("appId") + "_" + jsonLine.getString("uid")
      val value = jsonLine.getString("click_count")
      (key, value)
    }

    val result = appUserClicks.map { item =>
      val rowKey = item._1
      val value = item._2
      convertToHbasePut(rowKey, value, hbaseTableSuffix)
    }

    // MultiTableOutputFormat routes each Put to the table named by its key
    result.foreachRDD { rdd =>
      rdd.saveAsNewAPIHadoopFile("", classOf[ImmutableBytesWritable], classOf[Put],
        classOf[MultiTableOutputFormat], jobConf)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Table name = appId prefix of the row key + a fixed suffix
  def convertToHbasePut(key: String, value: String, tableNameSuffix: String): (ImmutableBytesWritable, Put) = {
    val rowKey = key
    val tableName = rowKey.split("_")(0) + tableNameSuffix
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
  }
}
Briefly: this Spark Streaming job processes JSON records read from Kafka; the appId field is used to build the table name, which routes each record to a different HBase table. Finally the RDD is written out with saveAsNewAPIHadoopFile.
Stepping into saveAsNewAPIHadoopFile, you will find its configuration is really no different from the MapReduce one:
def saveAsNewAPIHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = self.context.hadoopConfiguration)
{
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val job = new NewAPIHadoopJob(hadoopConf)
  job.setOutputKeyClass(keyClass)
  job.setOutputValueClass(valueClass)
  job.setOutputFormatClass(outputFormatClass)
  job.getConfiguration.set("mapred.output.dir", path)
  saveAsNewAPIHadoopDataset(job.getConfiguration)
}
Its parameters are the output path (when writing to HBase, an empty string is fine), plus outputKeyClass, outputValueClass, outputFormatClass, and the job configuration.
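In other words, for an HBase write the path only ends up in mapred.output.dir, which MultiTableOutputFormat ignores. So an equivalent route, sketched here under the same assumptions (connection settings coming from hbase-site.xml or set on the configuration as in the streaming job above), is to configure a Hadoop Job yourself and call saveAsNewAPIHadoopDataset directly:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

// `result` stands for an RDD[(ImmutableBytesWritable, Put)] like the one built in the streaming job above
def writeMultiTable(result: RDD[(ImmutableBytesWritable, Put)]): Unit = {
  val job = Job.getInstance(HBaseConfiguration.create())
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Put])
  job.setOutputFormatClass(classOf[MultiTableOutputFormat])
  // No output path needed: MultiTableOutputFormat routes each Put by its key (the table name)
  result.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
Either way, the Spark job ends up doing exactly what the MapReduce job does: hand MultiTableOutputFormat a (table name, Put) pair and let it dispatch the write.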