A Method for Processing Data During a Sqoop Export

1. Exporting Data by Calling the Sqoop API from Java

The big-data cluster used for this test is CDH 6.3.2, and the Sqoop dependency version is 1.4.7-cdh6.3.2. The Java code that calls the Sqoop API is as follows:

java
package blog;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;


/**
 * @author 0x3E6
 * @date 2020/02/05 20:58 PM
 */
@Slf4j
public class App {

    private static String[] getArgs() {
        String tb = "tb1";
        return new String[]{
                "--connect", "jdbc:mysql://192.168.0.101:3306/test?useSSL=false",
                "--username", "test",
                "--password", "G00d!1uck",
                "--table", tb,
                "--hcatalog-database", "test",
                "--hcatalog-table", tb
        };
    }

    private static int execSqoop(String toolName, String[] args) throws Exception {
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool(toolName);
        Configuration conf = new Configuration();
        // Run the MapReduce job in local mode so the entire MapReduce flow can be debugged,
        // down to the map method of the Mapper.
        conf.set("mapreduce.framework.name", "local");
        conf.set("custom.checkColumn", "id");
        conf.set("custom.lastValue", "2");
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop(tool, loadPlugins);
        return Sqoop.runSqoop(sqoop, expandArguments);
    }

    static void exportData() throws Exception {
        log.info("{}", execSqoop("export", getArgs()));
    }

    public static void main(String[] args) throws Exception {
        Logger.getRootLogger().setLevel(Level.INFO);
        BasicConfigurator.configure();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        exportData();
    }
}

2. Analysis of Part of the Export Process

In ExportTool:

Code
private void exportTable(SqoopOptions options, String tableName) {
    ...
    // INSERT-based export.
    // Calls MySQLManager.exportTable
    manager.exportTable(context);
}

The inheritance chain of MySQLManager is:

Code
MySQLManager->InformationSchemaManager->CatalogQueryManager->GenericJdbcManager->SqlManager

Only SqlManager implements the exportTable method, so the method actually invoked is SqlManager.exportTable:

Code
/**
 * Export data stored in HDFS into a table in a database.
 */
public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
    throws IOException, ExportException {
  context.setConnManager(this);
  JdbcExportJob exportJob = new JdbcExportJob(context,
      getParquetJobConfigurator().createParquetExportJobConfigurator());
  exportJob.runExport();
}

exportJob.runExport is where the MapReduce job is configured and run. Since JdbcExportJob extends ExportJobBase and does not override runExport, the runExport method actually executed is the one in ExportJobBase:

Code
public void runExport() throws ExportException, IOException {

    ......

    Job job = createJob(conf);
    try {
      // Set the external jar to use for the job.
      job.getConfiguration().set("mapred.jar", ormJarFile);
      if (options.getMapreduceJobName() != null) {
        job.setJobName(options.getMapreduceJobName());
      }

      propagateOptionsToJob(job);
      if (isHCatJob) {
        LOG.info("Configuring HCatalog for export job");
        SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
        hCatUtils.configureHCat(options, job, cmgr, tableName,
            job.getConfiguration());
      }
      // When exporting through HCatalog, the configured InputFormat class is
      // org.apache.sqoop.mapreduce.hcat.SqoopHCatExportFormat
      configureInputFormat(job, tableName, tableClassName, null);
      // Obtained from ExportJobBase.getOutputFormatClass; in non-batch mode the
      // OutputFormat class is org.apache.sqoop.mapreduce.ExportOutputFormat
      configureOutputFormat(job, tableName, tableClassName);
      // The current this object is a JdbcExportJob, so ExportJobBase.configureMapper is called,
      // which calls JdbcExportJob.getMapperClass; when HCatalog is used, that method directly returns
      // SqoopHCatUtilities.getExportMapperClass(), i.e. org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper
      configureMapper(job, tableName, tableClassName);
      configureNumTasks(job);
      cacheJars(job, context.getConnManager());

      jobSetup(job);
      setJob(job);
      boolean success = runJob(job);
      if (!success) {
        LOG.error("Export job failed!");
        throw new ExportException("Export job failed!");
      }

      if (options.isValidationEnabled()) {
        validateExport(tableName, conf, job);
      }
    ......
}

3. A Method for Processing Data During the Sqoop Export

As noted in the comments inside runExport above, the Mapper configured for the export is obtained via SqoopHCatUtilities.getExportMapperClass(), which simply returns the static field exportMapperClass of that utility class. The field is assigned org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper in the static initializer block of SqoopHCatUtilities and is never modified during the export; for the mapper's contents, see SqoopHCatExportMapper.java.
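
Put differently, the hook is just a static field with a getter and a setter. The following is a minimal sketch of the relevant pieces, reconstructed from the behavior described above; the class name is changed to make clear that this is not the actual Sqoop source:

java
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper;

// Simplified sketch only: just the parts that matter for swapping the export mapper.
public final class SqoopHCatUtilitiesSketch {

    // Mapper class used for HCatalog exports; assigned once in a static block
    // and left untouched for the rest of the export.
    private static Class<? extends Mapper> exportMapperClass;

    static {
        exportMapperClass = SqoopHCatExportMapper.class;
    }

    // What configureMapper ends up consulting (via JdbcExportJob.getMapperClass) for HCatalog jobs.
    public static Class<? extends Mapper> getExportMapperClass() {
        return exportMapperClass;
    }

    // Calling this before Sqoop.runSqoop swaps in a custom mapper, which is exactly the hook used below.
    public static void setExportMapperClass(Class<? extends Mapper> cls) {
        exportMapperClass = cls;
    }
}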

Therefore, before running the Sqoop command, exportMapperClass in SqoopHCatUtilities can be replaced with a class containing custom logic to process or filter the data being exported (a filtering variant is sketched at the end of this section).

For example, suppose every record has to be printed during the export (admittedly, no real requirement would be this pointless). Copy the SqoopHCatExportMapper class and modify it as follows:

java
package blog;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportHelper;

import java.io.IOException;

/**
 * A mapper that works on combined hcat splits.
 */
public class ModifiedSqoopHCatExportMapper
    extends
    AutoProgressMapper<WritableComparable, HCatRecord,
        SqoopRecord, WritableComparable> {
  public static final Log LOG = LogFactory
      .getLog(ModifiedSqoopHCatExportMapper.class.getName());
  private SqoopHCatExportHelper helper;

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    super.setup(context);

    Configuration conf = context.getConfiguration();
    helper = new SqoopHCatExportHelper(conf);
  }

  @Override
  public void map(WritableComparable key, HCatRecord value,
                  Context context)
      throws IOException, InterruptedException {
    // The class of value is org.apache.hive.hcatalog.data.DefaultHCatRecord
    // Inheritance chain: org.apache.hive.hcatalog.data.DefaultHCatRecord -> org.apache.hive.hcatalog.data.HCatRecord -> java.lang.Object
    // More can be obtained from the Context, such as the Configuration.
    SqoopRecord record = helper.convertToSqoopRecord(value);
    LOG.info("===" + record.getFieldMap().toString() + "===");
    context.write(record, NullWritable.get());
  }

}

The only substantive change is that the map method prints every record.

Then, in the earlier main method, set the modified Mapper before invoking the export:

java
public static void main(String[] args) throws Exception {
    Logger.getRootLogger().setLevel(Level.INFO);
    BasicConfigurator.configure();
    System.setProperty("HADOOP_USER_NAME", "hdfs");
    // Use the modified Mapper class
    SqoopHCatUtilities.setExportMapperClass(ModifiedSqoopHCatExportMapper.class);
    exportData();
}

The log then contains output like the following:

Code
...
14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper - ==={s=s3, id=3, ft=3.3}===
14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper - ==={s=s4, id=4, ft=4.4}===
...
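
The same hook also makes it possible to filter what gets exported rather than just observe it. As a further illustration, and as one possible use of the otherwise unused custom.checkColumn and custom.lastValue settings from the first listing, the sketch below writes only rows whose check column is greater than the configured last value. It is an assumed example built on the same helper calls as the printing mapper, not something Sqoop provides out of the box:

java
package blog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportHelper;

import java.io.IOException;

/**
 * Exports only the rows whose (numeric) check column exceeds a configured last value.
 * The configuration keys match the custom.checkColumn / custom.lastValue values set in App above.
 */
public class FilteringSqoopHCatExportMapper
        extends AutoProgressMapper<WritableComparable, HCatRecord,
        SqoopRecord, WritableComparable> {

    private SqoopHCatExportHelper helper;
    private String checkColumn;
    private long lastValue;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        helper = new SqoopHCatExportHelper(conf);
        // The defaults are only a fallback; the values are expected to come from the conf.set calls in execSqoop.
        checkColumn = conf.get("custom.checkColumn", "id");
        lastValue = conf.getLong("custom.lastValue", 0L);
    }

    @Override
    public void map(WritableComparable key, HCatRecord value, Context context)
            throws IOException, InterruptedException {
        SqoopRecord record = helper.convertToSqoopRecord(value);
        Object field = record.getFieldMap().get(checkColumn);
        // Write only rows whose check column exceeds lastValue; everything else is skipped
        // and never reaches the target table. Assumes the check column holds an integer value.
        if (field != null && Long.parseLong(field.toString()) > lastValue) {
            context.write(record, NullWritable.get());
        }
    }
}

Registering it works the same way as before: call SqoopHCatUtilities.setExportMapperClass(FilteringSqoopHCatExportMapper.class) in main before exportData(). With custom.checkColumn=id and custom.lastValue=2 as in the first listing, only rows with id greater than 2 would be exported.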
