A Method for Handling SQL Query Failures When Hive Metadata and File Types Differ (Part 2)

Following the previous post, another place where this error is thrown has turned up. This post handles that case and verifies whether the fix also works in a Hive on Spark environment.

1. SQL That Triggers the Exception

Construct the test data
(1) Create the tables and insert data

sql
create table t1(id float,content string) stored as parquet;
insert into t1 values(1.1,'content1'),(2.2,'content2');
create table error_type(id int,content string) stored as parquet;

(2) Copy the data file into a table with an incompatible column type

Code
hdfs dfs -cp /user/hive/warehouse/testdb.db/t1/000000_0 /user/hive/warehouse/testdb.db/error_type/

After the two steps above, execute the following SQL:

sql
select * from error_type where content='content1';

The query fails with the following error log:

Code
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row [Error getting row data with exception java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.IntWritable
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36)
at org.apache.hadoop.hive.serde2.SerDeUtils.buildJSONString(SerDeUtils.java:227)
at org.apache.hadoop.hive.serde2.SerDeUtils.buildJSONString(SerDeUtils.java:364)
at org.apache.hadoop.hive.serde2.SerDeUtils.getJSONString(SerDeUtils.java:200)
at org.apache.hadoop.hive.serde2.SerDeUtils.getJSONString(SerDeUtils.java:186)
at org.apache.hadoop.hive.ql.exec.MapOperator.toErrorMessage(MapOperator.java:520)
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:489)
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133)
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
]
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:494) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) ~[scala-library-2.11.12.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
... 3 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.IntWritable
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.get(WritableIntObjectInspector.java:36) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:251) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:55) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:732) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.FilterOperator.process(FilterOperator.java:126) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:882) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:130) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:146) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:484) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:133) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85) ~[hive-exec-2.1.1-cdh6.3.0.jar:2.1.1-cdh6.3.0]
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42) ~[scala-library-2.11.12.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[scala-library-2.11.12.jar:?]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$12.apply(AsyncRDDActions.scala:127) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2232) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.scheduler.Task.run(Task.scala:121) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) ~[spark-core_2.11-2.4.0-cdh6.3.0.jar:2.4.0-cdh6.3.0]
... 3 more
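
The last "Caused by" pinpoints the root cause: the Parquet file copied from t1 stores the first column as a float, so the reader produces a FloatWritable, while the metadata of error_type declares that column as int, so Hive serializes the value through the int object inspector and the cast fails. The minimal sketch below (a hypothetical standalone class, assuming hive-exec and hadoop-common are on the classpath) reproduces exactly that cast:

java
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
import org.apache.hadoop.io.FloatWritable;

public class CastMismatchDemo {
  public static void main(String[] args) {
    // The table metadata says int, so Hive picks the int object inspector ...
    WritableIntObjectInspector intOI =
        PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    // ... but the Parquet file actually holds a float value.
    Object valueFromFile = new FloatWritable(1.1f);
    // Throws java.lang.ClassCastException:
    // org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.IntWritable
    System.out.println(intOI.get(valueFromFile));
  }
}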

2. Handling the Exception

The org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231) frame in the stack trace contains the logic that serializes each field:

java
/**
 * Serialize a row of data.
 *
 * @param obj
 *          The row object
 * @param objInspector
 *          The ObjectInspector for the row object
 * @return The serialized Writable object
 * @throws IOException
 * @see SerDe#serialize(Object, ObjectInspector)
 */
@Override
public Writable doSerialize(Object obj, ObjectInspector objInspector)
    throws SerDeException {

  if (objInspector.getCategory() != Category.STRUCT) {
    throw new SerDeException(getClass().toString()
        + " can only serialize struct types, but we got: "
        + objInspector.getTypeName());
  }

  // Prepare the field ObjectInspectors
  StructObjectInspector soi = (StructObjectInspector) objInspector;
  List<? extends StructField> fields = soi.getAllStructFieldRefs();
  List<Object> list = soi.getStructFieldsDataAsList(obj);
  List<? extends StructField> declaredFields = (serdeParams.getRowTypeInfo() != null && ((StructTypeInfo) serdeParams.getRowTypeInfo())
      .getAllStructFieldNames().size() > 0) ? ((StructObjectInspector) getObjectInspector())
      .getAllStructFieldRefs()
      : null;

  serializeStream.reset();
  serializedSize = 0;

  // Serialize each field
  for (int i = 0; i < fields.size(); i++) {
    // Append the separator if needed.
    if (i > 0) {
      serializeStream.write(serdeParams.getSeparators()[0]);
    }
    // Get the field objectInspector and the field object.
    ObjectInspector foi = fields.get(i).getFieldObjectInspector();
    Object f = (list == null ? null : list.get(i));

    if (declaredFields != null && i >= declaredFields.size()) {
      throw new SerDeException("Error: expecting " + declaredFields.size()
          + " but asking for field " + i + "\n" + "data=" + obj + "\n"
          + "tableType=" + serdeParams.getRowTypeInfo().toString() + "\n"
          + "dataType="
          + TypeInfoUtils.getTypeInfoFromObjectInspector(objInspector));
    }

    serializeField(serializeStream, f, foi, serdeParams);
  }

  // TODO: The copy of data is unnecessary, but there is no work-around
  // since we cannot directly set the private byte[] field inside Text.
  serializeCache
      .set(serializeStream.getData(), 0, serializeStream.getLength());
  serializedSize = serializeStream.getLength();
  lastOperationSerialize = true;
  lastOperationDeserialize = false;
  return serializeCache;
}

The last line in the for loop, serializeField(serializeStream, f, foi, serdeParams);, is the call to org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247) seen in the stack trace:

java
protected void serializeField(ByteStream.Output out, Object obj, ObjectInspector objInspector,
    LazySerDeParameters serdeParams) throws SerDeException {
  try {
    serialize(out, obj, objInspector, serdeParams.getSeparators(), 1, serdeParams.getNullSequence(),
        serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams.getNeedsEscape());
  } catch (IOException e) {
    throw new SerDeException(e);
  }
}

The serialize call inside serializeField is the org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292) method from the stack trace:

java
/**
 * Serialize the row into the StringBuilder.
 *
 * @param out
 *          The StringBuilder to store the serialized data.
 * @param obj
 *          The object for the current field.
 * @param objInspector
 *          The ObjectInspector for the current Object.
 * @param separators
 *          The separators array.
 * @param level
 *          The current level of separator.
 * @param nullSequence
 *          The byte sequence representing the NULL value.
 * @param escaped
 *          Whether we need to escape the data when writing out
 * @param escapeChar
 *          Which char to use as the escape char, e.g. '\\'
 * @param needsEscape
 *          Which byte needs to be escaped for 256 bytes.
 * @throws IOException
 * @throws SerDeException
 */
public static void serialize(ByteStream.Output out, Object obj,
    ObjectInspector objInspector, byte[] separators, int level,
    Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape)
    throws IOException, SerDeException {

  if (obj == null) {
    out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
    return;
  }

  char separator;
  List<?> list;
  switch (objInspector.getCategory()) {
  case PRIMITIVE:
    LazyUtils.writePrimitiveUTF8(out, obj,
        (PrimitiveObjectInspector) objInspector, escaped, escapeChar,
        needsEscape);
    return;
  case LIST:
    ......
    return;
  case MAP:
    ......
    return;
  case STRUCT:
    ......
    return;
  case UNION:
    ......
    return;
  default:
    break;
  }

  throw new RuntimeException("Unknown category type: "
      + objInspector.getCategory());
}

This method dispatches to type-specific serialization routines, and an exception is thrown when the declared type is incompatible with the actual value. Note that when the current field value is null, it simply writes the NULL sequence:

java
if (obj == null) {
  out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
  return;
}

So, inside the per-field logic of LazySimpleSerDe.doSerialize, we can catch the ClassCastException and, following the null-handling logic of serialize shown above, write the NULL sequence instead. Change the following line of LazySimpleSerDe.doSerialize

java
serializeField(serializeStream, f, foi, serdeParams);

to

java
try {
  serializeField(serializeStream, f, foi, serdeParams);
} catch (ClassCastException | UnsupportedOperationException e) {
  serializeStream.write(serdeParams.getNullSequence().getBytes(), 0, serdeParams.getNullSequence().getLength());
}
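
To see the fallback in isolation, the sketch below (a hypothetical standalone class; the NULL sequence and escape settings are illustrative, since the real values come from serdeParams) does for one field what the patched loop does: serialize with the declared type's inspector, and on ClassCastException write the NULL sequence instead of failing the row:

java
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;

public class NullFallbackDemo {
  public static void main(String[] args) throws Exception {
    ByteStream.Output out = new ByteStream.Output();
    Text nullSequence = new Text("\\N");            // Hive's default NULL sequence "\N"
    Object valueFromFile = new FloatWritable(1.1f); // what the Parquet file really holds

    try {
      // The declared column type is int, so serialization goes through the int
      // inspector and fails with the same ClassCastException as in the query.
      LazyUtils.writePrimitiveUTF8(out, valueFromFile,
          PrimitiveObjectInspectorFactory.writableIntObjectInspector,
          false, (byte) '\\', new boolean[256]);
    } catch (ClassCastException | UnsupportedOperationException e) {
      // Fallback from the patch: emit the NULL sequence for this field.
      out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
    }
    System.out.println(new String(out.getData(), 0, out.getLength())); // prints \N
  }
}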

3. Deploying the Hive Jars That Hive on Spark Depends On

After making the code change above and deploying the rebuilt hive*.jar files with the copy_jars.sh script from the previous post, Hive's default MR execution engine can run the previously failing SQL. But when Hive uses Spark as the execution engine (set, for example, via set hive.execution.engine=spark; in beeline), the query still fails, which suggests that Spark keeps its own copy of the Hive dependency jars somewhere else.

As the log above shows, some of the stack frames are tagged with the jar name hive-exec-2.1.1-cdh6.3.0.jar. Search for this jar on a host where CDH is deployed:

shell
[root@dev-master2 tmp]# find / -name hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/cm/cloudera-navigator-server/libs/cdh6/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/cm/cloudera-scm-telepub/libs/cdh6/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hive/lib/hive-exec-2.1.1-cdh6.3.0.jar
/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/oozie/embedded-oozie-server/webapp/WEB-INF/lib/hive-exec-2.1.1-cdh6.3.0.jar

It looks like /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/hive-exec-2.1.1-cdh6.3.0.jar is where Spark keeps its copy of the Hive dependency, and it is the only jar in that directory:

shell
[root@dev-master2 tmp]# ls -lh /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/
total 35M
-rw-r--r--. 1 root root 35M Jul 19 2019 hive-exec-2.1.1-cdh6.3.0.jar

So add one line to copy_jars.sh:

Code
cp $current_dir/hive-exec-2.1.1-cdh6.3.0.jar /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/

After redeploying and retesting, Hive on Spark can now query the table whose column type is incompatible with the file, and the mismatched values come back as NULL.

Author: 0x3E6
Link: http://longwang.live/2022/03/24/%E4%B8%80%E7%A7%8D%E5%A4%84%E7%90%86Hive%E5%85%83%E6%95%B0%E6%8D%AE%E4%B8%8E%E6%96%87%E4%BB%B6%E7%B1%BB%E5%9E%8B%E4%B8%8D%E5%90%8C%E6%97%B6SQL%E6%9F%A5%E8%AF%A2%E5%A4%B1%E8%B4%A5%E7%9A%84%E6%96%B9%E6%B3%95%EF%BC%88%E4%BA%8C%EF%BC%89/
Copyright: Unless otherwise noted, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit 0x3E6's blog when reposting.