1. Calling the Sqoop API from Java to Export Data

The big data cluster used for this test is CDH 6.3.2, and the Sqoop dependency version is 1.4.7-cdh6.3.2. The Java code that calls the Sqoop API is as follows:
```java
package blog;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;

@Slf4j
public class App {

    private static String[] getArgs() {
        String tb = "tb1";
        return new String[]{
                "--connect", "jdbc:mysql://192.168.0.101:3306/test?useSSL=false",
                "--username", "test",
                "--password", "G00d!1uck",
                "--table", tb,
                "--hcatalog-database", "test",
                "--hcatalog-table", tb
        };
    }

    private static int execSqoop(String toolName, String[] args) throws Exception {
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool(toolName);
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");
        conf.set("custom.checkColumn", "id");
        conf.set("custom.lastValue", "2");
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop(tool, loadPlugins);
        return Sqoop.runSqoop(sqoop, expandArguments);
    }

    static void exportData() throws Exception {
        log.info("{}", execSqoop("export", getArgs()));
    }

    public static void main(String[] args) throws Exception {
        Logger.getRootLogger().setLevel(Level.INFO);
        BasicConfigurator.configure();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        exportData();
    }
}
```
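The execSqoop method above expands option files, resolves the tool, and loads plugins by hand. Sqoop also bundles these steps behind the static Sqoop.runTool entry point, where the tool name is passed as the first argument; a minimal sketch of the same export through that entry point (reusing the placeholder connection settings above) might look like this:

```java
package blog;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;

public class RunToolExport {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Run the MapReduce job in-process, as in the App class above.
        conf.set("mapreduce.framework.name", "local");

        // runTool expands option files, loads plugins, and resolves the tool
        // ("export") from the first element of the argument array.
        int ret = Sqoop.runTool(new String[]{
                "export",
                "--connect", "jdbc:mysql://192.168.0.101:3306/test?useSSL=false",
                "--username", "test",
                "--password", "G00d!1uck",
                "--table", "tb1",
                "--hcatalog-database", "test",
                "--hcatalog-table", "tb1"
        }, conf);
        System.exit(ret);
    }
}
```

The hand-rolled execSqoop variant is what the rest of this post uses, because building the Sqoop object directly makes it easy to inject extra Configuration entries such as custom.checkColumn and custom.lastValue.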
2. A Partial Analysis of the Export Process

In ExportTool:
```java
private void exportTable(SqoopOptions options, String tableName) {
    ...
    // INSERT-based export.
    // This calls MySQLManager.exportTable
    manager.exportTable(context);
}
```
The inheritance chain of MySQLManager is:
```
MySQLManager -> InformationSchemaManager -> CatalogQueryManager -> GenericJdbcManager -> SqlManager
```
Only SqlManager implements the exportTable method, so the call actually lands in SqlManager.exportTable:
```java
/**
 * Export data stored in HDFS into a table in a database.
 */
public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
    throws IOException, ExportException {
  context.setConnManager(this);
  JdbcExportJob exportJob = new JdbcExportJob(context,
      getParquetJobConfigurator().createParquetExportJobConfigurator());
  exportJob.runExport();
}
```
The exportJob.runExport method performs the MapReduce-related configuration. Since JdbcExportJob extends ExportJobBase and does not override runExport, the runExport method that actually runs is the one in ExportJobBase:
```java
public void runExport() throws ExportException, IOException {
    ......
    Job job = createJob(conf);
    try {
      // Set the external jar to use for the job.
      job.getConfiguration().set("mapred.jar", ormJarFile);
      if (options.getMapreduceJobName() != null) {
        job.setJobName(options.getMapreduceJobName());
      }

      propagateOptionsToJob(job);
      if (isHCatJob) {
        LOG.info("Configuring HCatalog for export job");
        SqoopHCatUtilities hCatUtils = SqoopHCatUtilities.instance();
        hCatUtils.configureHCat(options, job, cmgr, tableName,
            job.getConfiguration());
      }
      // When exporting with HCatalog, the InputFormat class configured here is
      // org.apache.sqoop.mapreduce.hcat.SqoopHCatExportFormat
      configureInputFormat(job, tableName, tableClassName, null);
      // Taken from ExportJobBase.getOutputFormatClass; when not in batch mode the
      // OutputFormat class used is org.apache.sqoop.mapreduce.ExportOutputFormat
      configureOutputFormat(job, tableName, tableClassName);
      // The current "this" is a JdbcExportJob, so ExportJobBase.configureMapper is called,
      // which in turn calls JdbcExportJob.getMapperClass; when HCatalog is in use that
      // method directly returns SqoopHCatUtilities.getExportMapperClass(), i.e.
      // org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper
      configureMapper(job, tableName, tableClassName);
      configureNumTasks(job);
      cacheJars(job, context.getConnManager());

      jobSetup(job);
      setJob(job);
      boolean success = runJob(job);
      if (!success) {
        LOG.error("Export job failed!");
        throw new ExportException("Export job failed!");
      }

      if (options.isValidationEnabled()) {
        validateExport(tableName, conf, job);
      }
      ......
    }
}
```
3. A Way to Process Data During a Sqoop Export

As the comments in the runExport method above note, the Mapper configured for the export is obtained through SqoopHCatUtilities.getExportMapperClass(), which simply returns the static field exportMapperClass of that utility class. That field is assigned the class org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper in the static block of SqoopHCatUtilities and is never changed during the export; for its contents, see SqoopHCatExportMapper.java.
Therefore, before running the Sqoop command, the value of exportMapperClass in SqoopHCatUtilities can be replaced with a Mapper containing custom logic that processes or filters the exported data.
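Since getExportMapperClass and setExportMapperClass are plain static accessors, the swap can be sanity-checked on its own before wiring it into an export. A minimal sketch, assuming the sqoop and hive-hcatalog jars are on the classpath and using the ModifiedSqoopHCatExportMapper class defined below:

```java
package blog;

import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;

public class MapperClassCheck {

    public static void main(String[] args) {
        // Default value, assigned in the static block of SqoopHCatUtilities
        // (expected: org.apache.sqoop.mapreduce.hcat.SqoopHCatExportMapper).
        System.out.println("default: " + SqoopHCatUtilities.getExportMapperClass());

        // Swap in a custom mapper; the export job picks this up later via
        // getExportMapperClass() when it configures the MapReduce job.
        SqoopHCatUtilities.setExportMapperClass(ModifiedSqoopHCatExportMapper.class);
        System.out.println("override: " + SqoopHCatUtilities.getExportMapperClass());
    }
}
```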
For example, suppose we want to print every record that passes through the export (admittedly, no real requirement would be this trivial). Copy the SqoopHCatExportMapper class out and modify it as follows:
```java
package blog;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportHelper;

import java.io.IOException;

public class ModifiedSqoopHCatExportMapper
        extends AutoProgressMapper<WritableComparable, HCatRecord,
        SqoopRecord, WritableComparable> {

    public static final Log LOG =
            LogFactory.getLog(ModifiedSqoopHCatExportMapper.class.getName());

    private SqoopHCatExportHelper helper;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        helper = new SqoopHCatExportHelper(conf);
    }

    @Override
    public void map(WritableComparable key, HCatRecord value, Context context)
            throws IOException, InterruptedException {
        SqoopRecord record = helper.convertToSqoopRecord(value);
        LOG.info("===" + record.getFieldMap().toString() + "===");
        context.write(record, NullWritable.get());
    }
}
```
The main change is that the data is printed out in the map method.
Then, in the earlier main method, set the modified Mapper before invoking the export command:
```java
public static void main(String[] args) throws Exception {
    Logger.getRootLogger().setLevel(Level.INFO);
    BasicConfigurator.configure();
    System.setProperty("HADOOP_USER_NAME", "hdfs");
    SqoopHCatUtilities.setExportMapperClass(ModifiedSqoopHCatExportMapper.class);
    exportData();
}
```
The data printed in the log looks like this:
```
...
14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper - ==={s=s3, id=3, ft=3.3}===
14587 [LocalJobRunner Map Task Executor #0] INFO blog.ModifiedSqoopHCatExportMapper - ==={s=s4, id=4, ft=4.4}===
...
```
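The execSqoop method in section 1 also set two custom configuration keys, custom.checkColumn and custom.lastValue, which stock Sqoop never reads. One way to put them to use, sketched here purely as an illustration (the keys and the numeric comparison are this post's own convention, not anything Sqoop provides), is a filtering variant of the mapper that only exports rows whose check column is greater than the last value:

```java
package blog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatExportHelper;

import java.io.IOException;

public class FilteringSqoopHCatExportMapper
        extends AutoProgressMapper<WritableComparable, HCatRecord,
        SqoopRecord, WritableComparable> {

    private SqoopHCatExportHelper helper;
    private String checkColumn;
    private long lastValue;

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        helper = new SqoopHCatExportHelper(conf);
        // Read the custom keys set in execSqoop; both names are this post's convention.
        checkColumn = conf.get("custom.checkColumn", "id");
        lastValue = conf.getLong("custom.lastValue", Long.MIN_VALUE);
    }

    @Override
    public void map(WritableComparable key, HCatRecord value, Context context)
            throws IOException, InterruptedException {
        SqoopRecord record = helper.convertToSqoopRecord(value);
        Object field = record.getFieldMap().get(checkColumn);
        // Assumes the check column is numeric; skip rows whose value is missing
        // or not greater than custom.lastValue.
        if (field == null || Long.parseLong(field.toString()) <= lastValue) {
            return;
        }
        context.write(record, NullWritable.get());
    }
}
```

Wiring it in is the same one-line change as before: call SqoopHCatUtilities.setExportMapperClass(FilteringSqoopHCatExportMapper.class) before exportData().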