企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 分析 为什么需要用mapreduce去访问hbase的数据? ——加快分析速度和扩展分析能力 Mapreduce访问hbase数据作分析一定是在离线分析的场景下应用 ![](https://box.kancloud.cn/197b9d0d56a8fd2e8d80d1070b2a3e15_737x418.png) # 代码 ## 从Hbase中读取数据分析写入hdfs ~~~ package com.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class HbaseReader { public static String t_user_info = "t_user_info"; //这边泛型决定出去 static class HdfsSinkMapper extends TableMapper<Text, NullWritable> { //key代表row key,value代表这一行结果 @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { byte[] bytes = key.copyBytes(); //把row key变为string String rowkey = new String(bytes); byte[] usernameBytes = value.getValue("base_info".getBytes(), "username".getBytes()); String username = new String(usernameBytes); context.write(new Text(rowkey + "\t" + username), NullWritable.get()); } } //reduce从map中拿数据 static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181"); Job job = Job.getInstance(conf); job.setJarByClass(HbaseReader.class); // job.setMapperClass(HdfsSinkMapper.class); Scan scan = new Scan(); //初始化 TableMapReduceUtil.initTableMapperJob(t_user_info, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job); job.setReducerClass(HdfsSinkReducer.class); FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/output")); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.waitForCompletion(true); } } ~~~ ## 从hdfs中读取数据写入Hbase ~~~ package com.study; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class HbaseSinker { public static String flow_fields_import = "flow_fields_import"; //正常读取文件 static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //进不来 String line = value.toString(); String[] fields = line.split(" "); String phone = fields[0]; String url = fields[1]; FlowBean bean = new FlowBean(phone, url); context.write(bean, NullWritable.get()); } } //输出到hbase static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> { @Override protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { Put put = new Put(key.getPhone().getBytes()); put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes()); context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put); } } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181"); HBaseAdmin hBaseAdmin = new HBaseAdmin(conf); //创建表,有就删除 boolean tableExists = hBaseAdmin.tableExists(flow_fields_import); if (tableExists) { hBaseAdmin.disableTable(flow_fields_import); hBaseAdmin.deleteTable(flow_fields_import); } HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import)); HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("f1".getBytes()); desc.addFamily(hColumnDescriptor); hBaseAdmin.createTable(desc); Job job = Job.getInstance(conf); job.setJarByClass(HbaseSinker.class); job.setMapperClass(HbaseSinkMrMapper.class); TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job); FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/data")); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Mutation.class); job.waitForCompletion(true); } } ~~~