diff --git a/README.md b/README.md index 315c72ac07..5699be4b25 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,15 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容会介绍介绍如下。 +- [datax_v202309](https://github.com/alibaba/DataX/releases/tag/datax_v202309) + - 支持Phoenix 同步数据添加 where条件 + - 支持华为 GuassDB读写插件 + - 修复ClickReader 插件运行报错 Can't find bundle for base name + - 增加 DataX调试模块 + - 修复 orc空文件报错问题 + - 优化obwriter性能 + - txtfilewriter 增加导出为insert语句功能支持 + - [datax_v202308](https://github.com/alibaba/DataX/releases/tag/datax_v202308) - OTS 插件更新 - databend 插件更新 diff --git a/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java b/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java index bf3cad12c4..cfa6be999b 100644 --- a/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java +++ b/clickhousereader/src/main/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReader.java @@ -27,8 +27,6 @@ public class ClickhouseReader extends Reader { private static final Logger LOG = LoggerFactory.getLogger(ClickhouseReader.class); public static class Job extends Reader.Job { - private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(ClickhouseReader.class); - private Configuration jobConfig = null; private CommonRdbmsReader.Job commonRdbmsReaderMaster; diff --git a/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java b/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java deleted file mode 100644 index a409402045..0000000000 --- a/clickhousereader/src/test/java/com/alibaba/datax/plugin/reader/clickhousereader/ClickhouseReaderTest.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.alibaba.datax.plugin.reader.clickhousereader; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; - -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.dataxservice.face.eventcenter.EventLogStore; -import com.alibaba.datax.dataxservice.face.eventcenter.RuntimeContext; -import com.alibaba.datax.test.simulator.BasicReaderPluginTest; -import com.alibaba.datax.test.simulator.junit.extend.log.LoggedRunner; -import com.alibaba.datax.test.simulator.junit.extend.log.TestLogger; -import com.alibaba.fastjson.JSON; - -import org.apache.commons.lang3.ArrayUtils; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; - - -@RunWith(LoggedRunner.class) -@Ignore -public class ClickhouseReaderTest extends BasicReaderPluginTest { - @TestLogger(log = "测试basic1.json. 
配置常量.") - @Test - public void testBasic1() { - RuntimeContext.setGlobalJobId(-1); - EventLogStore.init(); - List noteRecordForTest = new ArrayList(); - - List subjobs = super.doReaderTest("basic1.json", 1, noteRecordForTest); - - Assert.assertEquals(1, subjobs.size()); - Assert.assertEquals(1, noteRecordForTest.size()); - - Assert.assertEquals("[8,16,32,64,-8,-16,-32,-64,\"3.2\",\"6.4\",1,\"str_col\",\"abc\"," + "\"417ddc5d-e556-4d27-95dd-a34d84e46a50\",1580745600000,1580752800000,\"hello\",\"[1,2,3]\"," + "\"[\\\"abc\\\",\\\"cde\\\"]\",\"(8,'uint8_type')\",null,\"[1,2]\",\"[\\\"x\\\",\\\"y\\\"]\",\"127.0.0.1\",\"::\",\"23.345\"]", JSON.toJSONString(listData(noteRecordForTest.get(0)))); - } - - @Override - protected OutputStream buildDataOutput(String optionalOutputName) { - File f = new File(optionalOutputName + "-output.txt"); - try { - return new FileOutputStream(f); - } catch (FileNotFoundException e) { - e.printStackTrace(); - } - return null; - } - - @Override - public String getTestPluginName() { - return "clickhousereader"; - } - - private Object[] listData(Record record) { - if (null == record) { - return ArrayUtils.EMPTY_OBJECT_ARRAY; - } - Object[] arr = new Object[record.getColumnNumber()]; - for (int i = 0; i < arr.length; i++) { - Column col = record.getColumn(i); - if (null != col) { - arr[i] = col.getRawData(); - } - } - return arr; - } -} diff --git a/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java b/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java index f688d1639f..df5e1e4a17 100755 --- a/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java +++ b/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java @@ -5,6 +5,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.sql.Time; import java.util.Date; /** @@ -12,18 +13,54 @@ */ public class DateColumn extends Column { - private DateType subType = DateType.DATETIME; - - public static enum DateType { - DATE, TIME, DATETIME - } - - /** - * 构建值为null的DateColumn,使用Date子类型为DATETIME - * */ - public DateColumn() { - this((Long)null); - } + private DateType subType = DateType.DATETIME; + + private int nanos = 0; + + private int precision = -1; + + public static enum DateType { + DATE, TIME, DATETIME + } + + /** + * 构建值为time(java.sql.Time)的DateColumn,使用Date子类型为TIME,只有时间,没有日期 + */ + public DateColumn(Time time, int nanos, int jdbcPrecision) { + this(time); + if (time != null) { + setNanos(nanos); + } + if (jdbcPrecision == 10) { + setPrecision(0); + } + if (jdbcPrecision >= 12 && jdbcPrecision <= 17) { + setPrecision(jdbcPrecision - 11); + } + } + + public long getNanos() { + return nanos; + } + + public void setNanos(int nanos) { + this.nanos = nanos; + } + + public int getPrecision() { + return precision; + } + + public void setPrecision(int precision) { + this.precision = precision; + } + + /** + * 构建值为null的DateColumn,使用Date子类型为DATETIME + */ + public DateColumn() { + this((Long) null); + } /** * 构建值为stamp(Unix时间戳)的DateColumn,使用Date子类型为DATETIME diff --git a/common/src/main/java/com/alibaba/datax/common/util/LimitLogger.java b/common/src/main/java/com/alibaba/datax/common/util/LimitLogger.java new file mode 100644 index 0000000000..a307e0fb4e --- /dev/null +++ b/common/src/main/java/com/alibaba/datax/common/util/LimitLogger.java @@ -0,0 +1,34 @@ +package com.alibaba.datax.common.util; + +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author jitongchen + * @date 2023/9/7 9:47 AM + 
*/ +public class LimitLogger { + + private static Map lastPrintTime = new HashMap<>(); + + public static void limit(String name, long limit, LoggerFunction function) { + if (StringUtils.isBlank(name)) { + name = "__all__"; + } + if (limit <= 0) { + function.apply(); + } else { + if (!lastPrintTime.containsKey(name)) { + lastPrintTime.put(name, System.currentTimeMillis()); + function.apply(); + } else { + if (System.currentTimeMillis() > lastPrintTime.get(name) + limit) { + lastPrintTime.put(name, System.currentTimeMillis()); + function.apply(); + } + } + } + } +} diff --git a/common/src/main/java/com/alibaba/datax/common/util/LoggerFunction.java b/common/src/main/java/com/alibaba/datax/common/util/LoggerFunction.java new file mode 100644 index 0000000000..ef24504f9f --- /dev/null +++ b/common/src/main/java/com/alibaba/datax/common/util/LoggerFunction.java @@ -0,0 +1,10 @@ +package com.alibaba.datax.common.util; + +/** + * @author molin.lxd + * @date 2021-05-09 + */ +public interface LoggerFunction { + + void apply(); +} diff --git a/hdfsreader/pom.xml b/hdfsreader/pom.xml index a5c2da2c4f..de7c0e2182 100644 --- a/hdfsreader/pom.xml +++ b/hdfsreader/pom.xml @@ -1,5 +1,6 @@ - + datax-all com.alibaba.datax @@ -111,6 +112,42 @@ ${datax-project-version} + + org.apache.parquet + parquet-column + 1.12.0 + + + org.apache.parquet + parquet-avro + 1.12.0 + + + org.apache.parquet + parquet-common + 1.12.0 + + + org.apache.parquet + parquet-format + 2.3.0 + + + org.apache.parquet + parquet-jackson + 1.12.0 + + + org.apache.parquet + parquet-encoding + 1.12.0 + + + org.apache.parquet + parquet-hadoop + 1.12.0 + + diff --git a/hdfsreader/src/main/assembly/package.xml b/hdfsreader/src/main/assembly/package.xml index 3f1393b764..a5f28e5c60 100644 --- a/hdfsreader/src/main/assembly/package.xml +++ b/hdfsreader/src/main/assembly/package.xml @@ -37,6 +37,28 @@ + + + + + + + + + + src/main/libs + + *.* + + plugin/reader/ossreader/libs + + + src/main/libs + + *.* + + plugin/reader/hivereader/libs + diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Constant.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Constant.java index 6bfb9bf7e5..061c55a0ef 100644 --- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Constant.java +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Constant.java @@ -10,4 +10,5 @@ public class Constant { public static final String CSV = "CSV"; public static final String SEQ = "SEQ"; public static final String RC = "RC"; + public static final String PARQUET = "PARQUET"; } diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java index 5ba572e1a4..0b297d55fc 100644 --- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java @@ -9,12 +9,16 @@ import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode; import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.fs.FSDataInputStream; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFileRecordReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; @@ -29,14 +33,30 @@ import org.apache.hadoop.mapred.*; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.PrimitiveType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Timestamp; import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Created by mingya.wmy on 2015/8/12. @@ -56,6 +76,10 @@ public class DFSUtil { public static final String HDFS_DEFAULTFS_KEY = "fs.defaultFS"; public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication"; + private Boolean skipEmptyOrcFile = false; + + private Integer orcFileEmptySize = null; + public DFSUtil(Configuration taskConfig) { hadoopConf = new org.apache.hadoop.conf.Configuration(); @@ -79,6 +103,7 @@ public DFSUtil(Configuration taskConfig) { this.hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos"); } this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath); + this.skipEmptyOrcFile = taskConfig.getBool(Key.SKIP_EMPTY_ORCFILE, false); LOG.info(String.format("hadoopConfig details:%s", JSON.toJSONString(this.hadoopConf))); } @@ -102,10 +127,11 @@ private void kerberosAuthentication(String kerberosPrincipal, String kerberosKey * @param srcPaths 路径列表 * @param specifiedFileType 指定文件类型 */ - public HashSet getAllFiles(List srcPaths, String specifiedFileType) { + public HashSet getAllFiles(List srcPaths, String specifiedFileType, Boolean skipEmptyOrcFile, Integer orcFileEmptySize) { this.specifiedFileType = specifiedFileType; - + this.skipEmptyOrcFile = skipEmptyOrcFile; + this.orcFileEmptySize = orcFileEmptySize; if (!srcPaths.isEmpty()) { for (String eachPath : srcPaths) { LOG.info(String.format("get HDFS all files in path = [%s]", eachPath)); @@ -127,9 +153,13 @@ public HashSet getHDFSAllFiles(String hdfsPath) { FileStatus stats[] = hdfs.globStatus(path); for (FileStatus f : stats) { if (f.isFile()) { - if (f.getLen() == 0) { + long fileLength = f.getLen(); + if (fileLength == 0) { String message = String.format("文件[%s]长度为0,将会跳过不作处理!", hdfsPath); LOG.warn(message); + } else if (BooleanUtils.isTrue(this.skipEmptyOrcFile) && this.orcFileEmptySize != null && fileLength <= this.orcFileEmptySize) { + String message = String.format("The orc file [%s] is empty, file size: %s, DataX will skip it !", f.getPath().toString(), fileLength); + LOG.warn(message); } else { addSourceFileByType(f.getPath().toString()); } @@ -167,7 +197,16 @@ private HashSet getHDFSAllFilesNORegex(String path, FileSystem hdfs) thr 
LOG.info(String.format("[%s] 是目录, 递归获取该目录下的文件", f.getPath().toString())); getHDFSAllFilesNORegex(f.getPath().toString(), hdfs); } else if (f.isFile()) { - + long fileLength = f.getLen(); + if (fileLength == 0) { + String message = String.format("The file [%s] is empty, DataX will skip it !", f.getPath().toString()); + LOG.warn(message); + continue; + } else if (BooleanUtils.isTrue(this.skipEmptyOrcFile) && this.orcFileEmptySize != null && fileLength <= this.orcFileEmptySize) { + String message = String.format("The orc file [%s] is empty, file size: %s, DataX will skip it !", f.getPath().toString(), fileLength); + LOG.warn(message); + continue; + } addSourceFileByType(f.getPath().toString()); } else { String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。", @@ -332,7 +371,19 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice //Each file as a split //TODO multy threads // OrcInputFormat getSplits params numSplits not used, splits size = block numbers - InputSplit[] splits = in.getSplits(conf, -1); + InputSplit[] splits; + try { + splits = in.getSplits(conf, 1); + } catch (Exception splitException) { + if (Boolean.TRUE.equals(this.skipEmptyOrcFile)) { + boolean isOrcFileEmptyException = checkIsOrcEmptyFileExecption(splitException); + if (isOrcFileEmptyException) { + LOG.info("skipEmptyOrcFile: true, \"{}\" is an empty orc file, skip it!", sourceOrcFilePath); + return; + } + } + throw splitException; + } for (InputSplit split : splits) { { RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL); @@ -349,8 +400,11 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice Object field = inspector.getStructFieldData(value, fields.get(i)); recordFields.add(field); } + List hivePartitionColumnEntrys = UnstructuredStorageReaderUtil.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.HIVE_PARTION_COLUMN); + ArrayList hivePartitionColumns = new ArrayList<>(); + hivePartitionColumns = UnstructuredStorageReaderUtil.getHivePartitionColumns(sourceOrcFilePath, hivePartitionColumnEntrys); transportOneRecord(column, recordFields, recordSender, - taskPluginCollector, isReadAllColumns, nullFormat); + taskPluginCollector, isReadAllColumns, nullFormat,hivePartitionColumns); } reader.close(); } @@ -367,8 +421,20 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice } } + private boolean checkIsOrcEmptyFileExecption(Exception e) { + if (e == null) { + return false; + } + + String fullStackTrace = ExceptionUtils.getStackTrace(e); + if (fullStackTrace.contains("org.apache.orc.impl.ReaderImpl.getRawDataSizeOfColumn") && fullStackTrace.contains("Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1")) { + return true; + } + return false; + } + private Record transportOneRecord(List columnConfigs, List recordFields - , RecordSender recordSender, TaskPluginCollector taskPluginCollector, boolean isReadAllColumns, String nullFormat) { + , RecordSender recordSender, TaskPluginCollector taskPluginCollector, boolean isReadAllColumns, String nullFormat, ArrayList hiveParitionColumns) { Record record = recordSender.createRecord(); Column columnGenerated; try { @@ -693,4 +759,332 @@ private boolean isSequenceFile(String filepath, FSDataInputStream in) { return false; } + public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { + String schemaString = 
readerSliceConfig.getString(Key.PARQUET_SCHEMA); + if (StringUtils.isNotBlank(schemaString)) { + LOG.info("You config parquet schema, use it {}", schemaString); + } else { + schemaString = getParquetSchema(sourceParquetFilePath, hadoopConf); + LOG.info("Parquet schema parsed from: {} , schema is {}", sourceParquetFilePath, schemaString); + if (StringUtils.isBlank(schemaString)) { + throw DataXException.asDataXException("ParquetSchema is required, please check your config"); + } + } + MessageType parquetSchema = null; + List parquetTypes = null; + Map parquetMetaMap = null; + int fieldCount = 0; + try { + parquetSchema = MessageTypeParser.parseMessageType(schemaString); + fieldCount = parquetSchema.getFieldCount(); + parquetTypes = parquetSchema.getFields(); + parquetMetaMap = ParquetMessageHelper.parseParquetTypes(parquetTypes); + } catch (Exception e) { + String message = String.format("Error parsing to MessageType via Schema string [%s]", schemaString); + LOG.error(message); + throw DataXException.asDataXException(HdfsReaderErrorCode.PARSE_MESSAGE_TYPE_FROM_SCHEMA_ERROR, e); + } + List column = UnstructuredStorageReaderUtil.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN); + String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT); + boolean isUtcTimestamp = readerSliceConfig.getBool(Key.PARQUET_UTC_TIMESTAMP, false); + boolean isReadAllColumns = (column == null || column.size() == 0) ? true : false; + LOG.info("ReadingAllColums: " + isReadAllColumns); + + /** + * 支持 hive 表中间加列场景 + * + * 开关默认 false,在 hive表存在中间加列的场景打开,需要根据 name排序 + * 不默认打开的原因 + * 1、存量hdfs任务,只根据 index获取字段,无name字段配置 + * 2、中间加列场景比较少 + * 3、存量任务可能存在列错位的问题,不能随意纠正 + */ + boolean supportAddMiddleColumn = readerSliceConfig.getBool(Key.SUPPORT_ADD_MIDDLE_COLUMN, false); + + boolean printNullValueException = readerSliceConfig.getBool("printNullValueException", false); + List ignoreIndex = readerSliceConfig.getList("ignoreIndex", new ArrayList(), Integer.class); + + JobConf conf = new JobConf(hadoopConf); + ParquetReader reader = null; + try { + Path parquetFilePath = new Path(sourceParquetFilePath); + GroupReadSupport readSupport = new GroupReadSupport(); + readSupport.init(conf, null, parquetSchema); + // 这里初始化parquetReader的时候,会getFileSystem,如果是HA集群,期间会根据hadoopConfig中区加载failover类,这里初始化builder带上conf + ParquetReader.Builder parquetReaderBuilder = ParquetReader.builder(readSupport, parquetFilePath); + parquetReaderBuilder.withConf(hadoopConf); + reader = parquetReaderBuilder.build(); + Group g = null; + + // 从文件名中解析分区信息 + List hivePartitionColumnEntrys = UnstructuredStorageReaderUtil.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.HIVE_PARTION_COLUMN); + ArrayList hivePartitionColumns = new ArrayList<>(); + hivePartitionColumns = UnstructuredStorageReaderUtil.getHivePartitionColumns(sourceParquetFilePath, hivePartitionColumnEntrys); + List schemaFieldList = null; + Map colNameIndexMap = null; + Map indexMap = null; + if (supportAddMiddleColumn) { + boolean nonName = column.stream().anyMatch(columnEntry -> StringUtils.isEmpty(columnEntry.getName())); + if (nonName) { + throw new DataXException("You configured column item without name, please correct it"); + } + List parquetFileFields = getParquetFileFields(parquetFilePath, hadoopConf); + schemaFieldList = parquetFileFields.stream().map(org.apache.parquet.schema.Type::getName).collect(Collectors.toList()); + colNameIndexMap = new 
ConcurrentHashMap<>(); + Map finalColNameIndexMap = colNameIndexMap; + column.forEach(columnEntry -> finalColNameIndexMap.put(columnEntry.getIndex(), columnEntry.getName())); + Iterator> iterator = finalColNameIndexMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + if (!schemaFieldList.contains(next.getValue())) { + finalColNameIndexMap.remove((next.getKey())); + } + } + LOG.info("SupportAddMiddleColumn is true, fields from parquet file is {}, " + + "colNameIndexMap is {}", JSON.toJSONString(schemaFieldList), JSON.toJSONString(colNameIndexMap)); + fieldCount = column.size(); + indexMap = new HashMap<>(); + for (int j = 0; j < fieldCount; j++) { + if (colNameIndexMap.containsKey(j)) { + int index = findIndex(schemaFieldList, findEleInMap(colNameIndexMap, j)); + indexMap.put(j, index); + } + } + } + while ((g = reader.read()) != null) { + List formattedRecord = new ArrayList(fieldCount); + try { + for (int j = 0; j < fieldCount; j++) { + Object data = null; + try { + if (null != ignoreIndex && !ignoreIndex.isEmpty() && ignoreIndex.contains(j)) { + data = null; + } else { + if (supportAddMiddleColumn) { + if (!colNameIndexMap.containsKey(j)) { + formattedRecord.add(null); + continue; + } else { + data = DFSUtil.this.readFields(g, parquetTypes.get(indexMap.get(j)), indexMap.get(j), parquetMetaMap, isUtcTimestamp); + } + } else { + data = DFSUtil.this.readFields(g, parquetTypes.get(j), j, parquetMetaMap, isUtcTimestamp); + } + } + } catch (RuntimeException e) { + if (printNullValueException) { + LOG.warn(e.getMessage()); + } + } + formattedRecord.add(data); + } + transportOneRecord(column, formattedRecord, recordSender, taskPluginCollector, isReadAllColumns, nullFormat, hivePartitionColumns); + } catch (Exception e) { + throw DataXException.asDataXException(HdfsReaderErrorCode.READ_PARQUET_ERROR, e); + } + } + } catch (Exception e) { + throw DataXException.asDataXException(HdfsReaderErrorCode.READ_PARQUET_ERROR, e); + } finally { + org.apache.commons.io.IOUtils.closeQuietly(reader); + } + } + + private String findEleInMap(Map map, Integer key) { + Iterator> iterator = map.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry next = iterator.next(); + if (key.equals(next.getKey())) { + return next.getValue(); + } + } + return null; + } + + private int findIndex(List schemaFieldList, String colName) { + for (int i = 0; i < schemaFieldList.size(); i++) { + if (schemaFieldList.get(i).equals(colName)) { + return i; + } + } + return -1; + } + + private List getParquetFileFields(Path filePath, org.apache.hadoop.conf.Configuration configuration) { + try (org.apache.parquet.hadoop.ParquetFileReader reader = org.apache.parquet.hadoop.ParquetFileReader.open(HadoopInputFile.fromPath(filePath, configuration))) { + org.apache.parquet.schema.MessageType schema = reader.getFooter().getFileMetaData().getSchema(); + List fields = schema.getFields(); + return fields; + } catch (IOException e) { + LOG.error("Fetch parquet field error", e); + throw new DataXException(String.format("Fetch parquet field error, msg is %s", e.getMessage())); + } + } + + private String getParquetSchema(String sourceParquetFilePath, org.apache.hadoop.conf.Configuration hadoopConf) { + GroupReadSupport readSupport = new GroupReadSupport(); + ParquetReader.Builder parquetReaderBuilder = ParquetReader.builder(readSupport, new Path(sourceParquetFilePath)); + ParquetReader reader = null; + try { + parquetReaderBuilder.withConf(hadoopConf); + reader = parquetReaderBuilder.build(); + 
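
Editor's note: the `supportAddMiddleColumn` handling above remaps configured columns by name against the field order found in the Parquet footer, so that a column added in the middle of a Hive table does not shift the values of existing columns. Below is a minimal standalone sketch of that remapping idea; the class, method, and column names are illustrative only and are not DataX classes.

```java
import java.util.*;

// Sketch: map each configured column position to the actual position of the
// same-named field in the Parquet file schema. Names missing from the file
// stay unmapped and would be emitted as null by the reader.
public class MiddleColumnRemapSketch {
    static Map<Integer, Integer> buildIndexMap(List<String> configuredNames,
                                               List<String> parquetFieldNames) {
        Map<Integer, Integer> indexMap = new HashMap<>();
        for (int configured = 0; configured < configuredNames.size(); configured++) {
            int actual = parquetFieldNames.indexOf(configuredNames.get(configured));
            if (actual >= 0) {
                indexMap.put(configured, actual); // read this configured column from position 'actual'
            }
        }
        return indexMap;
    }

    public static void main(String[] args) {
        // file written after "age" was inserted between "id" and "name" in the Hive table
        List<String> fileFields = Arrays.asList("id", "age", "name");
        List<String> configured = Arrays.asList("id", "name");
        System.out.println(buildIndexMap(configured, fileFields)); // {0=0, 1=2}
    }
}
```
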
Group g = null; + if ((g = reader.read()) != null) { + return g.getType().toString(); + } + } catch (Throwable e) { + LOG.error("Inner error, getParquetSchema failed, message is {}", e.getMessage()); + } finally { + org.apache.commons.io.IOUtils.closeQuietly(reader); + } + return null; + } + + /** + * parquet 相关 + */ + private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588; + private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); + private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); + + private long julianDayToMillis(int julianDay) { + return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY; + } + + private org.apache.parquet.schema.OriginalType getOriginalType(org.apache.parquet.schema.Type type, Map parquetMetaMap) { + ParquetMeta meta = parquetMetaMap.get(type.getName()); + return meta.getOriginalType(); + } + + private org.apache.parquet.schema.PrimitiveType asPrimitiveType(org.apache.parquet.schema.Type type, Map parquetMetaMap) { + ParquetMeta meta = parquetMetaMap.get(type.getName()); + return meta.getPrimitiveType(); + } + + private Object readFields(Group g, org.apache.parquet.schema.Type type, int index, Map parquetMetaMap, boolean isUtcTimestamp) { + if (this.getOriginalType(type, parquetMetaMap) == org.apache.parquet.schema.OriginalType.MAP) { + Group groupData = g.getGroup(index, 0); + List parquetTypes = groupData.getType().getFields(); + JSONObject data = new JSONObject(); + for (int i = 0; i < parquetTypes.size(); i++) { + int j = groupData.getFieldRepetitionCount(i); + // map key value 的对数 + for (int k = 0; k < j; k++) { + Group groupDataK = groupData.getGroup(0, k); + List parquetTypesK = groupDataK.getType().getFields(); + if (2 != parquetTypesK.size()) { + // warn: 不是key value成对出现 + throw new RuntimeException(String.format("bad parquet map type: %s", groupData.getValueToString(index, 0))); + } + Object subDataKey = this.readFields(groupDataK, parquetTypesK.get(0), 0, parquetMetaMap, isUtcTimestamp); + Object subDataValue = this.readFields(groupDataK, parquetTypesK.get(1), 1, parquetMetaMap, isUtcTimestamp); + if (StringUtils.equalsIgnoreCase("key", parquetTypesK.get(0).getName())) { + ((JSONObject) data).put(subDataKey.toString(), subDataValue); + } else { + ((JSONObject) data).put(subDataValue.toString(), subDataKey); + } + } + } + return data; + } else if (this.getOriginalType(type, parquetMetaMap) == org.apache.parquet.schema.OriginalType.MAP_KEY_VALUE) { + Group groupData = g.getGroup(index, 0); + List parquetTypes = groupData.getType().getFields(); + JSONObject data = new JSONObject(); + for (int i = 0; i < parquetTypes.size(); i++) { + int j = groupData.getFieldRepetitionCount(i); + // map key value 的对数 + for (int k = 0; k < j; k++) { + Group groupDataK = groupData.getGroup(0, k); + List parquetTypesK = groupDataK.getType().getFields(); + if (2 != parquetTypesK.size()) { + // warn: 不是key value成对出现 + throw new RuntimeException(String.format("bad parquet map type: %s", groupData.getValueToString(index, 0))); + } + Object subDataKey = this.readFields(groupDataK, parquetTypesK.get(0), 0, parquetMetaMap, isUtcTimestamp); + Object subDataValue = this.readFields(groupDataK, parquetTypesK.get(1), 1, parquetMetaMap, isUtcTimestamp); + if (StringUtils.equalsIgnoreCase("key", parquetTypesK.get(0).getName())) { + ((JSONObject) data).put(subDataKey.toString(), subDataValue); + } else { + ((JSONObject) data).put(subDataValue.toString(), subDataKey); + } + } + } + return data; + } else if (this.getOriginalType(type, 
parquetMetaMap) == org.apache.parquet.schema.OriginalType.LIST) { + Group groupData = g.getGroup(index, 0); + List parquetTypes = groupData.getType().getFields(); + JSONArray data = new JSONArray(); + for (int i = 0; i < parquetTypes.size(); i++) { + Object subData = this.readFields(groupData, parquetTypes.get(i), i, parquetMetaMap, isUtcTimestamp); + data.add(subData); + } + return data; + } else if (this.getOriginalType(type, parquetMetaMap) == org.apache.parquet.schema.OriginalType.DECIMAL) { + Binary binaryDate = g.getBinary(index, 0); + if (null == binaryDate) { + return null; + } else { + org.apache.hadoop.hive.serde2.io.HiveDecimalWritable decimalWritable = new org.apache.hadoop.hive.serde2.io.HiveDecimalWritable(binaryDate.getBytes(), this.asPrimitiveType(type, parquetMetaMap).getDecimalMetadata().getScale()); + // g.getType().getFields().get(1).asPrimitiveType().getDecimalMetadata().getScale() + HiveDecimal hiveDecimal = decimalWritable.getHiveDecimal(); + if (null == hiveDecimal) { + return null; + } else { + return hiveDecimal.bigDecimalValue(); + } + // return decimalWritable.doubleValue(); + } + } else if (this.getOriginalType(type, parquetMetaMap) == org.apache.parquet.schema.OriginalType.DATE) { + return java.sql.Date.valueOf(LocalDate.ofEpochDay(g.getInteger(index, 0))); + } else if (this.getOriginalType(type, parquetMetaMap) == org.apache.parquet.schema.OriginalType.UTF8) { + return g.getValueToString(index, 0); + } else { + if (type.isPrimitive()) { + PrimitiveType.PrimitiveTypeName primitiveTypeName = this.asPrimitiveType(type, parquetMetaMap).getPrimitiveTypeName(); + if (PrimitiveType.PrimitiveTypeName.BINARY == primitiveTypeName) { + return g.getValueToString(index, 0); + } else if (PrimitiveType.PrimitiveTypeName.BOOLEAN == primitiveTypeName) { + return g.getValueToString(index, 0); + } else if (PrimitiveType.PrimitiveTypeName.DOUBLE == primitiveTypeName) { + return g.getValueToString(index, 0); + } else if (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == primitiveTypeName) { + return g.getValueToString(index, 0); + } else if (PrimitiveType.PrimitiveTypeName.FLOAT == primitiveTypeName) { + return g.getValueToString(index, 0); + } else if (PrimitiveType.PrimitiveTypeName.INT32 == primitiveTypeName) { + return g.getValueToString(index, 0); + } else if (PrimitiveType.PrimitiveTypeName.INT64 == primitiveTypeName) { + return g.getValueToString(index, 0); + } else if (PrimitiveType.PrimitiveTypeName.INT96 == primitiveTypeName) { + Binary dataInt96 = g.getInt96(index, 0); + if (null == dataInt96) { + return null; + } else { + ByteBuffer buf = dataInt96.toByteBuffer(); + buf.order(ByteOrder.LITTLE_ENDIAN); + long timeOfDayNanos = buf.getLong(); + int julianDay = buf.getInt(); + if (isUtcTimestamp) { + // UTC + LocalDate localDate = LocalDate.ofEpochDay(julianDay - JULIAN_EPOCH_OFFSET_DAYS); + LocalTime localTime = LocalTime.ofNanoOfDay(timeOfDayNanos); + return Timestamp.valueOf(LocalDateTime.of(localDate, localTime)); + } else { + // local time + long mills = julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND); + Timestamp timestamp = new Timestamp(mills); + timestamp.setNanos((int) (timeOfDayNanos % TimeUnit.SECONDS.toNanos(1))); + return timestamp; + } + } + } else { + return g.getValueToString(index, 0); + } + } else { + return g.getValueToString(index, 0); + } + } + } + + } diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsPathFilter.java 
b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsPathFilter.java new file mode 100644 index 0000000000..88dd1fa773 --- /dev/null +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsPathFilter.java @@ -0,0 +1,21 @@ +package com.alibaba.datax.plugin.reader.hdfsreader; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * Created by wmy on 16/11/29. + */ +public class HdfsPathFilter implements PathFilter { + + private String regex = null; + + public HdfsPathFilter(String regex) { + this.regex = regex; + } + + @Override + public boolean accept(Path path) { + return regex != null ? path.getName().matches(regex) : true; + } +} diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java index c953ef162e..1d9e90a0e6 100644 --- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReader.java @@ -41,6 +41,8 @@ public static class Job extends Reader.Job { private String specifiedFileType = null; private DFSUtil dfsUtil = null; private List path = null; + private boolean skipEmptyOrcFile = false; + private Integer orcFileEmptySize = null; @Override public void init() { @@ -115,6 +117,16 @@ public void validate(){ UnstructuredStorageReaderUtil.validateCompress(this.readerOriginConfig); UnstructuredStorageReaderUtil.validateCsvReaderConfig(this.readerOriginConfig); } + if (this.specifiedFileType.equalsIgnoreCase(Constant.ORC)) { + skipEmptyOrcFile = this.readerOriginConfig.getBool(Key.SKIP_EMPTY_ORCFILE, false); + orcFileEmptySize = this.readerOriginConfig.getInt(Key.ORCFILE_EMPTYSIZE); + //将orcFileEmptySize必填项检查去掉,仅需要配置skipEmptyOrcFile即可,考虑历史任务兼容性(For中华保险),保留orcFileEmptySize参数配置 + //if (skipEmptyOrcFile && orcFileEmptySize == null) { + // throw new IllegalArgumentException("When \"skipEmptyOrcFile\" is configured, " + // + "parameter \"orcFileEmptySize\" cannot be null."); + //} + } + LOG.info("skipEmptyOrcFile: {}, orcFileEmptySize: {}", skipEmptyOrcFile, orcFileEmptySize); } @@ -166,7 +178,7 @@ private void validateColumns(){ @Override public void prepare() { LOG.info("prepare(), start to getAllFiles..."); - this.sourceFiles = dfsUtil.getAllFiles(path, specifiedFileType); + this.sourceFiles = dfsUtil.getAllFiles(path, specifiedFileType,skipEmptyOrcFile, orcFileEmptySize); LOG.info(String.format("您即将读取的文件数为: [%s], 列表为: [%s]", this.sourceFiles.size(), StringUtils.join(this.sourceFiles, ","))); @@ -273,7 +285,9 @@ public void startRead(RecordSender recordSender) { }else if(specifiedFileType.equalsIgnoreCase(Constant.RC)){ dfsUtil.rcFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector()); - }else { + } else if (specifiedFileType.equalsIgnoreCase(Constant.PARQUET)) { + dfsUtil.parquetFileStartRead(sourceFile, this.taskConfig, recordSender, this.getTaskPluginCollector()); + } else { String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC五种格式的文件," + "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE 或者 RC"; diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReaderErrorCode.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReaderErrorCode.java index 8dd3f37095..f2caa1a81e 100644 --- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReaderErrorCode.java +++ 
b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/HdfsReaderErrorCode.java @@ -19,7 +19,12 @@ public enum HdfsReaderErrorCode implements ErrorCode { FILE_TYPE_UNSUPPORT("HdfsReader-12", "文件类型目前不支持"), KERBEROS_LOGIN_ERROR("HdfsReader-13", "KERBEROS认证失败"), READ_SEQUENCEFILE_ERROR("HdfsReader-14", "读取SequenceFile文件出错"), - READ_RCFILE_ERROR("HdfsReader-15", "读取RCFile文件出错"),; + READ_RCFILE_ERROR("HdfsReader-15", "读取RCFile文件出错"), + INIT_RCFILE_SERDE_ERROR("HdfsReader-16", "Deserialize RCFile, initialization failed!"), + PARSE_MESSAGE_TYPE_FROM_SCHEMA_ERROR("HdfsReader-17", "Error parsing ParquetSchema"), + INVALID_PARQUET_SCHEMA("HdfsReader-18", "ParquetSchema is invalid"), + READ_PARQUET_ERROR("HdfsReader-19", "Error reading Parquet file"), + CONNECT_HDFS_IO_ERROR("HdfsReader-20", "I/O exception in establishing connection with HDFS"); private final String code; private final String description; diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Key.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Key.java index 7b985a8832..7f9b3a0ab3 100644 --- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Key.java +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/Key.java @@ -7,9 +7,60 @@ public final class Key { */ public final static String PATH = "path"; public final static String DEFAULT_FS = "defaultFS"; + public final static String HIVE_VERSION = "hiveVersion"; public static final String FILETYPE = "fileType"; public static final String HADOOP_CONFIG = "hadoopConfig"; public static final String HAVE_KERBEROS = "haveKerberos"; public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath"; + public static final String KERBEROS_CONF_FILE_PATH = "kerberosConfFilePath"; public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal"; + public static final String PATH_FILTER = "pathFilter"; + public static final String PARQUET_SCHEMA = "parquetSchema"; + /** + * hive 3.x 或 cdh高版本,使用UTC时区存储时间戳,如果发现时区偏移,该配置项要配置成 true + */ + public static final String PARQUET_UTC_TIMESTAMP = "parquetUtcTimestamp"; + public static final String SUCCESS_ON_NO_FILE = "successOnNoFile"; + public static final String PROTECTION = "protection"; + + /** + * 用于显示地指定hdfs客户端的用户名 + */ + public static final String HDFS_USERNAME = "hdfsUsername"; + + /** + * ORC FILE空文件大小 + */ + public static final String ORCFILE_EMPTYSIZE = "orcFileEmptySize"; + + /** + * 是否跳过空的OrcFile + */ + public static final String SKIP_EMPTY_ORCFILE = "skipEmptyOrcFile"; + + /** + * 是否跳过 orc meta 信息 + */ + + public static final String SKIP_ORC_META = "skipOrcMetaInfo"; + /** + * 过滤_或者.开头的文件 + */ + public static final String REGEX_PATTERN = "^.*[/][^._].*"; + + public static final String FILTER_TAG_FILE = "filterTagFile"; + + // high level params refs https://github.com/aliyun/alibabacloud-jindodata/blob/master/docs/user/4.x/4.4.0/oss/configuration/jindosdk_configuration_list.md + // + public static final String FS_OSS_DOWNLOAD_QUEUE_SIZE = "ossDownloadQueueSize"; + + // + public static final String FS_OSS_DOWNLOAD_THREAD_CONCURRENCY = "ossDownloadThreadConcurrency"; + + public static final String FS_OSS_READ_READAHEAD_BUFFER_COUNT = "ossDownloadBufferCount"; + + public static final String FILE_SYSTEM_TYPE = "fileSystemType"; + public static final String CDH_3_X_HIVE_VERSION = "3.1.3-cdh"; + + public static final String SUPPORT_ADD_MIDDLE_COLUMN = "supportAddMiddleColumn"; } diff --git 
a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/ParquetMessageHelper.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/ParquetMessageHelper.java new file mode 100644 index 0000000000..e5838d6eff --- /dev/null +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/ParquetMessageHelper.java @@ -0,0 +1,33 @@ +package com.alibaba.datax.plugin.reader.hdfsreader; + +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author jitongchen + * @date 2023/9/7 10:20 AM + */ +public class ParquetMessageHelper { + public static Map parseParquetTypes(List parquetTypes) { + int fieldCount = parquetTypes.size(); + Map parquetMetaMap = new HashMap(); + for (int i = 0; i < fieldCount; i++) { + org.apache.parquet.schema.Type type = parquetTypes.get(i); + String name = type.getName(); + ParquetMeta parquetMeta = new ParquetMeta(); + parquetMeta.setName(name); + OriginalType originalType = type.getOriginalType(); + parquetMeta.setOriginalType(originalType); + if (type.isPrimitive()) { + PrimitiveType primitiveType = type.asPrimitiveType(); + parquetMeta.setPrimitiveType(primitiveType); + } + parquetMetaMap.put(name, parquetMeta); + } + return parquetMetaMap; + } +} diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/ParquetMeta.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/ParquetMeta.java new file mode 100644 index 0000000000..6f99e9b599 --- /dev/null +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/ParquetMeta.java @@ -0,0 +1,38 @@ +package com.alibaba.datax.plugin.reader.hdfsreader; + +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; + +/** + * @author jitongchen + * @date 2023/9/7 10:20 AM + */ +public class ParquetMeta { + private String name; + private OriginalType originalType; + private PrimitiveType primitiveType; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public OriginalType getOriginalType() { + return originalType; + } + + public void setOriginalType(OriginalType originalType) { + this.originalType = originalType; + } + + public PrimitiveType getPrimitiveType() { + return primitiveType; + } + + public void setPrimitiveType(PrimitiveType primitiveType) { + this.primitiveType = primitiveType; + } +} \ No newline at end of file diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java index a9e157b7d2..09fd272389 100644 --- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java @@ -27,9 +27,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; -import parquet.schema.Types; +import parquet.hadoop.metadata.CompressionCodecName; +import parquet.schema.*; import java.io.IOException; import java.text.SimpleDateFormat; @@ -626,4 +625,61 @@ public static String generateParquetSchemaFromColumnAndType(List } return typeBuilder.named("m").toString(); } + + public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, 
String fileName, TaskPluginCollector taskPluginCollector, Configuration taskConfig) { + MessageType messageType = null; + ParquetFileProccessor proccessor = null; + Path outputPath = new Path(fileName); + String schema = config.getString(Key.PARQUET_SCHEMA); + try { + messageType = MessageTypeParser.parseMessageType(schema); + } catch (Exception e) { + String message = String.format("Error parsing the Schema string [%s] into MessageType", schema); + LOG.error(message); + throw DataXException.asDataXException(HdfsWriterErrorCode.PARSE_MESSAGE_TYPE_FROM_SCHEMA_ERROR, e); + } + + // determine the compression codec + String compress = config.getString(Key.COMPRESS, null); + // be compatible with the old NONE + if ("NONE".equalsIgnoreCase(compress)) { + compress = "UNCOMPRESSED"; + } + CompressionCodecName compressionCodecName = CompressionCodecName.fromConf(compress); + LOG.info("The compression codec used for parquet writing is: {}", compressionCodecName, compress); + try { + proccessor = new ParquetFileProccessor(outputPath, messageType, compressionCodecName, false, taskConfig, taskPluginCollector, hadoopConf); + } catch (Exception e) { + String message = String.format("Initializing ParquetFileProccessor based on Schema[%s] failed.", schema); + LOG.error(message); + throw DataXException.asDataXException(HdfsWriterErrorCode.INIT_PROCCESSOR_FAILURE, e); + } + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); + String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0"; + conf.set(JobContext.TASK_ATTEMPT_ID, attempt); + FileOutputFormat outFormat = new TextOutputFormat(); + outFormat.setOutputPath(conf, outputPath); + outFormat.setWorkOutputPath(conf, outputPath); + try { + Record record = null; + while ((record = lineReceiver.getFromReader()) != null) { + proccessor.write(record); + } + } catch (Exception e) { + String message = String.format("An exception occurred while writing the file file [%s]", fileName); + LOG.error(message); + Path path = new Path(fileName); + deleteDir(path.getParent()); + throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); + } finally { + if (proccessor != null) { + try { + proccessor.close(); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + } + } + } + } diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java index 59ec6d18ea..4f8c505a0b 100644 --- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java @@ -415,6 +415,9 @@ public void startWrite(RecordReceiver lineReceiver) { //写ORC FILE hdfsHelper.orcFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName, this.getTaskPluginCollector()); + } else if (fileType.equalsIgnoreCase("PARQUET")) { + //写PARQUET FILE + hdfsHelper.parquetFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName, this.getTaskPluginCollector(), this.writerSliceConfig); } LOG.info("end do write"); diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriterErrorCode.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriterErrorCode.java index a9e1cb30e6..8a729f9787 100644 --- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriterErrorCode.java +++ 
b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriterErrorCode.java @@ -16,7 +16,11 @@ public enum HdfsWriterErrorCode implements ErrorCode { CONNECT_HDFS_IO_ERROR("HdfsWriter-06", "与HDFS建立连接时出现IO异常."), COLUMN_REQUIRED_VALUE("HdfsWriter-07", "您column配置中缺失了必须填写的参数值."), HDFS_RENAME_FILE_ERROR("HdfsWriter-08", "将文件移动到配置路径失败."), - KERBEROS_LOGIN_ERROR("HdfsWriter-09", "KERBEROS认证失败"); + KERBEROS_LOGIN_ERROR("HdfsWriter-09", "KERBEROS认证失败"), + PARSE_MESSAGE_TYPE_FROM_SCHEMA_ERROR("HdfsWriter-10", "Parse parquet schema error"), + + INIT_PROCCESSOR_FAILURE("HdfsWriter-11", "Init processor failed"); + private final String code; private final String description; diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/Key.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/Key.java index 2b1fab9802..05f4cd0a4f 100644 --- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/Key.java +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/Key.java @@ -46,4 +46,32 @@ public class Key { public static final String PARQUET_SCHEMA = "parquetSchema"; public static final String PARQUET_MERGE_RESULT = "parquetMergeResult"; + + /** + * hive 3.x 或 cdh高版本,使用UTC时区存储时间戳,如果发现时区偏移,该配置项要配置成 true + */ + public static final String PARQUET_UTC_TIMESTAMP = "parquetUtcTimestamp"; + + // Kerberos + public static final String KERBEROS_CONF_FILE_PATH = "kerberosConfFilePath"; + + // PanguFS + public final static String PANGU_FS_CONFIG = "panguFSConfig"; + public final static String PANGU_FS_CONFIG_NUWA_CLUSTER = "nuwaCluster"; + public final static String PANGU_FS_CONFIG_NUWA_SERVERS = "nuwaServers"; + public final static String PANGU_FS_CONFIG_NUWA_PROXIES = "nuwaProxies"; + public final static String PANGU_FS_CONFIG_CAPABILITY = "capability"; + + + public static final String FS_OSS_UPLOAD_THREAD_CONCURRENCY = "ossUploadConcurrency"; + // + public static final String FS_OSS_UPLOAD_QUEUE_SIZE = "ossUploadQueueSize"; + // + public static final String FS_OSS_UPLOAD_MAX_PENDING_TASKS_PER_STREAM = "ossUploadMaxPendingTasksPerStream"; + + public static final String FS_OSS_BLOCKLET_SIZE_MB = "ossBlockSize"; + + public static final String FILE_SYSTEM_TYPE = "fileSystemType"; + public static final String ENABLE_COLUMN_EXCHANGE = "enableColumnExchange"; + public static final String SUPPORT_HIVE_DATETIME = "supportHiveDateTime"; } diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileProccessor.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileProccessor.java new file mode 100644 index 0000000000..90d0f6e5b6 --- /dev/null +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileProccessor.java @@ -0,0 +1,30 @@ +package com.alibaba.datax.plugin.writer.hdfswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import org.apache.hadoop.fs.Path; +import parquet.hadoop.ParquetWriter; +import parquet.hadoop.metadata.CompressionCodecName; +import parquet.schema.MessageType; + +import java.io.IOException; + +/** + * @author jitongchen + * @date 2023/9/7 9:41 AM + */ +public class ParquetFileProccessor extends ParquetWriter { + + public ParquetFileProccessor(Path file, MessageType schema, boolean enableDictionary, Configuration taskConfig, TaskPluginCollector taskPluginCollector, org.apache.hadoop.conf.Configuration 
configuration) throws IOException { + this(file, schema, CompressionCodecName.UNCOMPRESSED, enableDictionary, taskConfig, taskPluginCollector, configuration); + } + + public ParquetFileProccessor(Path file, MessageType schema, CompressionCodecName codecName, boolean enableDictionary, Configuration taskConfig, TaskPluginCollector taskPluginCollector) throws IOException { + super(file, new ParquetFileSupport(schema, taskConfig, taskPluginCollector), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false, DEFAULT_WRITER_VERSION); + } + + public ParquetFileProccessor(Path file, MessageType schema, CompressionCodecName codecName, boolean enableDictionary, Configuration taskConfig, TaskPluginCollector taskPluginCollector, org.apache.hadoop.conf.Configuration configuration) throws IOException { + super(file, new ParquetFileSupport(schema, taskConfig, taskPluginCollector), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false, DEFAULT_WRITER_VERSION, configuration); + } +} diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileSupport.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileSupport.java new file mode 100644 index 0000000000..410d52318d --- /dev/null +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/ParquetFileSupport.java @@ -0,0 +1,642 @@ +package com.alibaba.datax.plugin.writer.hdfswriter; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.LimitLogger; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import parquet.column.ColumnDescriptor; +import parquet.hadoop.api.WriteSupport; +import parquet.io.api.Binary; +import parquet.io.api.RecordConsumer; +import parquet.schema.*; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoField; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author jitongchen + * @date 2023/9/7 9:41 AM + */ +public class ParquetFileSupport extends WriteSupport { + public static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileSupport.class); + private MessageType schema; + private List columns; + private RecordConsumer recordConsumer; + private boolean useRawDataTransf = true; + private boolean printStackTrace = true; + + // 不通类型的nullFormat + private String nullFormat; + + private String dateFormat; + private boolean isUtcTimestamp; + private SimpleDateFormat dateParse; + private Binary binaryForNull; + private TaskPluginCollector taskPluginCollector; + private String dataxParquetMode; + + public ParquetFileSupport(MessageType schema, com.alibaba.datax.common.util.Configuration taskConfig, TaskPluginCollector taskPluginCollector) { + this.schema = schema; + this.columns = schema.getColumns(); + this.useRawDataTransf = 
taskConfig.getBool(Key.PARQUET_FILE_USE_RAW_DATA_TRANSF, true); + + // 不通类型的nullFormat + this.nullFormat = taskConfig.getString(Key.NULL_FORMAT, Constant.DEFAULT_NULL_FORMAT); + this.binaryForNull = Binary.fromString(this.nullFormat); + + this.dateFormat = taskConfig.getString(Key.DATE_FORMAT, null); + if (StringUtils.isNotBlank(this.dateFormat)) { + this.dateParse = new SimpleDateFormat(dateFormat); + } + + this.isUtcTimestamp = taskConfig.getBool(Key.PARQUET_UTC_TIMESTAMP, false); + + this.taskPluginCollector = taskPluginCollector; + if (taskConfig.getKeys().contains("dataxParquetMode")) { + this.dataxParquetMode = taskConfig.getString("dataxParquetMode"); + } else { + // 默认值是columns + this.dataxParquetMode = "columns"; + } + } + + @Override + public WriteContext init(Configuration configuration) { + return new WriteContext(schema, new HashMap()); + } + + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + @Override + public void write(Record values) { + if (dataxParquetMode.equalsIgnoreCase("fields")) { + writeBaseOnFields(values); + return; + } + + // NOTE: 下面的实现其实是不对的,只是看代码注释貌似有用户已经在用 + // 所以暂时不动下面的逻辑。 + // 默认走的就是下面的这条代码路径 + if (values != null && columns != null && values.getColumnNumber() == columns.size()) { + recordConsumer.startMessage(); + for (int i = 0; i < columns.size(); i++) { + Column value = values.getColumn(i); + ColumnDescriptor columnDescriptor = columns.get(i); + Type type = this.schema.getFields().get(i); + if (value != null) { + try { + if (this.useRawDataTransf) { + if (value.getRawData() == null) { + continue; + } + recordConsumer.startField(columnDescriptor.getPath()[0], i); + // 原来使用Column->RawData的方法其实是错误的类型转换策略,会将DataX的数据内部表示形象序列化出去 + // 但是 Parquet 已经有用户使用了,故暂时只是配置项切换 + String rawData = value.getRawData().toString(); + switch (columnDescriptor.getType()) { + case BOOLEAN: + recordConsumer.addBoolean(Boolean.parseBoolean(rawData)); + break; + case FLOAT: + recordConsumer.addFloat(Float.parseFloat(rawData)); + break; + case DOUBLE: + recordConsumer.addDouble(Double.parseDouble(rawData)); + break; + case INT32: + OriginalType originalType = type.getOriginalType(); + if (originalType != null && StringUtils.equalsIgnoreCase("DATE", originalType.name())) { + int realVal = (int) (new java.sql.Date(Long.parseLong(rawData)).toLocalDate().toEpochDay()); + recordConsumer.addInteger(realVal); + } else { + recordConsumer.addInteger(Integer.parseInt(rawData)); + } + break; + case INT64: + recordConsumer.addLong(Long.valueOf(rawData)); + break; + case INT96: + recordConsumer.addBinary(timestampColToBinary(value)); + break; + case BINARY: + recordConsumer.addBinary(Binary.fromString(rawData)); + break; + case FIXED_LEN_BYTE_ARRAY: + PrimitiveType primitiveType = type.asPrimitiveType(); + if (primitiveType.getDecimalMetadata() != null) { + // decimal + recordConsumer.addBinary(decimalToBinary(value, primitiveType.getDecimalMetadata().getPrecision(), primitiveType.getDecimalMetadata().getScale())); + break; + } + /* fall through */ + default: + recordConsumer.addBinary(Binary.fromString(rawData)); + break; + } + + recordConsumer.endField(columnDescriptor.getPath()[0], i); + } else { + boolean isNull = null == value.getRawData(); + + if (!isNull) { + recordConsumer.startField(columnDescriptor.getPath()[0], i); + + // no skip: empty fields are illegal, the field should be ommited completely instead + switch (columnDescriptor.getType()) { + case BOOLEAN: + recordConsumer.addBoolean(value.asBoolean()); + break; + 
case FLOAT: + recordConsumer.addFloat(value.asDouble().floatValue()); + break; + case DOUBLE: + recordConsumer.addDouble(value.asDouble()); + break; + case INT32: + OriginalType originalType = type.getOriginalType(); + if (originalType != null && StringUtils.equalsIgnoreCase("DATE", originalType.name())) { + int realVal = (int) (new java.sql.Date(value.asLong()).toLocalDate().toEpochDay()); + recordConsumer.addInteger(realVal); + } else { + recordConsumer.addInteger(value.asLong().intValue()); + } + break; + case INT64: + recordConsumer.addLong(value.asLong()); + break; + case INT96: + recordConsumer.addBinary(timestampColToBinary(value)); + break; + case BINARY: + String valueAsString2Write = null; + if (Column.Type.DATE == value.getType() && null != this.dateParse) { + valueAsString2Write = dateParse.format(value.asDate()); + } else { + valueAsString2Write = value.asString(); + } + recordConsumer.addBinary(Binary.fromString(valueAsString2Write)); + break; + case FIXED_LEN_BYTE_ARRAY: + PrimitiveType primitiveType = type.asPrimitiveType(); + if (primitiveType.getDecimalMetadata() != null) { + // decimal + recordConsumer.addBinary(decimalToBinary(value, primitiveType.getDecimalMetadata().getPrecision(), primitiveType.getDecimalMetadata().getScale())); + break; + } + /* fall through */ + default: + recordConsumer.addBinary(Binary.fromString(value.asString())); + break; + } + recordConsumer.endField(columnDescriptor.getPath()[0], i); + } + } + } catch (Exception e) { + if (printStackTrace) { + printStackTrace = false; + LOGGER.warn("write to parquet error: {}", e.getMessage(), e); + } + // dirty data + if (null != this.taskPluginCollector) { + // job post 里面的merge taskPluginCollector 为null + this.taskPluginCollector.collectDirtyRecord(values, e, e.getMessage()); + } + } + } else { + recordConsumer.addBinary(this.binaryForNull); + } + } + recordConsumer.endMessage(); + } + } + + private Binary decimalToBinary(Column value, int precision, int scale) { + BigDecimal bigDecimal = value.asBigDecimal(); + bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); + byte[] decimalBytes = bigDecimal.unscaledValue().toByteArray(); + + int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[precision - 1]; + if (precToBytes == decimalBytes.length) { + // No padding needed. 
+ return Binary.fromByteArray(decimalBytes); + } + + byte[] tgt = new byte[precToBytes]; + + // padding -1 for negative number + if (bigDecimal.compareTo(new BigDecimal("0")) < 0) { + Arrays.fill(tgt, 0, precToBytes - decimalBytes.length, (byte) -1); + } + + System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); + return Binary.fromByteArray(tgt); + } + + private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588; + private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); + private static final long MILLS_PER_SECOND = TimeUnit.SECONDS.toMillis(1); + private static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1); + private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1); + private static final ZoneOffset defaultOffset = OffsetDateTime.now().getOffset(); + + /** + * int 96 is timestamp in parquet + * + * @param valueColumn + * @return + */ + private Binary timestampColToBinary(Column valueColumn) { + if (valueColumn.getRawData() == null) { + return Binary.EMPTY; + } + long mills; + long nanos = 0; + if (valueColumn instanceof DateColumn) { + DateColumn dateColumn = (DateColumn) valueColumn; + mills = dateColumn.asLong(); + nanos = dateColumn.getNanos(); + } else { + mills = valueColumn.asLong(); + } + int julianDay; + long nanosOfDay; + if (isUtcTimestamp) { + // utc ignore current timezone (task should set timezone same as hive/hdfs) + long seconds = mills >= 0 ? mills / MILLS_PER_SECOND : (mills / MILLS_PER_SECOND - 1); + LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(seconds, (int) nanos, defaultOffset); + julianDay = (int) (localDateTime.getLong(ChronoField.EPOCH_DAY) + JULIAN_EPOCH_OFFSET_DAYS); + nanosOfDay = localDateTime.getLong(ChronoField.NANO_OF_DAY); + } else { + // local date + julianDay = (int) ((mills / MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS); + if (mills >= 0) { + nanosOfDay = ((mills % MILLIS_IN_DAY) / MILLS_PER_SECOND) * NANOS_PER_SECOND + nanos; + } else { + julianDay--; + nanosOfDay = (((mills % MILLIS_IN_DAY) / MILLS_PER_SECOND) - 1) * NANOS_PER_SECOND + nanos; + nanosOfDay += NANOS_PER_DAY; + } + } + + ByteBuffer buf = ByteBuffer.allocate(12); + buf.order(ByteOrder.LITTLE_ENDIAN); + buf.putLong(nanosOfDay); + buf.putInt(julianDay); + buf.flip(); + return Binary.fromByteBuffer(buf); + } + + private void writeBaseOnFields(Record values) { + //LOGGER.info("Writing parquet data using fields mode(The correct mode.)"); + List types = this.schema.getFields(); + + if (values != null && types != null && values.getColumnNumber() == types.size()) { + recordConsumer.startMessage(); + writeFields(types, values); + recordConsumer.endMessage(); + } + } + + private void writeFields(List types, Record values) { + for (int i = 0; i < types.size(); i++) { + Type type = types.get(i); + Column value = values.getColumn(i); + if (value != null) { + try { + if (type.isPrimitive()) { + writePrimitiveType(type, value, i); + } else { + writeGroupType(type, (JSON) JSON.parse(value.asString()), i); + } + } catch (Exception e) { + if (printStackTrace) { + printStackTrace = false; + LOGGER.warn("write to parquet error: {}", e.getMessage(), e); + } + // dirty data + if (null != this.taskPluginCollector) { + // job post 里面的merge taskPluginCollector 为null + this.taskPluginCollector.collectDirtyRecord(values, e, e.getMessage()); + } + } + } + } + } + + private void writeFields(List types, JSONObject values) { + for (int i = 0; i < types.size(); i++) { + Type type = types.get(i); + Object value = 
values.get(type.getName()); + + if (value != null) { + try { + if (type.isPrimitive()) { + writePrimitiveType(type, value, i); + } else { + writeGroupType(type, (JSON) value, i); + } + } catch (Exception e) { + if (printStackTrace) { + printStackTrace = false; + LOGGER.warn("write to parquet error: {}", e.getMessage(), e); + } + } + } else { + recordConsumer.addBinary(this.binaryForNull); + } + } + } + + private void writeGroupType(Type type, JSON value, int index) { + GroupType groupType = type.asGroupType(); + OriginalType originalType = groupType.getOriginalType(); + if (originalType != null) { + switch (originalType) { + case MAP: + writeMap(groupType, value, index); + break; + case LIST: + writeList(groupType, value, index); + break; + default: + break; + } + } else { + // struct + writeStruct(groupType, value, index); + } + } + + private void writeMap(GroupType groupType, JSON value, int index) { + if (value == null) { + return; + } + + JSONObject json = (JSONObject) value; + + if (json.isEmpty()) { + return; + } + + recordConsumer.startField(groupType.getName(), index); + + recordConsumer.startGroup(); + + // map + // key_value start + recordConsumer.startField("key_value", 0); + recordConsumer.startGroup(); + + List keyValueFields = groupType.getFields().get(0).asGroupType().getFields(); + Type keyType = keyValueFields.get(0); + Type valueType = keyValueFields.get(1); + for (String key : json.keySet()) { + // key + writePrimitiveType(keyType, key, 0); + + // value + if (valueType.isPrimitive()) { + writePrimitiveType(valueType, json.get(key), 1); + } else { + writeGroupType(valueType, (JSON) json.get(key), 1); + } + } + + recordConsumer.endGroup(); + recordConsumer.endField("key_value", 0); + // key_value end + + recordConsumer.endGroup(); + recordConsumer.endField(groupType.getName(), index); + } + + private void writeList(GroupType groupType, JSON value, int index) { + if (value == null) { + return; + } + + JSONArray json = (JSONArray) value; + + if (json.isEmpty()) { + return; + } + + recordConsumer.startField(groupType.getName(), index); + // list + recordConsumer.startGroup(); + + // list start + recordConsumer.startField("list", 0); + recordConsumer.startGroup(); + + Type elementType = groupType.getFields().get(0).asGroupType().getFields().get(0); + + if (elementType.isPrimitive()) { + for (Object elementValue : json) { + writePrimitiveType(elementType, elementValue, 0); + } + } else { + for (Object elementValue : json) { + writeGroupType(elementType, (JSON) elementValue, 0); + } + } + + recordConsumer.endGroup(); + recordConsumer.endField("list", 0); + // list end + recordConsumer.endGroup(); + + recordConsumer.endField(groupType.getName(), index); + } + + private void writeStruct(GroupType groupType, JSON value, int index) { + if (value == null) { + return; + } + JSONObject json = (JSONObject) value; + if (json.isEmpty()) { + return; + } + + recordConsumer.startField(groupType.getName(), index); + // struct start + recordConsumer.startGroup(); + + writeFields(groupType.getFields(), json); + recordConsumer.endGroup(); + // struct end + recordConsumer.endField(groupType.getName(), index); + } + + private void writePrimitiveType(Type type, Object value, int index) { + if (value == null) { + return; + } + + recordConsumer.startField(type.getName(), index); + PrimitiveType primitiveType = type.asPrimitiveType(); + + switch (primitiveType.getPrimitiveTypeName()) { + case BOOLEAN: + recordConsumer.addBoolean((Boolean) value); + break; + case FLOAT: + if (value instanceof Float) { 
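+                // In "fields" mode the value comes from fastjson, so a JSON number may arrive as
+                // Integer, Long or Double (and occasionally BigDecimal); the instanceof checks here
+                // and below coerce whatever arrived to the declared Parquet primitive type.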
+ recordConsumer.addFloat(((Float) value).floatValue()); + } else if (value instanceof Double) { + recordConsumer.addFloat(((Double) value).floatValue()); + } else if (value instanceof Long) { + recordConsumer.addFloat(((Long) value).floatValue()); + } else if (value instanceof Integer) { + recordConsumer.addFloat(((Integer) value).floatValue()); + } + break; + case DOUBLE: + if (value instanceof Float) { + recordConsumer.addDouble(((Float) value).doubleValue()); + } else if (value instanceof Double) { + recordConsumer.addDouble(((Double) value).doubleValue()); + } else if (value instanceof Long) { + recordConsumer.addDouble(((Long) value).doubleValue()); + } else if (value instanceof Integer) { + recordConsumer.addDouble(((Integer) value).doubleValue()); + } + break; + case INT32: + if (value instanceof Integer) { + recordConsumer.addInteger((Integer) value); + } else if (value instanceof Long) { + recordConsumer.addInteger(((Long) value).intValue()); + } else { + // 之前代码写的有问题,导致这里丢列了没抛异常,先收集,后续看看有没有任务命中在决定怎么改 + LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName()))); + } + break; + case INT64: + if (value instanceof Integer) { + recordConsumer.addLong(((Integer) value).longValue()); + } else if (value instanceof Long) { + recordConsumer.addInteger(((Long) value).intValue()); + } else { + // 之前代码写的有问题,导致这里丢列了没抛异常,先收集,后续看看有没有任务命中在决定怎么改 + LimitLogger.limit("dirtyDataHiveWriterParquet", TimeUnit.MINUTES.toMillis(1), () -> LOGGER.warn("dirtyDataHiveWriterParquet {}", String.format("Invalid value: %s(clazz: %s) for field: %s", value, value.getClass(), type.getName()))); + } + break; + case INT96: + if (value instanceof Integer) { + recordConsumer.addBinary(timestampColToBinary(new LongColumn((Integer) value))); + } else if (value instanceof Long) { + recordConsumer.addBinary(timestampColToBinary(new LongColumn((Long) value))); + } else if (value instanceof Timestamp) { + recordConsumer.addBinary(timestampColToBinary(new DateColumn((Timestamp) value))); + } else if (value instanceof Date) { + recordConsumer.addBinary(timestampColToBinary(new DateColumn((Date) value))); + } else { + recordConsumer.addBinary(timestampColToBinary(new StringColumn(value.toString()))); + } + break; + case FIXED_LEN_BYTE_ARRAY: + if (primitiveType.getDecimalMetadata() != null) { + // decimal + Column column; + if (value instanceof Integer) { + column = new LongColumn((Integer) value); + } else if (value instanceof Long) { + column = new LongColumn((Long) value); + } else if (value instanceof Double) { + column = new DoubleColumn((Double) value); + } else if (value instanceof BigDecimal) { + column = new DoubleColumn((BigDecimal) value); + } else { + column = new StringColumn(value.toString()); + } + recordConsumer.addBinary(decimalToBinary(column, primitiveType.getDecimalMetadata().getPrecision(), primitiveType.getDecimalMetadata().getScale())); + break; + } + /* fall through */ + case BINARY: + default: + recordConsumer.addBinary(Binary.fromString((String) value)); + break; + } + recordConsumer.endField(type.getName(), index); + } + + private void writePrimitiveType(Type type, Column value, int index) { + if (value == null || value.getRawData() == null) { + return; + } + + recordConsumer.startField(type.getName(), index); + PrimitiveType primitiveType = type.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + case BOOLEAN: + 
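+                // This Column-based overload handles top-level fields of a DataX Record in "fields" mode;
+                // nested map/list/struct members are written through the Object-based overload above.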
recordConsumer.addBoolean(value.asBoolean()); + break; + case FLOAT: + recordConsumer.addFloat(value.asDouble().floatValue()); + break; + case DOUBLE: + recordConsumer.addDouble(value.asDouble()); + break; + case INT32: + OriginalType originalType = type.getOriginalType(); + if (OriginalType.DATE.equals(originalType)) { + int realVal = (int) (new java.sql.Date(value.asLong()).toLocalDate().toEpochDay()); + recordConsumer.addInteger(realVal); + } else { + recordConsumer.addInteger(value.asLong().intValue()); + } + break; + case INT64: + recordConsumer.addLong(value.asLong()); + break; + case INT96: + recordConsumer.addBinary(timestampColToBinary(value)); + break; + case BINARY: + String valueAsString2Write = null; + if (Column.Type.DATE == value.getType() && null != this.dateParse) { + valueAsString2Write = dateParse.format(value.asDate()); + } else { + valueAsString2Write = value.asString(); + } + recordConsumer.addBinary(Binary.fromString(valueAsString2Write)); + break; + case FIXED_LEN_BYTE_ARRAY: + if (primitiveType.getDecimalMetadata() != null) { + // decimal + recordConsumer.addBinary(decimalToBinary(value, primitiveType.getDecimalMetadata().getPrecision(), primitiveType.getDecimalMetadata().getScale())); + break; + } + /* fall through */ + default: + recordConsumer.addBinary(Binary.fromString(value.asString())); + break; + } + recordConsumer.endField(type.getName(), index); + } +} diff --git a/neo4jwriter/pom.xml b/neo4jwriter/pom.xml index 0e65505e48..2ff0f55021 100644 --- a/neo4jwriter/pom.xml +++ b/neo4jwriter/pom.xml @@ -53,11 +53,6 @@ ${junit4.version} test - - com.alibaba.datax - datax-example - 0.0.1-SNAPSHOT - diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java index 67f6e95e47..53c9235e99 100644 --- a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java @@ -52,8 +52,8 @@ public class Neo4jWriterTest { protected static final Network NETWORK = Network.newNetwork(); private GenericContainer container; - protected Driver neo4jDriver; - protected Session neo4jSession; + private Driver neo4jDriver; + private Session neo4jSession; @Before public void init() { diff --git a/oceanbasev10writer/pom.xml b/oceanbasev10writer/pom.xml index cbe197327f..11997a1e3f 100644 --- a/oceanbasev10writer/pom.xml +++ b/oceanbasev10writer/pom.xml @@ -64,8 +64,16 @@ + + com.oceanbase + shade-ob-partition-calculator + 1.0-SNAPSHOT + system + ${pom.basedir}/src/main/libs/shade-ob-partition-calculator-1.0-SNAPSHOT.jar + - + + log4j log4j 1.2.16 diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java index 9fa3cd9a89..6776196b52 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/Config.java @@ -6,6 +6,7 @@ public interface Config { double DEFAULT_MEMSTORE_THRESHOLD = 0.9d; + double DEFAULT_SLOW_MEMSTORE_THRESHOLD = 0.75d; String MEMSTORE_CHECK_INTERVAL_SECOND = "memstoreCheckIntervalSecond"; long DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND = 30; diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/AbstractConnHolder.java 
b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/AbstractConnHolder.java new file mode 100644 index 0000000000..c8630cd0af --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/AbstractConnHolder.java @@ -0,0 +1,48 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; + +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; + +public abstract class AbstractConnHolder { + private static final Logger LOG = LoggerFactory.getLogger(AbstractConnHolder.class); + + protected final Configuration config; + protected Connection conn; + + public AbstractConnHolder(Configuration config) { + this.config = config; + } + + public abstract Connection initConnection(); + + public Configuration getConfig() { + return config; + } + + public Connection getConn() { + try { + if (conn != null && !conn.isClosed()) { + return conn; + } + } catch (Exception e) { + LOG.warn("judge connection is closed or not failed. try to reconnect.", e); + } + return reconnect(); + } + + public Connection reconnect() { + DBUtil.closeDBResources(null, conn); + return initConnection(); + } + + public abstract String getJdbcUrl(); + + public abstract String getUserName(); + + public abstract void destroy(); +} diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DataBaseWriterBuffer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DataBaseWriterBuffer.java index 531724950b..b8ae259a9f 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DataBaseWriterBuffer.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/DataBaseWriterBuffer.java @@ -23,7 +23,7 @@ public class DataBaseWriterBuffer { private static final Logger LOG = LoggerFactory.getLogger(DataBaseWriterBuffer.class); - private final ConnHolder connHolder; + private final AbstractConnHolder connHolder; private final String dbName; private Map> tableBuffer = new HashMap>(); private long lastCheckMemstoreTime; @@ -33,7 +33,7 @@ public DataBaseWriterBuffer(Configuration config,String jdbcUrl, String userName this.dbName=dbName; } - public ConnHolder getConnHolder(){ + public AbstractConnHolder getConnHolder(){ return connHolder; } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java index 10de5615ba..262fb1cb7b 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/OCJConnHolder.java @@ -3,15 +3,13 @@ import java.sql.Connection; import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DBUtil; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; /** * wrap oceanbase java client * @author oceanbase */ -public class OCJConnHolder extends ConnHolder { +public class OCJConnHolder extends AbstractConnHolder { private ServerConnectInfo connectInfo; private String dataSourceKey; @@ -28,17 +26,6 @@ public Connection initConnection() { return conn; } - @Override - public Connection 
reconnect() { - DBUtil.closeDBResources(null, conn); - return initConnection(); - } - - @Override - public Connection getConn() { - return conn; - } - @Override public String getJdbcUrl() { return connectInfo.jdbcUrl; diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java index 8ff5303901..ac75d359dc 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ObClientConnHolder.java @@ -16,7 +16,7 @@ * @author oceanbase * */ -public class ObClientConnHolder extends ConnHolder { +public class ObClientConnHolder extends AbstractConnHolder { private final String jdbcUrl; private final String userName; private final String password; diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java index b06116420b..fe8889e167 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/ext/ServerConnectInfo.java @@ -1,5 +1,7 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.ext; +import static org.apache.commons.lang3.StringUtils.EMPTY; + import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -12,40 +14,19 @@ public class ServerConnectInfo { public String databaseName; public String ipPort; public String jdbcUrl; + public boolean publicCloud; + /** + * + * @param jdbcUrl format is jdbc:oceanbase//ip:port + * @param username format is cluster:tenant:username or username@tenant#cluster or user@tenant or user + * @param password + */ public ServerConnectInfo(final String jdbcUrl, final String username, final String password) { - if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) { - String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN); - if (ss.length != 3) { - throw new RuntimeException("jdbc url format is not correct: " + jdbcUrl); - } - this.userName = username; - this.clusterName = ss[1].trim().split(":")[0]; - this.tenantName = ss[1].trim().split(":")[1]; - this.jdbcUrl = ss[2].replace("jdbc:mysql:", "jdbc:oceanbase:"); - } else { - this.jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:"); - if (username.contains("@") && username.contains("#")) { - this.userName = username.substring(0, username.indexOf("@")); - this.tenantName = username.substring(username.indexOf("@") + 1, username.indexOf("#")); - this.clusterName = username.substring(username.indexOf("#") + 1); - } else if (username.contains(":")) { - String[] config = username.split(":"); - if (config.length != 3) { - throw new RuntimeException ("username format is not correct: " + username); - } - this.clusterName = config[0]; - this.tenantName = config[1]; - this.userName = config[2]; - } else { - this.clusterName = null; - this.tenantName = null; - this.userName = username; - } - } - + this.jdbcUrl = jdbcUrl; this.password = password; parseJdbcUrl(jdbcUrl); + parseFullUserName(username); } private void parseJdbcUrl(final String jdbcUrl) { @@ -56,11 +37,42 @@ 
private void parseJdbcUrl(final String jdbcUrl) { String dbName = matcher.group(2); this.ipPort = ipPort; this.databaseName = dbName; + this.publicCloud = ipPort.split(":")[0].endsWith("aliyuncs.com"); } else { throw new RuntimeException("Invalid argument:" + jdbcUrl); } } + private void parseFullUserName(final String fullUserName) { + int tenantIndex = fullUserName.indexOf("@"); + int clusterIndex = fullUserName.indexOf("#"); + if (fullUserName.contains(":") && tenantIndex < 0) { + String[] names = fullUserName.split(":"); + if (names.length != 3) { + throw new RuntimeException("invalid argument: " + fullUserName); + } else { + this.clusterName = names[0]; + this.tenantName = names[1]; + this.userName = names[2]; + } + } else if (!publicCloud || tenantIndex < 0) { + this.userName = tenantIndex < 0 ? fullUserName : fullUserName.substring(0, tenantIndex); + this.clusterName = clusterIndex < 0 ? EMPTY : fullUserName.substring(clusterIndex + 1); + this.tenantName = tenantIndex < 0 ? EMPTY : fullUserName.substring(tenantIndex + 1, clusterIndex); + } else { + // If in public cloud, the username with format user@tenant#cluster should be parsed, otherwise, connection can't be created. + this.userName = fullUserName.substring(0, tenantIndex); + if (clusterIndex > tenantIndex) { + this.tenantName = fullUserName.substring(tenantIndex + 1, clusterIndex); + this.clusterName = fullUserName.substring(clusterIndex + 1); + } else { + this.tenantName = fullUserName.substring(tenantIndex + 1); + this.clusterName = EMPTY; + } + } + } + + @Override public String toString() { StringBuffer strBuffer = new StringBuffer(); return strBuffer.append("clusterName:").append(clusterName).append(", tenantName:").append(tenantName) @@ -69,11 +81,18 @@ public String toString() { } public String getFullUserName() { - StringBuilder builder = new StringBuilder(userName); - if (tenantName != null && clusterName != null) { - builder.append("@").append(tenantName).append("#").append(clusterName); + StringBuilder builder = new StringBuilder(); + builder.append(userName); + if (!EMPTY.equals(tenantName)) { + builder.append("@").append(tenantName); } + if (!EMPTY.equals(clusterName)) { + builder.append("#").append(clusterName); + } + if (EMPTY.equals(this.clusterName) && EMPTY.equals(this.tenantName)) { + return this.userName; + } return builder.toString(); } } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/IObPartCalculator.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/IObPartCalculator.java new file mode 100644 index 0000000000..b49ade02f5 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/IObPartCalculator.java @@ -0,0 +1,19 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.part; + +import com.alibaba.datax.common.element.Record; + +/** + * @author cjyyz + * @date 2023/02/07 + * @since + */ +public interface IObPartCalculator { + + /** + * 计算 Partition Id + * + * @param record + * @return Long + */ + Long calculate(Record record); +} \ No newline at end of file diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/ObPartitionCalculatorV1.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/ObPartitionCalculatorV1.java new file mode 100644 index 0000000000..96985588d8 --- /dev/null +++ 
b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/ObPartitionCalculatorV1.java @@ -0,0 +1,109 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.part; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; +import com.alipay.oceanbase.obproxy.data.TableEntryKey; +import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OceanBase 1.x和2.x的分区计算 + * + * @author cjyyz + * @date 2023/02/07 + * @since + */ +public class ObPartitionCalculatorV1 implements IObPartCalculator { + + private static final Logger LOG = LoggerFactory.getLogger(ObPartitionCalculatorV1.class); + + /** + * 分区键的位置 + */ + private List partIndexes; + + /** + * 表的全部字段名 + */ + private List columnNames; + + /** + * ocj partition calculator + */ + private ObPartitionIdCalculator calculator; + + /** + * @param connectInfo + * @param table + * @param columns + */ + public ObPartitionCalculatorV1(ServerConnectInfo connectInfo, String table, List columns) { + + initCalculator(connectInfo, table); + + if (Objects.isNull(calculator)) { + LOG.warn("partCalculator is null"); + return; + } + + this.partIndexes = new ArrayList<>(columns.size()); + this.columnNames = new ArrayList<>(columns); + + for (int i = 0; i < columns.size(); ++i) { + String columnName = columns.get(i); + if (calculator.isPartitionKeyColumn(columnName)) { + LOG.info(columnName + " is partition key."); + partIndexes.add(i); + } + } + } + + /** + * @param record + * @return Long + */ + @Override + public Long calculate(Record record) { + if (Objects.isNull(calculator)) { + return null; + } + + for (Integer i : partIndexes) { + calculator.addColumn(columnNames.get(i), record.getColumn(i).asString()); + } + return calculator.calculate(); + } + + /** + * @param connectInfo + * @param table + */ + private void initCalculator(ServerConnectInfo connectInfo, String table) { + + LOG.info(String.format("create tableEntryKey with clusterName %s, tenantName %s, databaseName %s, tableName %s", + connectInfo.clusterName, connectInfo.tenantName, connectInfo.databaseName, table)); + TableEntryKey tableEntryKey = new TableEntryKey(connectInfo.clusterName, connectInfo.tenantName, + connectInfo.databaseName, table); + + int retry = 0; + + do { + try { + if (retry > 0) { + TimeUnit.SECONDS.sleep(1); + LOG.info("retry create new part calculator {} times", retry); + } + LOG.info("create partCalculator with address: " + connectInfo.ipPort); + calculator = new ObPartitionIdCalculator(connectInfo.ipPort, tableEntryKey); + } catch (Exception ex) { + ++retry; + LOG.warn("create new part calculator failed, retry: {}", ex.getMessage()); + } + } while (calculator == null && retry < 3); + } +} \ No newline at end of file diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/ObPartitionCalculatorV2.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/ObPartitionCalculatorV2.java new file mode 100644 index 0000000000..11b7b25cd3 --- /dev/null +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/part/ObPartitionCalculatorV2.java @@ -0,0 +1,169 @@ +package com.alibaba.datax.plugin.writer.oceanbasev10writer.part; + +import 
com.alibaba.datax.common.element.Record; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils; +import com.oceanbase.partition.calculator.ObPartIdCalculator; +import com.oceanbase.partition.calculator.enums.ObPartLevel; +import com.oceanbase.partition.calculator.enums.ObServerMode; +import com.oceanbase.partition.calculator.helper.TableEntryExtractor; +import com.oceanbase.partition.calculator.model.TableEntry; +import com.oceanbase.partition.calculator.model.TableEntryKey; +import com.oceanbase.partition.calculator.model.Version; +import com.oceanbase.partition.metadata.desc.ObPartColumn; +import com.oceanbase.partition.metadata.desc.ObTablePart; +import java.sql.Connection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OceanBase 3.x和4.x的分区计算 + * + * @author cjyyz + * @date 2023/02/07 + * @since + */ +public class ObPartitionCalculatorV2 implements IObPartCalculator { + + private static final Logger LOG = LoggerFactory.getLogger(ObPartitionCalculatorV2.class); + + /** + * OB的模式以及版本信息 + */ + private ObServerMode mode; + + /** + * ob-partition-calculator 分区计算组件 + */ + private ObPartIdCalculator calculator; + + /** + * 记录columns的字段名和在record中的位置。 + * 当目标表结构的分区键是生成列时,calculator 需要从改结构中获取到生成列所依赖的字段的值 + * e.g. + * create table t1 ( + * c1 varchar(20), + * c2 varchar(20) generated always as (substr(`c1`,1,8)) + * )partition by key(c2) partitions 5 + * + * 此时,columnNameIndexMap包含的元素是 c1:0 + * 需要将c1字段的值从columnNameIndexMap中添加到{@link com.oceanbase.partition.calculator.ObPartIdCalculator#getRefColumnValues()} + */ + private Map columnNameIndexMap; + + /** + * @param connectInfo + * @param table + * @param mode + */ + public ObPartitionCalculatorV2(ServerConnectInfo connectInfo, String table, ObServerMode mode, List columns) { + this.mode = mode; + this.columnNameIndexMap = new HashMap<>(); + for (int i = 0; i < columns.size(); i++) { + columnNameIndexMap.put(columns.get(i).toLowerCase(), i); + } + initCalculator(connectInfo, table); + } + + /** + * @param record + * @return Long + */ + @Override + public Long calculate(Record record) { + if (Objects.isNull(calculator)) { + return null; + } + if (!calculator.getTableEntry().isPartitionTable()) { + return 0L; + } + return calculator.calculatePartId(filterNullableColumns(record)); + } + + /** + * 初始化分区计算组件 + * + * @param connectInfo + * @param table + */ + private void initCalculator(ServerConnectInfo connectInfo, String table) { + TableEntryKey tableEntryKey = new TableEntryKey(connectInfo.clusterName, connectInfo.tenantName, connectInfo.databaseName, table, mode); + boolean subsequentFromV4 = !mode.getVersion().isOlderThan(new Version("4.0.0.0")); + try { + TableEntry tableEntry; + try (Connection conn = getConnection(connectInfo, subsequentFromV4)){ + TableEntryExtractor extractor = new TableEntryExtractor(); + tableEntry = extractor.queryTableEntry(conn, tableEntryKey,subsequentFromV4); + } + this.calculator = new ObPartIdCalculator(false, tableEntry, subsequentFromV4); + } catch (Exception e) { + LOG.warn("create new part calculator failed. 
reason: {}", e.getMessage()); + } + } + + private Connection getConnection(ServerConnectInfo connectInfo, boolean subsequentFromV4) throws Exception { + // OceanBase 4.0.0.0及之后版本均使用业务租户连接计算分区 + if (subsequentFromV4) { + return DBUtil.getConnection(DataBaseType.OceanBase, connectInfo.jdbcUrl, connectInfo.getFullUserName(), connectInfo.password); + } + // OceanBase 4.0.0.0之前版本使用sys租户连接计算分区 + return DbUtils.buildSysConn(connectInfo.jdbcUrl, connectInfo.clusterName); + } + + /** + * 只选择分区字段值传入分区计算组件 + * + * @param record + * @return Object[] + */ + private Object[] filterNullableColumns(Record record) { + final ObTablePart tablePart = calculator.getTableEntry().getTablePart(); + + final Object[] filteredRecords = new Object[record.getColumnNumber()]; + + if (tablePart.getLevel().getIndex() > ObPartLevel.LEVEL_ZERO.getIndex()) { + // 从record中添加非生成列的一级分区值到filteredRecords数组中 + for (ObPartColumn partColumn : tablePart.getPartColumns()) { + if (partColumn.getColumnExpr() == null) { + int metaIndex = partColumn.getColumnIndex(); + String columnName = partColumn.getColumnName().toLowerCase(); + int idxInRecord = columnNameIndexMap.get(columnName); + filteredRecords[metaIndex] = record.getColumn(idxInRecord).asString(); + } + + } + // 从record中添加生成列的一级分区值到calculator的redColumnMap中,ObTablePart.getRefPartColumns中的字段名均为小写 + for (ObPartColumn partColumn : tablePart.getRefPartColumns()) { + String columnName = partColumn.getColumnName(); + int index = columnNameIndexMap.get(columnName); + calculator.addRefColumn(columnName, record.getColumn(index).asString()); + } + } + + if (tablePart.getLevel().getIndex() >= ObPartLevel.LEVEL_TWO.getIndex()) { + // 从record中添加非生成列的二级分区值到filteredRecords数组中 + for (ObPartColumn partColumn : tablePart.getSubPartColumns()) { + if (partColumn.getColumnExpr() == null) { + int metaIndex = partColumn.getColumnIndex(); + String columnName = partColumn.getColumnName().toLowerCase(); + int idxInRecord = columnNameIndexMap.get(columnName); + filteredRecords[metaIndex] = record.getColumn(idxInRecord).asString(); + } + + } + // 从record中添加生成列的二级分区值到calculator的redColumnMap中,ObTablePart.getRefSubPartColumns中的字段名均为小写 + for (ObPartColumn partColumn : tablePart.getRefSubPartColumns()) { + String columnName = partColumn.getColumnName(); + int index = columnNameIndexMap.get(columnName); + calculator.addRefColumn(columnName, record.getColumn(index).asString()); + } + } + return filteredRecords; + } +} \ No newline at end of file diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index 82b16923e0..0ad3a1ed2f 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -1,6 +1,5 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; -import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; @@ -11,16 +10,14 @@ import com.alibaba.datax.plugin.rdbms.util.DataBaseType; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; -import 
com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.AbstractConnHolder; import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.IObPartCalculator; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV1; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.part.ObPartitionCalculatorV2; import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; -import com.alipay.oceanbase.obproxy.data.TableEntryKey; -import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.oceanbase.partition.calculator.enums.ObServerMode; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -35,8 +32,12 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - -//import java.sql.PreparedStatement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static com.alibaba.datax.plugin.writer.oceanbasev10writer.Config.DEFAULT_SLOW_MEMSTORE_THRESHOLD; +import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.FAST; +import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.PAUSE; +import static com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils.LoadMode.SLOW; public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class); @@ -47,41 +48,31 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { private long memstoreCheckIntervalSecond = Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND; // 最后一次检查 private long lastCheckMemstoreTime; + + private volatile ObWriterUtils.LoadMode loadMode = FAST; private static AtomicLong totalTask = new AtomicLong(0); private long taskId = -1; - private AtomicBoolean isMemStoreFull = new AtomicBoolean(false); - private ConnHolder checkConnHolder; + private HashMap> groupInsertValues; + private IObPartCalculator obPartCalculator; + private ConcurrentTableWriter concurrentWriter = null; + private AbstractConnHolder connHolder; + private boolean allTaskInQueue = false; + private Lock lock = new ReentrantLock(); + private Condition condition = lock.newCondition(); + private long startTime; + private String obWriteMode = "update"; + private boolean isOracleCompatibleMode = false; + private String obUpdateColumns = null; + private String dbName; + private int calPartFailedCount = 0; - public ConcurrentTableWriterTask(DataBaseType dataBaseType) { + public ConcurrentTableWriterTask(DataBaseType dataBaseType) { super(dataBaseType); taskId = totalTask.getAndIncrement(); } - private ObPartitionIdCalculator partCalculator = null; - - private HashMap> groupInsertValues; - List unknownPartRecords = new ArrayList(); -// private List unknownPartRecords; - private List partitionKeyIndexes; - - private ConcurrentTableWriter concurrentWriter = null; - - private ConnHolder connHolder; - - private boolean allTaskInQueue = false; - - private Lock lock = new ReentrantLock(); - private Condition condition = lock.newCondition(); - - private long startTime; - 
private String obWriteMode = "update"; - private boolean isOracleCompatibleMode = false; - private String obUpdateColumns = null; - private List> deleteColPos; - private String dbName; - @Override public void init(Configuration config) { super.init(config); @@ -95,15 +86,11 @@ public void init(Configuration config) { this.memstoreThreshold = config.getDouble(Config.MEMSTORE_THRESHOLD, Config.DEFAULT_MEMSTORE_THRESHOLD); this.memstoreCheckIntervalSecond = config.getLong(Config.MEMSTORE_CHECK_INTERVAL_SECOND, Config.DEFAULT_MEMSTORE_CHECK_INTERVAL_SECOND); - this.isOracleCompatibleMode = ObWriterUtils.isOracleMode(); - LOG.info("configure url is unavailable, use obclient for connections."); - this.checkConnHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl, - connectInfo.getFullUserName(), connectInfo.password); - this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl, + this.connHolder = new ObClientConnHolder(config, connectInfo.jdbcUrl, connectInfo.getFullUserName(), connectInfo.password); - checkConnHolder.initConnection(); - if (isOracleCompatibleMode) { + this.isOracleCompatibleMode = ObWriterUtils.isOracleMode(); + if (isOracleCompatibleMode) { connectInfo.databaseName = connectInfo.databaseName.toUpperCase(); //在转义的情况下不翻译 if (!(table.startsWith("\"") && table.endsWith("\""))) { @@ -115,43 +102,36 @@ public void init(Configuration config) { } if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) { - initPartCalculator(connectInfo); + this.obPartCalculator = createPartitionCalculator(connectInfo, ObServerMode.from(config.getString(Config.OB_COMPATIBLE_MODE), config.getString(Config.OB_VERSION))); } else { LOG.info("Disable partition calculation feature."); } - obUpdateColumns = config.getString(Config.OB_UPDATE_COLUMNS, null); - groupInsertValues = new HashMap>(); - partitionKeyIndexes = new ArrayList(); - rewriteSql(); + obUpdateColumns = config.getString(Config.OB_UPDATE_COLUMNS, null); + groupInsertValues = new HashMap>(); + rewriteSql(); - if (null == concurrentWriter) { - concurrentWriter = new ConcurrentTableWriter(config, connectInfo, writeRecordSql); - allTaskInQueue = false; - } - } + if (null == concurrentWriter) { + concurrentWriter = new ConcurrentTableWriter(config, connectInfo, writeRecordSql); + allTaskInQueue = false; + } + } - private void initPartCalculator(ServerConnectInfo connectInfo) { - int retry = 0; - LOG.info(String.format("create tableEntryKey with clusterName %s, tenantName %s, databaseName %s, tableName %s", - connectInfo.clusterName, connectInfo.tenantName, connectInfo.databaseName, table)); - TableEntryKey tableEntryKey = new TableEntryKey(connectInfo.clusterName, connectInfo.tenantName, - connectInfo.databaseName, table); - do { - try { - if (retry > 0) { - int sleep = retry > 8 ? 
500 : (1 << retry); - TimeUnit.SECONDS.sleep(sleep); - LOG.info("retry create new part calculator, the {} times", retry); - } - LOG.info("create partCalculator with address: " + connectInfo.ipPort); - partCalculator = new ObPartitionIdCalculator(connectInfo.ipPort, tableEntryKey); - } catch (Exception ex) { - ++retry; - LOG.warn("create new part calculator failed, retry {}: {}", retry, ex.getMessage()); - } - } while (partCalculator == null && retry < 3); // try 3 times - } + /** + * 创建需要的分区计算组件 + * + * @param connectInfo + * @return + */ + private IObPartCalculator createPartitionCalculator(ServerConnectInfo connectInfo, ObServerMode obServerMode) { + if (obServerMode.isSubsequentFrom("3.0.0.0")) { + LOG.info("oceanbase version is {}, use ob-partition-calculator to calculate partition Id.", obServerMode.getVersion()); + return new ObPartitionCalculatorV2(connectInfo, table, obServerMode, columns); + } + + LOG.info("oceanbase version is {}, use ocj to calculate partition Id.", obServerMode.getVersion()); + return new ObPartitionCalculatorV1(connectInfo, table, columns); + } public boolean isFinished() { return allTaskInQueue && concurrentWriter.checkFinish(); @@ -174,43 +154,18 @@ private void rewriteSql() { if (isOracleCompatibleMode && obWriteMode.equalsIgnoreCase("update")) { // change obWriteMode to insert so the insert statement will be generated. obWriteMode = "insert"; - deleteColPos = ObWriterUtils.buildDeleteSql(conn, dbName, table, columns); } this.writeRecordSql = ObWriterUtils.buildWriteSql(table, columns, conn, obWriteMode, obUpdateColumns); LOG.info("writeRecordSql :{}", this.writeRecordSql); } - + + @Override public void prepare(Configuration writerSliceConfig) { super.prepare(writerSliceConfig); - calPartitionKeyIndex(partitionKeyIndexes); concurrentWriter.start(); } - private void calPartitionKeyIndex(List partKeyIndexes) { - partKeyIndexes.clear(); - if (null == partCalculator) { - LOG.error("partCalculator is null"); - return; - } - for (int i = 0; i < columns.size(); ++i) { - if (partCalculator.isPartitionKeyColumn(columns.get(i))) { - LOG.info(columns.get(i) + " is partition key."); - partKeyIndexes.add(i); - } - } - } - - private Long calPartitionId(List partKeyIndexes, Record record) { - if (partCalculator == null) { - return null; - } - for (Integer i : partKeyIndexes) { - partCalculator.addColumn(columns.get(i), record.getColumn(i).asString()); - } - return partCalculator.calculate(); - } - - @Override + @Override public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) { this.taskPluginCollector = taskPluginCollector; @@ -271,21 +226,6 @@ public PreparedStatement fillStatement(PreparedStatement preparedStatement, Reco return fillPreparedStatement(preparedStatement, record); } - public PreparedStatement fillStatementIndex(PreparedStatement preparedStatement, - int prepIdx, int columnIndex, Column column) throws SQLException { - int columnSqltype = this.resultSetMetaData.getMiddle().get(columnIndex); - String typeName = this.resultSetMetaData.getRight().get(columnIndex); - return fillPreparedStatementColumnType(preparedStatement, prepIdx, columnSqltype, typeName, column); - } - - public void collectDirtyRecord(Record record, SQLException e) { - taskPluginCollector.collectDirtyRecord(record, e); - } - - public void insertOneRecord(Connection connection, List buffer) { - doOneInsert(connection, buffer); - } - private void addLeftRecords() { //不需要刷新Cache,已经是最后一批数据了 for (List groupValues : 
groupInsertValues.values()) { @@ -293,17 +233,16 @@ private void addLeftRecords() { addRecordsToWriteQueue(groupValues); } } - if (unknownPartRecords.size() > 0) { - addRecordsToWriteQueue(unknownPartRecords); - } } private void addRecordToCache(final Record record) { Long partId =null; try { - partId = calPartitionId(partitionKeyIndexes, record); + partId = obPartCalculator == null ? Long.MAX_VALUE : obPartCalculator.calculate(record); } catch (Exception e1) { - LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record); + if (calPartFailedCount++ < 10) { + LOG.warn("fail to get partition id: " + e1.getMessage() + ", record: " + record); + } } if (partId == null) { @@ -311,24 +250,11 @@ private void addRecordToCache(final Record record) { partId = Long.MAX_VALUE; } - if (partId != null) { - List groupValues = groupInsertValues.get(partId); - if (groupValues == null) { - groupValues = new ArrayList(batchSize); - groupInsertValues.put(partId, groupValues); - } - groupValues.add(record); - if (groupValues.size() >= batchSize) { - groupValues = addRecordsToWriteQueue(groupValues); - groupInsertValues.put(partId, groupValues); - } - } else { - LOG.debug("add unknown part record {}", record); - unknownPartRecords.add(record); - if (unknownPartRecords.size() >= batchSize) { - unknownPartRecords = addRecordsToWriteQueue(unknownPartRecords); - } - + List groupValues = groupInsertValues.computeIfAbsent(partId, k -> new ArrayList(batchSize)); + groupValues.add(record); + if (groupValues.size() >= batchSize) { + groupValues = addRecordsToWriteQueue(groupValues); + groupInsertValues.put(partId, groupValues); } } @@ -354,15 +280,25 @@ private List addRecordsToWriteQueue(List records) { return new ArrayList(batchSize); } private void checkMemStore() { - Connection checkConn = checkConnHolder.reconnect(); + Connection checkConn = connHolder.getConn(); + try { + if (checkConn == null || checkConn.isClosed()) { + checkConn = connHolder.reconnect(); + } + }catch (Exception e) { + LOG.warn("Check connection is unusable"); + } + long now = System.currentTimeMillis(); if (now - lastCheckMemstoreTime < 1000 * memstoreCheckIntervalSecond) { return; } - boolean isFull = ObWriterUtils.isMemstoreFull(checkConn, memstoreThreshold); - this.isMemStoreFull.set(isFull); - if (isFull) { - LOG.warn("OB memstore is full,sleep 30 seconds, threshold=" + memstoreThreshold); + double memUsedRatio = ObWriterUtils.queryMemUsedRatio(checkConn); + if (memUsedRatio >= DEFAULT_SLOW_MEMSTORE_THRESHOLD) { + this.loadMode = memUsedRatio >= memstoreThreshold ? PAUSE : SLOW; + LOG.info("Memstore used ration is {}. 
Load data {}", memUsedRatio, loadMode.name()); + }else { + this.loadMode = FAST; } lastCheckMemstoreTime = now; } @@ -370,21 +306,23 @@ private void checkMemStore() { public boolean isMemStoreFull() { return isMemStoreFull.get(); } - - public void printEveryTime() { - long cost = System.currentTimeMillis() - startTime; - if (cost > 10000) { //10s - print(); - startTime = System.currentTimeMillis(); - } + + public boolean isShouldPause() { + return this.loadMode.equals(PAUSE); + } + + public boolean isShouldSlow() { + return this.loadMode.equals(SLOW); } public void print() { - LOG.debug("Statistic total task {}, finished {}, queue Size {}", - concurrentWriter.getTotalTaskCount(), - concurrentWriter.getFinishTaskCount(), - concurrentWriter.getTaskQueueSize()); - concurrentWriter.printStatistics(); + if (LOG.isDebugEnabled()) { + LOG.debug("Statistic total task {}, finished {}, queue Size {}", + concurrentWriter.getTotalTaskCount(), + concurrentWriter.getFinishTaskCount(), + concurrentWriter.getTaskQueueSize()); + concurrentWriter.printStatistics(); + } } public void waitTaskFinish() { @@ -417,8 +355,6 @@ public void destroy(Configuration writerSliceConfig) { } // 把本级持有的conn关闭掉 DBUtil.closeDBResources(null, connHolder.getConn()); - DBUtil.closeDBResources(null, checkConnHolder.getConn()); - checkConnHolder.destroy(); super.destroy(writerSliceConfig); } @@ -469,7 +405,7 @@ public boolean checkFinish() { public synchronized void start() { for (int i = 0; i < threadCount; ++i) { LOG.info("start {} insert task.", (i+1)); - InsertTask insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql, deleteColPos); + InsertTask insertTask = new InsertTask(taskId, queue, config, connectInfo, rewriteRecordSql); insertTask.setWriterTask(ConcurrentTableWriterTask.this); insertTask.setWriter(this); insertTasks.add(insertTask); @@ -495,7 +431,7 @@ public void printStatistics() { public void addBatchRecords(final List records) throws InterruptedException { boolean isSucc = false; while (!isSucc) { - isSucc = queue.offer(records, 5, TimeUnit.SECONDS); + isSucc = queue.offer(records, 5, TimeUnit.MILLISECONDS); checkMemStore(); } totalTaskCount.incrementAndGet(); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java index 968908ca27..df80cf7ff2 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/InsertTask.java @@ -1,286 +1,204 @@ package com.alibaba.datax.plugin.writer.oceanbasev10writer.task; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.TimeUnit; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.plugin.rdbms.util.DBUtil; -import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; import 
com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.AbstractConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo; import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask.ConcurrentTableWriter; import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; -public class InsertTask implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class); - - private ConcurrentTableWriterTask writerTask; - private ConcurrentTableWriter writer; - - private String writeRecordSql; - private long totalCost = 0; - private long insertCount = 0; - - private Queue> queue; - private boolean isStop; - private ConnHolder connHolder; - - private final long taskId; - private ServerConnectInfo connInfo; - - // 失败重试次数 - private int failTryCount = Config.DEFAULT_FAIL_TRY_COUNT; - private boolean printCost = Config.DEFAULT_PRINT_COST; - private long costBound = Config.DEFAULT_COST_BOUND; - private List> deleteMeta; - - public InsertTask( - final long taskId, - Queue> recordsQueue, - Configuration config, - ServerConnectInfo connectInfo, - String writeRecordSql, - List> deleteMeta) { - this.taskId = taskId; - this.queue = recordsQueue; - this.connInfo = connectInfo; - failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT); - printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST); - costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND); - this.connHolder = new ObClientConnHolder(config, connInfo.jdbcUrl, - connInfo.getFullUserName(), connInfo.password); - this.writeRecordSql = writeRecordSql; - this.isStop = false; - this.deleteMeta = deleteMeta; - connHolder.initConnection(); - } - - void setWriterTask(ConcurrentTableWriterTask writerTask) { - this.writerTask = writerTask; - } - - void setWriter(ConcurrentTableWriter writer) { - this.writer = writer; - } - - private boolean isStop() { return isStop; } - public void setStop() { isStop = true; } - public long getTotalCost() { return totalCost; } - public long getInsertCount() { return insertCount; } - - @Override - public void run() { - Thread.currentThread().setName(String.format("%d-insertTask-%d", taskId, Thread.currentThread().getId())); - LOG.debug("Task {} start to execute...", taskId); - while (!isStop()) { - try { - List records = queue.poll(); - if (null != records) { - doMultiInsert(records, this.printCost, this.costBound); - - } else if (writerTask.isFinished()) { - writerTask.singalTaskFinish(); - LOG.debug("not more task, thread exist ..."); - break; - } else { - TimeUnit.MILLISECONDS.sleep(5); - } - } catch (InterruptedException e) { - LOG.debug("TableWriter is interrupt"); - } catch (Exception e) { - LOG.warn("ERROR UNEXPECTED {}", e); - } - } - LOG.debug("Thread exist..."); - } - - public void destroy() { - connHolder.destroy(); - }; - - public void calStatistic(final long cost) { - writer.increFinishCount(); - ++insertCount; - totalCost += cost; - if (this.printCost && cost > this.costBound) { - LOG.info("slow multi insert cost {}ms", cost); - } - } - - private void doDelete(Connection conn, final List buffer) throws SQLException { - if(deleteMeta == null || deleteMeta.size() == 0) { - return; - } - for (int i = 0; i < deleteMeta.size(); i++) { 
- String deleteSql = deleteMeta.get(i).getKey(); - int[] valueIdx = deleteMeta.get(i).getValue(); - PreparedStatement ps = null; - try { - ps = conn.prepareStatement(deleteSql); - StringBuilder builder = new StringBuilder(); - for (Record record : buffer) { - int bindIndex = 0; - for (int idx : valueIdx) { - writerTask.fillStatementIndex(ps, bindIndex++, idx, record.getColumn(idx)); - builder.append(record.getColumn(idx).asString()).append(","); - } - ps.addBatch(); - } - LOG.debug("delete values: " + builder.toString()); - ps.executeBatch(); - } catch (SQLException ex) { - LOG.error("SQL Exception when delete records with {}", deleteSql, ex); - throw ex; - } finally { - DBUtil.closeDBResources(ps, null); - } - } - } - - public void doMultiInsert(final List buffer, final boolean printCost, final long restrict) { - checkMemstore(); - Connection conn = connHolder.getConn(); - boolean success = false; - long cost = 0; - long startTime = 0; - try { - for (int i = 0; i < failTryCount; ++i) { - if (i > 0) { - try { - int sleep = i >= 9 ? 500 : 1 << i;//不明白为什么要sleep 500s - TimeUnit.SECONDS.sleep(sleep); - } catch (InterruptedException e) { - LOG.info("thread interrupted ..., ignore"); - } - conn = connHolder.getConn(); - LOG.info("retry {}, start do batch insert, size={}", i, buffer.size()); - checkMemstore(); - } - startTime = System.currentTimeMillis(); - PreparedStatement ps = null; - try { - conn.setAutoCommit(false); - - // do delete if necessary - doDelete(conn, buffer); - - ps = conn.prepareStatement(writeRecordSql); - for (Record record : buffer) { - ps = writerTask.fillStatement(ps, record); - ps.addBatch(); - } - ps.executeBatch(); - conn.commit(); - success = true; - cost = System.currentTimeMillis() - startTime; - calStatistic(cost); - break; - } catch (SQLException e) { - LOG.warn("Insert fatal error SqlState ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); - if (i == 0 || i > 10 ) { - for (Record record : buffer) { - LOG.warn("ERROR : record {}", record); - } - } - // 按照错误码分类,分情况处理 - // 如果是OB系统级异常,则需要重建连接 - boolean fatalFail = ObWriterUtils.isFatalError(e); - if (fatalFail) { - ObWriterUtils.sleep(300000); - connHolder.reconnect(); - // 如果是可恢复的异常,则重试 - } else if (ObWriterUtils.isRecoverableError(e)) { - conn.rollback(); - ObWriterUtils.sleep(60000); - } else {// 其它异常直接退出,采用逐条写入方式 - conn.rollback(); - ObWriterUtils.sleep(1000); - break; - } - } catch (Exception e) { - e.printStackTrace(); - LOG.warn("Insert error unexpected {}", e); - } finally { - DBUtil.closeDBResources(ps, null); - } - } - } catch (SQLException e) { - LOG.warn("ERROR:retry failSql State ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); - } +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - if (!success) { - try { - LOG.info("do one insert"); - conn = connHolder.reconnect(); - doOneInsert(conn, buffer); - cost = System.currentTimeMillis() - startTime; - calStatistic(cost); - } finally { - } - } - } +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; - // process one row, delete before insert - private void doOneInsert(Connection connection, List buffer) { - List deletePstmtList = new ArrayList(); - PreparedStatement preparedStatement = null; - try { - connection.setAutoCommit(false); - if (deleteMeta != null && deleteMeta.size() > 0) { - for (int i = 0; i < deleteMeta.size(); i++) { - String deleteSql = 
deleteMeta.get(i).getKey(); - deletePstmtList.add(connection.prepareStatement(deleteSql)); - } - } +public class InsertTask implements Runnable { - preparedStatement = connection.prepareStatement(this.writeRecordSql); - for (Record record : buffer) { - try { - for (int i = 0; i < deletePstmtList.size(); i++) { - PreparedStatement deleteStmt = deletePstmtList.get(i); - int[] valueIdx = deleteMeta.get(i).getValue(); - int bindIndex = 0; - for (int idx : valueIdx) { - writerTask.fillStatementIndex(deleteStmt, bindIndex++, idx, record.getColumn(idx)); - } - deleteStmt.execute(); - } - preparedStatement = writerTask.fillStatement(preparedStatement, record); - preparedStatement.execute(); - connection.commit(); - } catch (SQLException e) { - writerTask.collectDirtyRecord(record, e); - } finally { - // 此处不应该关闭statement,后续的数据还需要用到 - } - } - } catch (Exception e) { - throw DataXException.asDataXException( - DBUtilErrorCode.WRITE_DATA_ERROR, e); - } finally { - DBUtil.closeDBResources(preparedStatement, null); - for (PreparedStatement pstmt : deletePstmtList) { - DBUtil.closeDBResources(pstmt, null); - } - } - } + private static final Logger LOG = LoggerFactory.getLogger(InsertTask.class); - private void checkMemstore() { - while (writerTask.isMemStoreFull()) { - ObWriterUtils.sleep(30000); - } - } + private ConcurrentTableWriterTask writerTask; + private ConcurrentTableWriter writer; + + private String writeRecordSql; + private long totalCost = 0; + private long insertCount = 0; + + private BlockingQueue> queue; + private boolean isStop; + private AbstractConnHolder connHolder; + + private final long taskId; + private ServerConnectInfo connInfo; + + // 失败重试次数 + private int failTryCount = Config.DEFAULT_FAIL_TRY_COUNT; + private boolean printCost = Config.DEFAULT_PRINT_COST; + private long costBound = Config.DEFAULT_COST_BOUND; + + public InsertTask( + final long taskId, + BlockingQueue> recordsQueue, + Configuration config, + ServerConnectInfo connectInfo, + String writeRecordSql) { + this.taskId = taskId; + this.queue = recordsQueue; + this.connInfo = connectInfo; + failTryCount = config.getInt(Config.FAIL_TRY_COUNT, Config.DEFAULT_FAIL_TRY_COUNT); + printCost = config.getBool(Config.PRINT_COST, Config.DEFAULT_PRINT_COST); + costBound = config.getLong(Config.COST_BOUND, Config.DEFAULT_COST_BOUND); + this.connHolder = new ObClientConnHolder(config, connInfo.jdbcUrl, + connInfo.getFullUserName(), connInfo.password); + this.writeRecordSql = writeRecordSql; + this.isStop = false; + connHolder.initConnection(); + } + + void setWriterTask(ConcurrentTableWriterTask writerTask) { + this.writerTask = writerTask; + } + + void setWriter(ConcurrentTableWriter writer) { + this.writer = writer; + } + + private boolean isStop() { + return isStop; + } + + public void setStop() { + isStop = true; + } + + public long getTotalCost() { + return totalCost; + } + + public long getInsertCount() { + return insertCount; + } + + @Override + public void run() { + Thread.currentThread().setName(String.format("%d-insertTask-%d", taskId, Thread.currentThread().getId())); + LOG.debug("Task {} start to execute...", taskId); + while (!isStop()) { + try { + List records = queue.poll(5, TimeUnit.MILLISECONDS); + if (null != records) { + doMultiInsert(records, this.printCost, this.costBound); + } else if (writerTask.isFinished()) { + writerTask.singalTaskFinish(); + LOG.debug("not more task, thread exist ..."); + break; + } + } catch (InterruptedException e) { + LOG.debug("TableWriter is interrupt"); + } catch (Exception e) { + 
LOG.warn("ERROR UNEXPECTED ", e); + } + } + LOG.debug("Thread exist..."); + } + + public void destroy() { + connHolder.destroy(); + } + + public void calStatistic(final long cost) { + writer.increFinishCount(); + ++insertCount; + totalCost += cost; + if (this.printCost && cost > this.costBound) { + LOG.info("slow multi insert cost {}ms", cost); + } + } + + public void doMultiInsert(final List buffer, final boolean printCost, final long restrict) { + checkMemstore(); + Connection conn = connHolder.getConn(); + boolean success = false; + long cost = 0; + long startTime = 0; + try { + for (int i = 0; i < failTryCount; ++i) { + if (i > 0) { + conn = connHolder.getConn(); + LOG.info("retry {}, start do batch insert, size={}", i, buffer.size()); + checkMemstore(); + } + startTime = System.currentTimeMillis(); + PreparedStatement ps = null; + try { + conn.setAutoCommit(false); + ps = conn.prepareStatement(writeRecordSql); + for (Record record : buffer) { + ps = writerTask.fillStatement(ps, record); + ps.addBatch(); + } + ps.executeBatch(); + conn.commit(); + success = true; + cost = System.currentTimeMillis() - startTime; + calStatistic(cost); + break; + } catch (SQLException e) { + LOG.warn("Insert fatal error SqlState ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); + if (LOG.isDebugEnabled() && (i == 0 || i > 10)) { + for (Record record : buffer) { + LOG.warn("ERROR : record {}", record); + } + } + // 按照错误码分类,分情况处理 + // 如果是OB系统级异常,则需要重建连接 + boolean fatalFail = ObWriterUtils.isFatalError(e); + if (fatalFail) { + ObWriterUtils.sleep(300000); + connHolder.reconnect(); + // 如果是可恢复的异常,则重试 + } else if (ObWriterUtils.isRecoverableError(e)) { + conn.rollback(); + ObWriterUtils.sleep(60000); + } else {// 其它异常直接退出,采用逐条写入方式 + conn.rollback(); + ObWriterUtils.sleep(1000); + break; + } + } catch (Exception e) { + e.printStackTrace(); + LOG.warn("Insert error unexpected {}", e); + } finally { + DBUtil.closeDBResources(ps, null); + } + } + } catch (SQLException e) { + LOG.warn("ERROR:retry failSql State ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); + } + + if (!success) { + LOG.info("do one insert"); + conn = connHolder.reconnect(); + writerTask.doOneInsert(conn, buffer); + cost = System.currentTimeMillis() - startTime; + calStatistic(cost); + } + } + + private void checkMemstore() { + if (writerTask.isShouldSlow()) { + ObWriterUtils.sleep(100); + } else { + while (writerTask.isShouldPause()) { + ObWriterUtils.sleep(100); + } + } + } } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/SingleTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/SingleTableWriterTask.java index 637a3be410..d2f42de557 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/SingleTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/SingleTableWriterTask.java @@ -12,7 +12,7 @@ import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; import com.alibaba.datax.plugin.rdbms.writer.Key; import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; -import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder; +import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.AbstractConnHolder; import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder; import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils; @@ -30,7 +30,7 
@@ public class SingleTableWriterTask extends CommonRdbmsWriter.Task { // 失败重试次数 private int failTryCount = Config.DEFAULT_FAIL_TRY_COUNT; - private ConnHolder connHolder; + private AbstractConnHolder connHolder; private String obWriteMode = "update"; private boolean isOracleCompatibleMode = false; private String obUpdateColumns = null; diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java index ec26e788f2..adffc6f76f 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/DbUtils.java @@ -66,4 +66,48 @@ public static String fetchSingleValueWithRetry(Configuration config, String quer return value; } + + /** + * build sys connection from ordinary jdbc url + * + * @param jdbcUrl + * @param clusterName + * @return + * @throws Exception + */ + public static Connection buildSysConn(String jdbcUrl, String clusterName) throws Exception { + jdbcUrl = jdbcUrl.replace("jdbc:mysql://", "jdbc:oceanbase://"); + int startIdx = jdbcUrl.indexOf('/', "jdbc:oceanbase://".length()); + int endIdx = jdbcUrl.lastIndexOf('?'); + String prefix = jdbcUrl.substring(0, startIdx + 1); + final String postfix = jdbcUrl.substring(endIdx); + String sysJDBCUrl = prefix + "oceanbase" + postfix; + + String tenantName = "sys"; + String[][] userConfigs = { + {"monitor", "monitor"} + }; + + Connection conn = null; + for (String[] userConfig : userConfigs) { + try { + conn = DBUtil.getConnectionWithoutRetry(DataBaseType.OceanBase, sysJDBCUrl, String.format("%s@%s#%s", userConfig[0], + tenantName, clusterName), userConfig[1]); + } catch (Exception e) { + LOG.warn("fail connecting to ob: " + e.getMessage()); + + } + if (conn == null) { + LOG.warn("fail to get connection with user " + userConfig[0] + ", try alternative user."); + } else { + break; + } + } + + if (conn == null) { + throw new Exception("fail to get connection with sys tenant."); + } + + return conn; + } } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java index 037e4ce519..a5d6b0eae8 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -4,6 +4,7 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter.Task; import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -12,6 +13,7 @@ import java.sql.*; import java.util.*; +import static com.alibaba.datax.plugin.writer.oceanbasev10writer.Config.DEFAULT_SLOW_MEMSTORE_THRESHOLD; public class ObWriterUtils { @@ -21,6 +23,9 @@ public class ObWriterUtils { private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?"; private static final String CHECK_MEMSTORE_4_0 = "select 1 from %s.gv$ob_memstore t where t.MEMSTORE_USED>t.MEMSTORE_LIMIT * ?"; + private static 
String CHECK_MEMSTORE_RATIO = "select min(t.total/t.mem_limit) from %s.gv$memstore t"; + private static final String CHECK_MEMSTORE_RATIO_4_0 = "select min(t.MEMSTORE_USED/t.MEMSTORE_LIMIT) from %s.gv$ob_memstore t"; + private static Set databaseKeywords; private static String compatibleMode = null; private static String obVersion = null; @@ -81,6 +86,30 @@ public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) return result; } + public static double queryMemUsedRatio (Connection conn) { + PreparedStatement ps = null; + ResultSet rs = null; + double result = 0; + try { + String sysDbName = "oceanbase"; + if (isOracleMode()) { + sysDbName = "sys"; + } + ps = conn.prepareStatement(String.format(getMemStoreRatioSql(), sysDbName)); + rs = ps.executeQuery(); + // 只要有满足条件的,则表示当前租户 有个机器的memstore即将满 + if (rs.next()) { + result = rs.getDouble(1); + } + } catch (Throwable e) { + LOG.warn("Check memstore fail, reason: {}. Use a random value instead.", e.getMessage()); + result = RandomUtils.nextDouble(0.3D, DEFAULT_SLOW_MEMSTORE_THRESHOLD + 0.2D); + } finally { + //do not need to close the statment in ob1.0 + } + return result; + } + public static boolean isOracleMode(){ return (compatibleMode.equals(Config.OB_COMPATIBLE_MODE_ORACLE)); } @@ -93,6 +122,14 @@ private static String getMemStoreSql() { } } + private static String getMemStoreRatioSql() { + if (ObVersion.valueOf(obVersion).compareTo(ObVersion.V4000) >= 0) { + return CHECK_MEMSTORE_RATIO_4_0; + } else { + return CHECK_MEMSTORE_RATIO; + } + } + public static String getCompatibleMode() { return compatibleMode; } @@ -181,7 +218,7 @@ private static Map> getAllUniqueIndex(Connection conn, Stri } List s = uniqueKeys.get(keyName); if (s == null) { - s = new ArrayList(); + s = new ArrayList<>(); uniqueKeys.put(keyName, s); } s.add(columnName); @@ -253,7 +290,7 @@ private static Set getSkipColumns(Connection conn, String tableName) { String columnName = StringUtils.upperCase(rs.getString("Column_name")); Set s = uniqueKeys.get(keyName); if (s == null) { - s = new HashSet(); + s = new HashSet<>(); uniqueKeys.put(keyName, s); } s.add(columnName); @@ -415,7 +452,7 @@ public static boolean isRecoverableError(SQLException e) { private static Set white = new HashSet(); static { - int[] errList = { 1213, 1047, 1041, 1094, 4000, 4012 }; + int[] errList = { 1213, 1047, 1041, 1094, 4000, 4012, 4013 }; for (int err : errList) { white.add(err); } @@ -445,4 +482,26 @@ public void run() { t.setDaemon(true); t.start(); } + + /** + * + */ + public static enum LoadMode { + + /** + * Fast insert + */ + FAST, + + /** + * Insert slowly + */ + SLOW, + + /** + * Pause to insert + */ + PAUSE + } + } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java index bec3c683f6..7b84c32088 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java @@ -12,6 +12,7 @@ import com.alibaba.datax.plugin.rdbms.util.RdbmsException; import com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil; import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; @@ -199,6 +200,9 @@ public static class Task { 
protected boolean emptyAsNull; protected Triple, List, List> resultSetMetaData; + private int dumpRecordLimit = Constant.DEFAULT_DUMP_RECORD_LIMIT; + private AtomicLong dumpRecordCount = new AtomicLong(0); + public Task(DataBaseType dataBaseType) { this.dataBaseType = dataBaseType; } @@ -209,7 +213,7 @@ public void init(Configuration writerSliceConfig) { this.jdbcUrl = writerSliceConfig.getString(Key.JDBC_URL); //ob10的处理 - if (this.jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING) && this.dataBaseType == DataBaseType.MySql) { + if (this.jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) { String[] ss = this.jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN); if (ss.length != 3) { throw DataXException @@ -368,7 +372,11 @@ protected void doBatchInsert(Connection connection, List buffer) } } - protected void doOneInsert(Connection connection, List buffer) { + public boolean needToDumpRecord() { + return dumpRecordCount.incrementAndGet() <= dumpRecordLimit; + } + + public void doOneInsert(Connection connection, List buffer) { PreparedStatement preparedStatement = null; try { connection.setAutoCommit(true); @@ -381,7 +389,10 @@ protected void doOneInsert(Connection connection, List buffer) { preparedStatement, record); preparedStatement.execute(); } catch (SQLException e) { - LOG.debug(e.toString()); + if (needToDumpRecord()) { + LOG.warn("ERROR : record {}", record); + LOG.warn("Insert fatal error SqlState ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); + } this.taskPluginCollector.collectDirtyRecord(record, e); } finally { diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java index 0e4692e2c8..9510fd14ef 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java @@ -19,4 +19,5 @@ public final class Constant { public static final String OB10_SPLIT_STRING = "||_dsc_ob10_dsc_||"; public static final String OB10_SPLIT_STRING_PATTERN = "\\|\\|_dsc_ob10_dsc_\\|\\|"; + public static final int DEFAULT_DUMP_RECORD_LIMIT = 10; } diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java index c86bd20650..6bfc1bb9e2 100644 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java @@ -1,11 +1,11 @@ package com.alibaba.datax.plugin.unstructuredstorage.reader; +import com.alibaba.fastjson2.JSON; +import org.apache.commons.lang3.StringUtils; + import java.text.DateFormat; import java.text.SimpleDateFormat; -import org.apache.commons.lang3.StringUtils; - -import com.alibaba.fastjson2.JSON; public class ColumnEntry { private Integer index; @@ -13,6 +13,15 @@ public class ColumnEntry { private String value; private String format; private DateFormat dateParse; + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } public Integer getIndex() { return index; diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java 
b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java index 71e13ad244..0945779b17 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java @@ -87,4 +87,7 @@ public class Key { public static final String TAR_FILE_FILTER_PATTERN = "tarFileFilterPattern"; public static final String ENABLE_INNER_SPLIT = "enableInnerSplit"; + public static final String HIVE_PARTION_COLUMN = "hivePartitionColumn"; + + } diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java index afcad85132..27f4c48ac4 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java @@ -715,4 +715,70 @@ public static void setSourceFileName(Configuration configuration, List s public static void setSourceFile(Configuration configuration, List sourceFiles){ configuration.set(Constant.SOURCE_FILE, sourceFiles); } + + public static ArrayList getHivePartitionColumns(String filePath, List hivePartitionColumnEntrys) { + ArrayList hivePartitionColumns = new ArrayList<>(); + + if (null == hivePartitionColumnEntrys) { + return hivePartitionColumns; + } + + // 对于分区列pt,则从path中找/pt=xxx/,xxx即分区列的值,另外确认在path中只有一次出现 + + for (ColumnEntry columnEntry : hivePartitionColumnEntrys) { + String parColName = columnEntry.getValue(); + String patten = String.format("/%s=", parColName); + int index = filePath.indexOf(patten); + if (index != filePath.lastIndexOf(patten)) { + throw new DataXException(String.format("Found multiple partition folder in filePath %s, partition: %s", filePath, parColName)); + } + + String subPath = filePath.substring(index + 1); + int firstSeparatorIndex = subPath.indexOf(File.separator); + if (firstSeparatorIndex > 0) { + subPath = subPath.substring(0, firstSeparatorIndex); + } + + if (subPath.split("=").length != 2) { + throw new DataXException(String.format("Found partition column value in filePath %s failed, partition: %s", filePath, parColName)); + } + String parColVal = subPath.split("=")[1]; + + String colType = columnEntry.getType().toUpperCase(); + Type type = Type.valueOf(colType); + + Column generateColumn; + switch (type) { + case STRING: + generateColumn = new StringColumn(parColVal); + break; + + case DOUBLE: + generateColumn = new DoubleColumn(parColVal); + break; + + case LONG: + generateColumn = new LongColumn(parColVal); + break; + + case BOOLEAN: + generateColumn = new BoolColumn(parColVal); + break; + + case DATE: + generateColumn = new DateColumn(new StringColumn(parColVal.toString()).asDate()); + break; + + default: + String errorMessage = String.format("The column type you configured is not currently supported: %s", parColVal); + LOG.error(errorMessage); + throw DataXException.asDataXException(UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage); + } + + hivePartitionColumns.add(generateColumn); + } + + return hivePartitionColumns; + } + } diff --git 
a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java index 092fbfd7c8..a485c1249b 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java @@ -12,9 +12,13 @@ public class Constant { public static final String FILE_FORMAT_TEXT = "text"; + public static final String FILE_FORMAT_SQL = "sql"; + //每个分块10MB,最大10000个分块, MAX_FILE_SIZE 单位: MB public static final Long MAX_FILE_SIZE = 10 * 10000L; + public static final int DEFAULT_COMMIT_SIZE = 2000; + public static final String DEFAULT_SUFFIX = ""; public static final String TRUNCATE = "truncate"; diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java index 125957f189..ee97abd86d 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java @@ -5,12 +5,16 @@ public class Key { // must have public static final String FILE_NAME = "fileName"; + public static final String TABLE_NAME = "table"; + // must have public static final String WRITE_MODE = "writeMode"; // not must , not default , public static final String FIELD_DELIMITER = "fieldDelimiter"; + public static final String QUOTE_CHARACTER = "quoteChar"; + // not must , default os's line delimiter public static final String LINE_DELIMITER = "lineDelimiter"; @@ -38,6 +42,8 @@ public class Key { // writer maxFileSize public static final String MAX_FILE_SIZE = "maxFileSize"; + + public static final String COMMIT_SIZE = "commitSize"; // writer file type suffix, like .txt .csv public static final String SUFFIX = "suffix"; diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/SqlWriter.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/SqlWriter.java new file mode 100644 index 0000000000..4d6ff713d3 --- /dev/null +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/SqlWriter.java @@ -0,0 +1,69 @@ +package com.alibaba.datax.plugin.unstructuredstorage.writer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Writer; +import java.util.List; +import java.util.stream.Collectors; + +public class SqlWriter implements UnstructuredWriter { + private static final Logger LOG = LoggerFactory.getLogger(SqlWriter.class); + + private Writer sqlWriter; + private String quoteChar; + private String lineSeparator; + private String tableName; + private StringBuilder insertPrefix; + + public SqlWriter(Writer writer, String quoteChar, String tableName, String lineSeparator, List columnNames) { + this.sqlWriter = writer; + this.quoteChar = quoteChar; + this.lineSeparator = lineSeparator; + this.tableName = tableName; + buildInsertPrefix(columnNames); + } + + @Override + public void writeOneRecord(List splitedRows) throws IOException { + if (splitedRows.isEmpty()) { + LOG.info("Found one record line which is 
empty."); + return; + } + + StringBuilder sqlPatten = new StringBuilder(4096).append(insertPrefix); + sqlPatten.append(splitedRows.stream().map(e -> "'" + DataXCsvWriter.replace(e, "'", "''") + "'").collect(Collectors.joining(","))); + sqlPatten.append(");").append(lineSeparator); + this.sqlWriter.write(sqlPatten.toString()); + } + + private void buildInsertPrefix(List columnNames) { + StringBuilder sb = new StringBuilder(columnNames.size() * 32); + + for (String columnName : columnNames) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(quoteChar).append(columnName).append(quoteChar); + } + + int capacity = 16 + tableName.length() + sb.length(); + this.insertPrefix = new StringBuilder(capacity); + this.insertPrefix.append("INSERT INTO ").append(tableName).append(" (").append(sb).append(")").append(" VALUES("); + } + + public void appendCommit() throws IOException { + this.sqlWriter.write("commit;" + lineSeparator); + } + + @Override + public void flush() throws IOException { + this.sqlWriter.flush(); + } + + @Override + public void close() throws IOException { + this.sqlWriter.close(); + } +} diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java index e9040662ab..4ce6461c03 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java @@ -10,7 +10,10 @@ import java.util.UUID; import com.alibaba.datax.common.element.BytesColumn; + +import com.google.common.base.Preconditions; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.compress.compressors.CompressorOutputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; @@ -90,7 +93,8 @@ public static void validateParameter(Configuration writerConfiguration) { writerConfiguration.set(Key.FILE_FORMAT, fileFormat); } if (!Constant.FILE_FORMAT_CSV.equals(fileFormat) - && !Constant.FILE_FORMAT_TEXT.equals(fileFormat)) { + && !Constant.FILE_FORMAT_TEXT.equals(fileFormat) + && !Constant.FILE_FORMAT_SQL.equals(fileFormat)) { throw DataXException.asDataXException( UnstructuredStorageWriterErrorCode.ILLEGAL_VALUE, String.format("unsupported fileFormat %s ", fileFormat)); } @@ -232,22 +236,31 @@ private static void doWriteToStream(RecordReceiver lineReceiver, // warn: default false String fileFormat = config.getString(Key.FILE_FORMAT, Constant.FILE_FORMAT_TEXT); - + boolean isSqlFormat = Constant.FILE_FORMAT_SQL.equalsIgnoreCase(fileFormat); + int commitSize = config.getInt(Key.COMMIT_SIZE, Constant.DEFAULT_COMMIT_SIZE); UnstructuredWriter unstructuredWriter = produceUnstructuredWriter(fileFormat, config, writer); List headers = config.getList(Key.HEADER, String.class); - if (null != headers && !headers.isEmpty()) { + if (null != headers && !headers.isEmpty() && !isSqlFormat) { unstructuredWriter.writeOneRecord(headers); } Record record = null; + int receivedCount = 0; String byteEncoding = config.getString(Key.BYTE_ENCODING); while ((record = lineReceiver.getFromReader()) != null) { 
UnstructuredStorageWriterUtil.transportOneRecord(record, nullFormat, dateParse, taskPluginCollector, unstructuredWriter, byteEncoding); + receivedCount++; + if (isSqlFormat && receivedCount % commitSize == 0) { + ((SqlWriter) unstructuredWriter).appendCommit(); + } } + if (isSqlFormat) { + ((SqlWriter)unstructuredWriter).appendCommit(); + } // warn:由调用方控制流的关闭 // IOUtils.closeQuietly(unstructuredWriter); } @@ -262,6 +275,15 @@ public static UnstructuredWriter produceUnstructuredWriter(String fileFormat, Co String fieldDelimiter = config.getString(Key.FIELD_DELIMITER, String.valueOf(Constant.DEFAULT_FIELD_DELIMITER)); unstructuredWriter = TextCsvWriterManager.produceTextWriter(writer, fieldDelimiter, config); + } else if (StringUtils.equalsIgnoreCase(fileFormat, Constant.FILE_FORMAT_SQL)) { + String tableName = config.getString(Key.TABLE_NAME); + Preconditions.checkArgument(StringUtils.isNotEmpty(tableName), "table name is empty"); + String quoteChar = config.getString(Key.QUOTE_CHARACTER); + Preconditions.checkArgument(StringUtils.isNotEmpty(quoteChar), "quote character is empty"); + String lineSeparator = config.getString(Key.LINE_DELIMITER, IOUtils.LINE_SEPARATOR); + List headers = config.getList(Key.HEADER, String.class); + Preconditions.checkArgument(CollectionUtils.isNotEmpty(headers), "column names are empty"); + unstructuredWriter = new SqlWriter(writer, quoteChar, tableName, lineSeparator, headers); } return unstructuredWriter; diff --git a/userGuid.md b/userGuid.md index ff3f93b3c7..badb1b4e75 100644 --- a/userGuid.md +++ b/userGuid.md @@ -17,7 +17,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 * 工具部署 - * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz) + * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz) 下载后解压至本地某个目录,进入bin目录,即可运行同步作业:
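For example, a typical first run after unpacking looks like the following (a minimal sketch: the install path and the bundled sample job `job/job.json` are illustrative and may differ in your environment):

``` shell
# unpack the downloaded datax.tar.gz and enter the bin directory (the install path is just an example)
$ tar zxvf datax.tar.gz -C /opt
$ cd /opt/datax/bin

# run a sync job by pointing datax.py at a job description file;
# job/job.json is the sample job shipped with the package
$ python datax.py ../job/job.json
```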