Importing HDFS Data into MySQL with MapReduce
Published: 2019-06-06


The following example uses MapReduce with DBOutputFormat to import structured data from an HDFS path into MySQL.
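The mapper expects tab-separated lines under the HDFS input path: the first column is the long key left over from the earlier MySQL-to-HDFS export, and the remaining three columns are id, name and height. The sample rows from the source look like this:

0    1    Enzo    180.66
1    2    Din    170.666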

package com.zhen.mysqlToHDFS;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * Imports HDFS data into MySQL.
 * Uses DBOutputFormat to write structured data under an HDFS path into MySQL.
 * The input data looks like the following: the first column is the key, the
 * remaining three columns are the record fields.
 *   0    1    Enzo    180.66
 *   1    2    Din     170.666
 */
public class DBOutputFormatApp extends Configured implements Tool {

    /**
     * JavaBean used as the value type.
     * It must implement Hadoop's serialization interface Writable as well as
     * DBWritable, the serialization interface used when talking to the database.
     * The official API describes the companion input format as:
     *   public class DBInputFormat<T extends DBWritable>
     *       extends InputFormat<LongWritable, T> implements Configurable
     * i.e. the Mapper key is fixed to LongWritable and the value is a custom
     * JavaBean implementing DBWritable.
     */
    public static class BeanWritable implements Writable, DBWritable {

        private int id;
        private String name;
        private double height;

        // Read one row from a JDBC ResultSet (used when reading from the database).
        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.height = resultSet.getDouble(3);
        }

        // Bind the fields to a PreparedStatement (used by DBOutputFormat when inserting).
        public void write(PreparedStatement preparedStatement) throws SQLException {
            preparedStatement.setInt(1, id);
            preparedStatement.setString(2, name);
            preparedStatement.setDouble(3, height);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.id = dataInput.readInt();
            this.name = dataInput.readUTF();
            this.height = dataInput.readDouble();
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(id);
            dataOutput.writeUTF(name);
            dataOutput.writeDouble(height);
        }

        public void set(int id, String name, double height) {
            this.id = id;
            this.name = name;
            this.height = height;
        }

        @Override
        public String toString() {
            return id + "\t" + name + "\t" + height;
        }
    }

    public static class DBOutputMapper
            extends Mapper<LongWritable, Text, NullWritable, BeanWritable> {

        private NullWritable outputKey;
        private BeanWritable outputValue;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.outputKey = NullWritable.get();
            this.outputValue = new BeanWritable();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Counter for records that were parsed and emitted successfully.
            final Counter successCounter = context.getCounter("exec", "successfully");
            // Counter for malformed records that were skipped.
            final Counter faildCounter = context.getCounter("exec", "faild");

            // Parse the structured (tab-separated) input line.
            String[] fields = value.toString().split("\t");
            // The exported data carries a long key in the first column,
            // so it is ignored and the record fields are read starting at index 1.
            if (fields.length > 3) {
                int id = Integer.parseInt(fields[1]);
                String name = fields[2];
                double height = Double.parseDouble(fields[3]);
                this.outputValue.set(id, name, height);
                context.write(outputKey, outputValue);
                // Record emitted successfully: increment the success counter.
                successCounter.increment(1L);
            } else {
                // Malformed record: increment the failure counter.
                faildCounter.increment(1L);
            }
        }
    }

    /**
     * DBOutputFormat requires the output key to be a DBWritable type,
     * so the reducer swaps key and value when writing.
     */
    public static class DBOutputReducer
            extends Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable> {

        @Override
        protected void reduce(NullWritable key, Iterable<BeanWritable> values, Context context)
                throws IOException, InterruptedException {
            for (BeanWritable beanWritable : values) {
                context.write(beanWritable, key);
            }
        }
    }

    public int run(String[] arg0) throws Exception {
        Configuration configuration = getConf();
        // Configure the database connection right after obtaining the Configuration.
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");

        Job job = Job.getInstance(configuration, DBOutputFormatApp.class.getSimpleName());
        job.setJarByClass(DBOutputFormatApp.class);

        job.setMapperClass(DBOutputMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BeanWritable.class);

        job.setReducerClass(DBOutputReducer.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        job.setOutputKeyClass(BeanWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, arg0[0]);

        // Configure the target table and columns for this job.
        DBOutputFormat.setOutput(job, "people", new String[]{"id", "name", "height"});

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.socket.write.timeout", "7200000");
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
        int status = 0;
        try {
            status = ToolRunner.run(conf, new DBOutputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return status;
    }

    public static void main(String[] args) {
        // The HDFS input path is hard-coded here, so the jar is run without arguments.
        args = new String[]{"/user/hadoop/mapreduce/mysqlToHdfs/people"};
        int status = createJob(args);
        System.exit(status);
    }
}
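DBOutputFormat only issues INSERT statements; it does not create the target table, so the people table must already exist in the hadoop database before the job runs. Below is a minimal JDBC sketch for creating it, reusing the connection settings configured above; the column types (INT, VARCHAR, DOUBLE) and the class name CreatePeopleTable are assumptions chosen to match the fields of BeanWritable.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Minimal sketch: create the table that DBOutputFormat will insert into.
// The URL, user and password mirror what DBConfiguration.configureDB sets in the job;
// the column types are assumptions matching the fields of BeanWritable.
public class CreatePeopleTable {
    public static void main(String[] args) throws Exception {
        String ddl = "CREATE TABLE IF NOT EXISTS people ("
                + "id INT, "
                + "name VARCHAR(64), "
                + "height DOUBLE)";
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
             Statement stmt = conn.createStatement()) {
            stmt.execute(ddl);
        }
    }
}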

Package the code into a jar, copy it to the server, and run it with the hadoop jar command. Note that the MySQL JDBC driver (mysql-connector-java) must be available on the job's classpath at runtime, for example bundled into the jar, otherwise the tasks cannot load com.mysql.jdbc.Driver.

hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/HDFSToMysql.jar com.zhen.mysqlToHDFS.DBOutputFormatApp

Once the job finishes, the data will appear in the MySQL table.
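For a quick check without opening a mysql client, a small JDBC query like the sketch below can read the rows back; the class name CheckPeopleTable is illustrative, and the connection settings are the same ones used by the job.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Minimal sketch: read back the rows written by the MapReduce job.
// Uses the same connection settings as DBOutputFormatApp; the class name is illustrative.
public class CheckPeopleTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name, height FROM people")) {
            while (rs.next()) {
                System.out.println(rs.getInt("id") + "\t" + rs.getString("name")
                        + "\t" + rs.getDouble("height"));
            }
        }
    }
}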

Reposted from: https://www.cnblogs.com/EnzoDin/p/8429992.html
