因为导入CSV格式数据到hive的时候,会出现错行错列的现象,经过排查,主要原因是CSV文件内容中含有换行符(\r\n,\n,\r)和转义符(\)导致。
正常我们使用一下建表语句默认支持CSV文件解析(关键设置:ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde')
如果出现类似如下数据则会出现数据错行错列现象:
因为都是历史数据,源头(数据提供方)也不愿意修改,只能自己通过写相关小程序(小脚本)去预处理这些特殊情况,第一反应其实是直接使用linux的sed命令去处理,但是水平有限,作罢。
自己主要是写Java比较多,但是Java不方便在内网调试(内网数据不让拷贝出来,内网无开发环境),内网服务器正好有安装Python,那就试着用Python去写吧(也算是第一次真正写Python)
啰里啰嗦一大堆,上代码吧,写得不好还望各位指正。
#!/usr/bin/python3 # -*- coding:utf-8 -*- #处理csv文件中换行符等特殊字符(\r\n,\n,\r,\\) #python handle_csv_newline_char.py filepath import os import sys import csv import codecs import time #需要处理的文件 来自参数1(filepath) filename = sys.argv[1] print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename, ']开始处理') with codecs.open(filename, 'r', encoding='utf-8') as srcFile, codecs.open(filename + '.tmp', 'w', encoding='utf-8') as dstFile: fileReader = csv.reader(srcFile, delimiter=',', quotechar='"', escapechar='\\') fileWriter = csv.writer(dstFile, quoting=csv.QUOTE_ALL, lineterminator = '\n') for d in list(fileReader): for ii,dd in enumerate(d): if dd.find('\r\n') != -1: dd = dd.replace('\r\n', ' ') if dd.find('\n') != -1: dd = dd.replace('\n', ' ') if dd.find('\r') != -1: dd = dd.replace('\r', ' ') if dd.find('\\') != -1: dd = dd.replace('\\', '') d[ii] = dd fileWriter.writerow(d) dstFile.close() srcFile.close() #重命名 os.remove(filename) os.rename(filename + '.tmp', filename) print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),'[', filename, ']处理完成') |
我们这边的测试数据是一共差不多60个压缩包(每天1个、2个月的),需要先解压,每个压缩包中4个.csv文件,然后会有240个左右.csv文件的待入库。
通过以下shell命令可以批量解压测试目录中所有.tar.gz的压缩包
文件大小最大的1G左右,内容行数120万行,单个文件测试处理时间为2分钟左右,全量处理(近1亿行数据)差不多2小时。
单个文件处理参考如下:
批量处理脚本参考如下:
以上完美解决了最近遇到的问题,可能会有更好的办法,这次先简单记录一下此次问题的处理过程。
文章来源:微信公众号【志明】