1. Kerberos authentication

a. Download the JCE (Java Cryptography Extension) unlimited-strength policy files matching your JDK version, unpack them, and copy local_policy.jar and US_export_policy.jar into $JAVA_HOME/jre/lib/security.
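
To confirm the policy files took effect, here is a quick check (a minimal sketch I'm adding, not from the original post; the class name is arbitrary). A value above 128 means unlimited-strength crypto is available, which Kerberos needs for AES-256 tickets:

import javax.crypto.Cipher;

public class JceCheck {
    public static void main(String[] args) throws Exception {
        // Prints Integer.MAX_VALUE (i.e. > 128) once the unlimited-strength policy jars are installed
        System.out.println("Max AES key length: " + Cipher.getMaxAllowedKeyLength("AES"));
    }
}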

b. Add an hbase-site.xml under the resources directory (i.e. on the classpath):

<configuration>
    <!-- Name of the HBase cluster to access -->
    <property>
        <name>hbase.cluster.name</name>
        <value>${hbase.cluster.name}</value>
    </property>

    <!-- RPC call timeout, in ms -->
    <property>
        <name>hbase.rpc.timeout</name>
        <value>200</value>
    </property>

    <!-- Client API operation timeout -->
    <property>
        <name>hbase.client.operation.timeout</name>
        <value>200</value>
        <description>in ms</description>
    </property>

    <!-- Number of retries on failure -->
    <property>
        <name>hbase.client.retries.number</name>
        <value>2</value>
    </property>

    <!-- Number of threads the client uses for concurrent HBase API calls -->
    <property>
        <name>hbase.client.tablepool.maxsize</name>
        <value>30</value>
    </property>
</configuration>
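
As a quick sanity check (a sketch I'm adding, not part of the original post): HBaseConfiguration.create() loads hbase-site.xml from the classpath automatically, so you can verify that the values above are visible to the client:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ConfCheck {
    public static void main(String[] args) {
        // hbase-site.xml is picked up because it sits on the classpath (resources dir)
        Configuration conf = HBaseConfiguration.create();
        System.out.println("hbase.cluster.name = " + conf.get("hbase.cluster.name"));
        System.out.println("hbase.rpc.timeout  = " + conf.get("hbase.rpc.timeout"));
    }
}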

c. Add JVM startup flags to set the authentication parameters:

-Dhadoop.property.hadoop.security.authentication=kerberos
-Djava.security.krb5.conf=${conf_path}/krb5-hadoop.conf
-Dhadoop.property.hadoop.client.keytab.file=${conf_path}/${kerberos_principal}.keytab
-Dhadoop.property.hadoop.client.kerberos.principal=${kerberos_principal}@XIAOMI.HADOOP

The full read/write client code is not listed here since there are plenty of examples online; a minimal sketch follows for reference.
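
The sketch below does one put and one get against the cluster configured above. It assumes the same classic HTable API that the MapReduce example below uses; the table name, row key and values are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientDemo {
    public static void main(String[] args) throws Exception {
        // Kerberos login is driven by the JVM flags from step c; hbase-site.xml comes from the classpath
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "namespace:tablename"); // placeholder table name
        try {
            Put put = new Put(Bytes.toBytes("rowkey-1"));
            put.add(Bytes.toBytes("W"), Bytes.toBytes("i"), Bytes.toBytes("value-1"));
            table.put(put);

            Get get = new Get(Bytes.toBytes("rowkey-1"));
            Result result = table.get(get);
            System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("W"), Bytes.toBytes("i"))));
        } finally {
            table.close();
        }
    }
}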

2. Bulk-writing data to HBase with MapReduce

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WriteToHbase {
    private static final String INPUT = "";
    // HBase cluster address and table name
    private static final String TABLE = "hbase://hytst-staging/namespace:tablename";

    // Read the source files
    public static class SourceMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        //something
    }

    // Write into HBase
    public static class WriteReducer
            extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        private byte[] family = Bytes.toBytes("W");    // column family
        private byte[] qualifier = Bytes.toBytes("i"); // column qualifier
        private int rowDone;
        private long startTime;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            rowDone = 0;
            startTime = System.currentTimeMillis();
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Copy only the valid bytes; Text.getBytes() may return a padded buffer
            byte[] rowkey = Bytes.toBytes(key.toString());
            Put put = new Put(rowkey);
            put.add(family, qualifier, Bytes.toBytes(StringUtils.join(values.iterator(), ",")));
            context.write(new ImmutableBytesWritable(rowkey), put);
            // Or, alternatively:
            /*HTable table = new HTable(context.getConfiguration(), TABLE);
            table.put(put);
            table.close();*/
            ++rowDone;
            // Throttle writes to 800 QPS
            TableMapReduceUtil.limitScanRate(800, rowDone, System.currentTimeMillis() - startTime);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "HdfsToHbase");
        job.setJarByClass(WriteToHbase.class);
        // Turn off speculative execution to avoid writing to HBase more than once
        job.setSpeculativeExecution(false);

        job.setMapperClass(SourceMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(INPUT));
        // Initialize the table reducer job
        TableMapReduceUtil.initTableReducerJob(TABLE, WriteReducer.class, job);
        job.setNumReduceTasks(2);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
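
One aside that is not part of the original job above: on a stock secure HBase deployment (i.e. when the Xiaomi-specific JVM flags from section 1 are not propagated to the tasks), the driver typically also has to obtain an HBase delegation token before submitting, roughly:

        // Hypothetical extra step for a stock secure cluster, added in main() before job submission:
        // obtains an HBase delegation token so the reduce tasks can authenticate to HBase.
        TableMapReduceUtil.initCredentials(job);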