diff --git a/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java b/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java index b291d34e8..620c81b18 100644 --- a/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java +++ b/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java @@ -38,7 +38,8 @@ public enum FileFormat { ORC(false, true), PARQUET(true, true), TEXT(true, false), - TSV(true, true); + TSV(true, true), + XLS(true,false); private final boolean canRead; private final boolean canWrite; diff --git a/format-xls/pom.xml b/format-xls/pom.xml new file mode 100644 index 000000000..643213d6e --- /dev/null +++ b/format-xls/pom.xml @@ -0,0 +1,90 @@ + + + + 4.0.0 + + io.cdap.plugin + hydrator-plugins + 2.13.0-SNAPSHOT + + format-xls + XLS format plugins + jar + + 5.2.4 + + + + io.cdap.cdap + cdap-etl-api + + + io.cdap.cdap + cdap-formats + + + io.cdap.cdap + hydrator-test + + + io.cdap.plugin + format-common + ${project.version} + + + org.apache.poi + poi + ${poi.version} + + + org.apache.poi + poi-ooxml + ${poi.version} + + + + junit + junit + + + + + + + + org.apache.felix + maven-bundle-plugin + + + <_exportcontents> + io.cdap.plugin.format.xls.* + + *;inline=false;scope=compile + true + lib + + + + + io.cdap + cdap-maven-plugin + + + + + diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java new file mode 100644 index 000000000..fa27ccfe2 --- /dev/null +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java @@ -0,0 +1,203 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format.xls.input; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.CellType; +import org.apache.poi.ss.usermodel.DataFormatter; +import org.apache.poi.ss.usermodel.FormulaEvaluator; +import org.apache.poi.ss.usermodel.Row; +import org.apache.poi.ss.usermodel.Sheet; +import org.apache.poi.ss.usermodel.Workbook; +import org.apache.poi.ss.usermodel.WorkbookFactory; + +import java.io.IOException; +import java.util.List; + + +/** + * {@link XlsInputFormat} is {@link TextInputFormat} implementation for reading Excel files. + *

+ * The {@link XlsInputFormat.XlsRecordReader} reads a given sheet, and within a sheet reads + * all columns and all rows. + */ +public class XlsInputFormat extends CombineFileInputFormat { + + public static final String SHEET_NO = "Sheet Number"; + public static final String SHEET_VALUE = "sheetValue"; + public static final String NAME_SKIP_HEADER = "skipHeader"; + public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow"; + + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { + return new XlsRecordReader(); + } + + /** + * Reads excel spread sheet, where the keys are the offset in the excel file and the text is the complete record. + */ + public static class XlsRecordReader extends RecordReader { + // DataFormatter to format and get each cell's value as String + XlsInputFormatDataFormatter formatter; + FormulaEvaluator formulaEvaluator; + // Map key that represents the row index. + private LongWritable key; + // Map value that represents an excel row + private StructuredRecord value; + private Sheet workSheet; + // InputStream handler for Excel files. + private FSDataInputStream fileIn; + // Specifies the row index. + private int rowIndex; + // Specifies last row num. + private int lastRowNum; + private Schema outputSchema; + private boolean terminateIfEmptyRow; + private boolean isRowNull; + + @Override + public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { + + CombineFileSplit split = (CombineFileSplit) genericSplit; + Configuration jobConf = context.getConfiguration(); + // Path of input file. + Path file = split.getPath(0); + String schema = context.getConfiguration().get("schema"); + outputSchema = schema != null ? 
Schema.parseJson(schema) : null; + FileSystem fs = file.getFileSystem(jobConf); + fileIn = fs.open(split.getPath(0)); + + String sheet = jobConf.get(SHEET_NO); + String sheetValue = jobConf.get(SHEET_VALUE, "0"); + terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false); + + try (Workbook workbook = WorkbookFactory.create(fileIn)) { + formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator(); + formulaEvaluator.setIgnoreMissingWorkbooks(true); + formatter = new XlsInputFormatDataFormatter(formulaEvaluator); + // Check if user wants to access with name or number + if (sheet.equals(XlsInputFormatConfig.SHEET_NUMBER)) { + workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue)); + } else { + workSheet = workbook.getSheet(sheetValue); + } + } catch (Exception e) { + throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e); + } + + lastRowNum = workSheet.getLastRowNum(); + rowIndex = 0; + isRowNull = false; + + boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, true); + if (skipFirstRow) { + Preconditions.checkArgument(lastRowNum != -1, "No rows found on sheet %s", sheetValue); + rowIndex = 1; + } + } + + @Override + public boolean nextKeyValue() { + // If any is true, then we stop processing. + if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) { + return false; + } + // Get the next row. + Row row = workSheet.getRow(rowIndex); + if (row == null) { + // Row is empty, stop processing if terminateIfEmptyRow is true. 
+ if (terminateIfEmptyRow) { + return false; + } else { + // set empty row in the output + value = StructuredRecord.builder(outputSchema).build(); + rowIndex++; + return true; + } + } + key = new LongWritable(rowIndex); + + StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema); + List fields = outputSchema.getFields(); + + isRowNull = true; + for (int cellIndex = 0; cellIndex < row.getLastCellNum(); cellIndex++) { + if (cellIndex >= fields.size()) { + throw new IllegalArgumentException( + String.format("Schema contains less fields than the number of columns in the excel file. " + + "Schema fields: %s, Excel columns: %s", fields.size(), row.getLastCellNum())); + } + Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL); + if (cell == null) { + // Blank cells are skipped, builder will set null for the field, no processing needed. + continue; + } + Schema.Field field = fields.get(cellIndex); + Schema.Type type = field.getSchema().isNullable() ? + field.getSchema().getNonNullable().getType() : field.getSchema().getType(); + String result = formatter.formatCellValue(cell, type); + if (result == null) { + continue; + } + isRowNull = false; + builder.convertAndSet(field.getName(), result); + } + value = builder.build(); + rowIndex++; + + // Stop processing if the row is null and terminateIfEmptyRow is true. 
+ return !isRowNull || !terminateIfEmptyRow; + } + + @Override + public float getProgress() { + return (float) rowIndex / lastRowNum; + } + + @Override + public void close() throws IOException { + if (fileIn != null) { + fileIn.close(); + } + } + + @Override + public LongWritable getCurrentKey() { + return key; + } + + @Override + public StructuredRecord getCurrentValue() { + return value; + } + } +} diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java new file mode 100644 index 000000000..4aa9ace02 --- /dev/null +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java @@ -0,0 +1,208 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.format.xls.input; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.lib.KeyValue; +import io.cdap.cdap.api.plugin.PluginPropertyField; +import io.cdap.plugin.common.KeyValueListParser; +import io.cdap.plugin.format.input.PathTrackingConfig; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Common config for Excel related formats. + */ +public class XlsInputFormatConfig extends PathTrackingConfig { + public static final String SHEET_NUMBER = "Sheet Number"; + private static final String NAME_OVERRIDE = "override"; + private static final String NAME_SHEET = "sheet"; + public static final String NAME_SHEET_VALUE = "sheetValue"; + private static final String NAME_SKIP_HEADER = "skipHeader"; + private static final String NAME_TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow"; + + // properties + public static final String NAME_SAMPLE_SIZE = "sampleSize"; + + public static final String DESC_SKIP_HEADER = + "Whether to skip the first line of each file. The default value is false."; + public static final String DESC_SHEET = "Select the sheet by name or number. Default is 'Sheet Number'."; + public static final String DESC_SHEET_VALUE = "Specifies the value corresponding to 'sheet' input. " + + "Can be either sheet name or sheet no; for example: 'Sheet1' or '0' in case user selects 'Sheet Name' or " + + "'Sheet Number' as 'sheet' input respectively. Sheet number starts with 0."; + public static final String DESC_TERMINATE_ROW = "Specify whether processing needs to be terminated in case an" + + " empty row is encountered while processing excel files. 
Default value is false."; + public static final Map XLS_FIELDS; + + static { + Map fields = new HashMap<>(FIELDS); + fields.put(NAME_SKIP_HEADER, + new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true)); + // Add fields specific for excel format handling. + fields.put(NAME_SHEET, new PluginPropertyField(NAME_SHEET, DESC_SHEET, "string", false, true)); + fields.put(NAME_SHEET_VALUE, new PluginPropertyField(NAME_SHEET_VALUE, DESC_SHEET_VALUE, "string", false, true)); + fields.put(NAME_TERMINATE_IF_EMPTY_ROW, new PluginPropertyField( + NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true)); + XLS_FIELDS = Collections.unmodifiableMap(fields); + } + + @Macro + @Nullable + @Name(NAME_SHEET) + @Description(DESC_SHEET) + private String sheet; + + @Macro + @Nullable + @Name(NAME_SHEET_VALUE) + @Description(DESC_SHEET_VALUE) + private String sheetValue; + + + @Macro + @Nullable + @Name(NAME_SKIP_HEADER) + @Description(DESC_SKIP_HEADER) + private Boolean skipHeader; + + @Macro + @Nullable + @Name(NAME_TERMINATE_IF_EMPTY_ROW) + @Description(DESC_TERMINATE_ROW) + private Boolean terminateIfEmptyRow; + + public XlsInputFormatConfig() { + super(); + } + + @VisibleForTesting + public XlsInputFormatConfig(@Nullable String schema, @Nullable String sheet, @Nullable String sheetValue, + @Nullable Boolean skipHeader, @Nullable Boolean terminateIfEmptyRow) { + super(); + this.schema = schema; + this.sheet = sheet; + this.sheetValue = sheetValue; + this.skipHeader = skipHeader; + this.terminateIfEmptyRow = terminateIfEmptyRow; + } + + public long getSampleSize() { + return Long.parseLong(getProperties().getProperties().getOrDefault(NAME_SAMPLE_SIZE, "1000")); + } + + public String getSheet() { + return sheet == null ? SHEET_NUMBER : sheet; + } + + @Nullable + public String getSheetValue() { + return sheetValue; + } + + public boolean getSkipHeader() { + return skipHeader != null ? 
skipHeader : false; + } + + public boolean getTerminateIfEmptyRow() { + return terminateIfEmptyRow != null ? terminateIfEmptyRow : false; + } + + /** + * Parses a list of key-value items of column names and their corresponding data types, manually set by the user. + * + * @return A hashmap of column names and their manually set schemas. + */ + public Map getOverride() throws IllegalArgumentException { + String override = getProperties().getProperties().get(NAME_OVERRIDE); + Map overrideDataTypes = new HashMap<>(); + KeyValueListParser kvParser = new KeyValueListParser("\\s*,\\s*", ":"); + if (!Strings.isNullOrEmpty(override)) { + for (KeyValue keyVal : kvParser.parse(override)) { + String name = keyVal.getKey(); + String stringDataType = keyVal.getValue(); + + Schema schema; + switch (stringDataType) { + case "date": + schema = Schema.of(Schema.LogicalType.DATE); + break; + case "time": + schema = Schema.of(Schema.LogicalType.TIME_MICROS); + break; + case "timestamp": + schema = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + break; + default: + schema = Schema.of(Schema.Type.valueOf(stringDataType.toUpperCase())); + } + + if (overrideDataTypes.containsKey(name)) { + throw new IllegalArgumentException(String.format("Cannot convert '%s' to multiple types.", name)); + } + overrideDataTypes.put(name, schema); + } + } + return overrideDataTypes; + } + + public static Builder builder() { + return new Builder(); + } + public static class Builder { + private String schema; + private String sheet; + private String sheetValue; + private Boolean skipHeader; + private Boolean terminateIfEmptyRow; + + public Builder setSchema(String schema) { + this.schema = schema; + return this; + } + + public Builder setSheet(String sheet) { + this.sheet = sheet; + return this; + } + + public Builder setSheetValue(String sheetValue) { + this.sheetValue = sheetValue; + return this; + } + public Builder setSkipHeader(Boolean skipHeader) { + this.skipHeader = skipHeader; + return this; + } + 
+ public Builder setTerminateIfEmptyRow(Boolean terminateIfEmptyRow) { + this.terminateIfEmptyRow = terminateIfEmptyRow; + return this; + } + public XlsInputFormatConfig build() { + return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow); + } + } + +} diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatDataFormatter.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatDataFormatter.java new file mode 100644 index 000000000..4bfed1778 --- /dev/null +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatDataFormatter.java @@ -0,0 +1,86 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format.xls.input; + +import io.cdap.cdap.api.data.schema.Schema; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.CellType; +import org.apache.poi.ss.usermodel.DataFormatter; +import org.apache.poi.ss.usermodel.DateUtil; +import org.apache.poi.ss.usermodel.FormulaEvaluator; + +/** + * Formats the cell value of an Excel file. + */ +public class XlsInputFormatDataFormatter { + private static final DataFormatter dataFormatter = new DataFormatter(); + private final FormulaEvaluator evaluator; + + /** + * Constructor for XlsInputFormatDataFormatter. 
+ * + * @param evaluator the formula evaluator + */ + public XlsInputFormatDataFormatter(FormulaEvaluator evaluator) { + this.evaluator = evaluator; + } + + /** + * Formats the cell value of an Excel file. + * + * @param cell the cell to format + * @param type the schema type of the cell + * @return the formatted cell value + */ + public String formatCellValue(Cell cell, Schema.Type type) { + if (cell == null) { + return null; + } + + CellType cellType = cell.getCellType(); + if (cellType == CellType.FORMULA) { + try { + cellType = cell.getCachedFormulaResultType(); + } catch (Exception e) { + cellType = evaluator.evaluateFormulaCell(cell); + } + } + + switch (cellType) { + case NUMERIC: + if (DateUtil.isCellDateFormatted(cell)) { + return dataFormatter.formatCellValue(cell); + } + return Double.toString(cell.getNumericCellValue()); + case STRING: + if (type == Schema.Type.DOUBLE) { + // Edge case when schema was inferred as double but the cell is actually a string + // this can be caused by an error formula cell, as the error value is stored as a string + return null; + } + return cell.getRichStringCellValue().getString(); + case BOOLEAN: + return cell.getBooleanCellValue() ? "TRUE" : "FALSE"; + case BLANK: + case ERROR: + return null; + default: + throw new IllegalStateException( + String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType)); + } + } +} diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java new file mode 100644 index 000000000..42e64285a --- /dev/null +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java @@ -0,0 +1,192 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format.xls.input; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.plugin.PluginClass; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.validation.FormatContext; +import io.cdap.cdap.etl.api.validation.InputFile; +import io.cdap.cdap.etl.api.validation.InputFiles; +import io.cdap.cdap.etl.api.validation.ValidatingInputFormat; +import io.cdap.plugin.format.input.PathTrackingConfig; +import io.cdap.plugin.format.input.PathTrackingInputFormatProvider; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.DataFormatter; +import org.apache.poi.ss.usermodel.FormulaEvaluator; +import org.apache.poi.ss.usermodel.Row; +import org.apache.poi.ss.usermodel.Sheet; +import org.apache.poi.ss.usermodel.Workbook; +import org.apache.poi.ss.usermodel.WorkbookFactory; +import org.apache.poi.ss.util.CellReference; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; + + +/** + * Reads XLS(X) into StructuredRecords. 
+ */ +@Plugin(type = ValidatingInputFormat.PLUGIN_TYPE) +@Name(XlsInputFormatProvider.NAME) +@Description(XlsInputFormatProvider.DESC) +public class XlsInputFormatProvider extends PathTrackingInputFormatProvider { + static final String NAME = "xls"; + static final String DESC = "Plugin for reading files in xls(x) format."; + public static final PluginClass PLUGIN_CLASS = PluginClass.builder() + .setType(ValidatingInputFormat.PLUGIN_TYPE) + .setName(NAME) + .setDescription(DESC) + .setClassName(XlsInputFormatProvider.class.getName()) + .setConfigFieldName("conf") + .setProperties(XlsInputFormatConfig.XLS_FIELDS) + .build(); + private final XlsInputFormatConfig conf; + + public XlsInputFormatProvider(XlsInputFormatConfig conf) { + super(conf); + this.conf = conf; + } + + @Override + public String getInputFormatClassName() { + return XlsInputFormat.class.getName(); + } + + @Override + public void validate(FormatContext context) { + Schema schema = super.getSchema(context); + FailureCollector collector = context.getFailureCollector(); + // When the sheet is specified by number, the sheet value must be a number + if (!conf.containsMacro(XlsInputFormatConfig.NAME_SHEET_VALUE) + && conf.getSheet().equals(XlsInputFormatConfig.SHEET_NUMBER) + && (conf.getSheetValue() == null || !conf.getSheetValue().matches("[0-9]+"))) { + collector.addFailure("Sheet number must be a number.", null) + .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE); + } + if (!conf.containsMacro(PathTrackingConfig.NAME_SCHEMA) && schema == null && context.getInputSchema() == null) { + collector.addFailure("XLS format cannot be used without specifying a schema.", "Schema must be specified.") + .withConfigProperty(PathTrackingConfig.NAME_SCHEMA); + } + } + + @Override + protected void addFormatProperties(Map properties) { + properties.put(XlsInputFormat.SHEET_NO, conf.getSheet()); + properties.put(XlsInputFormat.SHEET_VALUE, conf.getSheetValue()); + properties.put(XlsInputFormat.NAME_SKIP_HEADER, 
String.valueOf(conf.getSkipHeader())); + properties.put(XlsInputFormat.TERMINATE_IF_EMPTY_ROW, String.valueOf(conf.getTerminateIfEmptyRow())); + properties.put(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE)); + } + + @Override + @Nullable + public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws IOException { + String blankHeader = "BLANK"; + FailureCollector failureCollector = context.getFailureCollector(); + FormulaEvaluator formulaEvaluator; + for (InputFile inputFile : inputFiles) { + DataFormatter formatter = new DataFormatter(); + try (Workbook workbook = WorkbookFactory.create(inputFile.open())) { + formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator(); + formulaEvaluator.setIgnoreMissingWorkbooks(true); + Sheet workSheet; + // Check if user wants to access with name or number + if (conf.getSheet() != null && conf.getSheet().equals(XlsInputFormatConfig.SHEET_NUMBER)) { + workSheet = workbook.getSheetAt(Integer.parseInt(Objects.requireNonNull(conf.getSheetValue()))); + } else { + workSheet = workbook.getSheet(conf.getSheetValue()); + } + + // If provided sheet does not exist, throw an exception + if (workSheet == null) { + failureCollector.addFailure("Sheet " + conf.getSheetValue() + " does not exist in the workbook.", + "Specify a valid sheet."); + return null; + } + + int sampleSizeInt = getSampleSizeInt(); + // Row numbers are 0 based in POI + int rowStart = Math.min(0, workSheet.getFirstRowNum()); + int rowEnd = Math.min(sampleSizeInt, workSheet.getLastRowNum()); + + int lastCellNumMax = 0; + List columnNames = new ArrayList<>(); + XlsInputFormatSchemaDetector schemaDetector = new XlsInputFormatSchemaDetector(); + for (int rowIndex = rowStart; rowIndex <= rowEnd; rowIndex++) { + Row row = workSheet.getRow(rowIndex); + if (row == null) { + continue; + } + lastCellNumMax = Math.max(lastCellNumMax, row.getLastCellNum()); + + // Use the first row to get the column names + if (rowIndex == 0 && 
conf.getSkipHeader()) { + for (int cellIndex = 0; cellIndex < lastCellNumMax; cellIndex++) { + Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL); + columnNames.add(cell == null ? blankHeader : formatter.formatCellValue(cell, formulaEvaluator)); + } + // Skip Header + continue; + } + + for (int cellIndex = 0; cellIndex < lastCellNumMax; cellIndex++) { + Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL); + boolean isFirstRow = rowIndex == (conf.getSkipHeader() ? 1 : 0); + schemaDetector.reduceSchema(cellIndex, cell, isFirstRow); + } + + } + + // If some rows have more cells than the first row, add blank headers for the extra cells + if (lastCellNumMax > columnNames.size() && conf.getSkipHeader()) { + for (int i = columnNames.size(); i < lastCellNumMax; i++) { + columnNames.add(blankHeader); + } + } + + // Set column names if header is not skipped + if (!conf.getSkipHeader()) { + for (int i = 0; i < lastCellNumMax; i++) { + columnNames.add(CellReference.convertNumToColString(i)); + } + } + + Schema schema = Schema.recordOf("xls", schemaDetector.getFields( + XlsInputFormatUtils.getSafeColumnNames(columnNames), conf.getOverride())); + return PathTrackingInputFormatProvider.addPathField(context.getFailureCollector(), schema, conf.getPathField()); + } + } + return null; + } + + private int getSampleSizeInt() { + // Note: Sample size is long, but we are casting it to int here. This is because the POI API uses int + // for row numbers, so we cannot support sample sizes larger than Integer.MAX_VALUE + long sampleSize = conf.getSampleSize() > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : conf.getSampleSize(); + return (int) sampleSize; + } + +} diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetector.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetector.java new file mode 100644 index 000000000..c2f242961 --- /dev/null +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetector.java @@ -0,0 +1,146 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format.xls.input; + +import io.cdap.cdap.api.data.schema.Schema; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.CellType; +import org.apache.poi.ss.usermodel.DateUtil; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * Detects the schema of an Excel file. + */ +public class XlsInputFormatSchemaDetector { + + private final Map columnSchemaReducerMap = new HashMap<>(); + private final Map columnNullableMap = new HashMap<>(); + + /** + * Reduces the schema of the Excel file. 
+ * + * @param columnIndex the column index of the cell + * @param cell the cell to reduce the schema from + * @param isFirstRow whether the cell is in the first row + */ + public void reduceSchema(int columnIndex, Cell cell, boolean isFirstRow) { + boolean isCellEmpty = isCellEmpty(cell); + + if (!columnNullableMap.containsKey(columnIndex)) { + // When we see the index for the first time and this is not the first row, + // we can assume that the column is nullable as the previous rows did not have a value for this column. + columnNullableMap.put(columnIndex, !isFirstRow); + } + // Pin the nullability of the column to true if the cell is empty + columnNullableMap.put(columnIndex, isCellEmpty || columnNullableMap.get(columnIndex)); + if (isCellEmpty) { + return; + } + // Check if key exists in map + if (columnSchemaReducerMap.containsKey(columnIndex)) { + // If key exists, reduce the schema type + columnSchemaReducerMap.put(columnIndex, reduceSchemaType(columnSchemaReducerMap.get(columnIndex), cell)); + } else { + // If key does not exist, add it to the map + columnSchemaReducerMap.put(columnIndex, getSchemaType(cell)); + } + } + + private void normalizeColumn(int numColumns) { + for (int i = 0; i < numColumns; i++) { + // set all nullability to true if not present + columnNullableMap.putIfAbsent(i, true); + // set all schema types to string if not present + columnSchemaReducerMap.putIfAbsent(i, Schema.Type.STRING); + } + } + + /** + * Returns the schema of the Excel file. 
+ * + * @param columnNames the column names of the Excel file + * @param override the override schema of the Excel file provided by the user + * @return the schema of the Excel file + */ + public List getFields(List columnNames, Map override) { + normalizeColumn(columnNames.size()); + List fields = new ArrayList<>(); + for (int i = 0; i < columnNames.size(); i++) { + String columnName = columnNames.get(i); + boolean isNullable = columnNullableMap.get(i); + if (override.containsKey(columnName)) { + Schema schema = isNullable ? Schema.nullableOf(override.get(columnName)) : override.get(columnName); + fields.add(Schema.Field.of(columnName, schema)); + continue; + } + Schema.Type schemaType = columnSchemaReducerMap.get(i); + Schema schema = isNullable ? Schema.nullableOf(Schema.of(schemaType)) : Schema.of(schemaType); + fields.add(Schema.Field.of(columnName, schema)); + } + return fields; + } + + private static boolean isCellEmpty(Cell cell) { + if (cell != null && cell.getCellType() == CellType.FORMULA) { + return cell.getCachedFormulaResultType() == CellType.BLANK; + } + return cell == null || cell.getCellType() == CellType.BLANK; + } + + private static Schema.Type getSchemaType(Cell cell) { + CellType cellType = cell.getCellType() == CellType.FORMULA ? + cell.getCachedFormulaResultType() : cell.getCellType(); + // Force Dates As String + if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) { + return Schema.Type.STRING; + } + // Mapping for XLS Cell Types to CDAP Schema Types + switch (cellType) { + case BOOLEAN: + return Schema.Type.BOOLEAN; + case NUMERIC: + return Schema.Type.DOUBLE; + default: + return Schema.Type.STRING; + } + } + private static Schema.Type reduceSchemaType(Schema.Type detectedSchemaType, Cell cell) { + if (detectedSchemaType == Schema.Type.STRING) { + return Schema.Type.STRING; + } + CellType cellType = cell.getCellType() == CellType.FORMULA ? 
+ cell.getCachedFormulaResultType() : cell.getCellType(); + switch (cellType) { + case BOOLEAN: + switch (detectedSchemaType) { + case BOOLEAN: + return Schema.Type.BOOLEAN; + case DOUBLE: + return Schema.Type.DOUBLE; + } + return Schema.Type.STRING; + case NUMERIC: + return Schema.Type.DOUBLE; + } + return Schema.Type.STRING; + } +} diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtils.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtils.java new file mode 100644 index 000000000..607d07836 --- /dev/null +++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtils.java @@ -0,0 +1,89 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format.xls.input; + +import io.cdap.cdap.api.data.schema.Schema; +import org.apache.poi.ss.usermodel.Cell; +import org.apache.poi.ss.usermodel.CellType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * Utilities around XLS input format. + */ +public class XlsInputFormatUtils { + private static final Pattern NOT_VALID_PATTERN = Pattern.compile("[^A-Za-z0-9_]+"); + + /** + * Cleans a list of column names to make sure they comply with avro field naming standard. + * It also makes sure each name is unique in the list. 
+   * Field names can start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_].
+   * <p>
+   * Steps:
+   * 1) Trim surrounding spaces
+   * 2) If it's empty replace it with BLANK
+   * 3) If it starts with a number, prepend "col_"
+   * 4) Replace invalid characters with "_" (multiple invalid characters get replaced with one symbol)
+   * 5) Check if the name has been found before (without considering case)
+   * if so add _# where # is the number of times seen before + 1
+   *
+   * @param columnNames the raw column names, in column order
+   * @return a new list of the same size holding the cleaned, unique names in the same order
+   */
+  public static List<String> getSafeColumnNames(List<String> columnNames) {
+    return cleanSchemaColumnNames(columnNames);
+  }
+
+  /**
+   * Performs the clean-up described on {@link #getSafeColumnNames(List)}.
+   *
+   * @param columnNames the raw column names; a null entry is handled like an empty name
+   * @return the cleaned, unique column names
+   */
+  private static List<String> cleanSchemaColumnNames(List<String> columnNames) {
+    final String replacementChar = "_";
+    final List<String> cleanColumnNames = new ArrayList<>();
+    // Lower-cased clean name -> numeric suffix to append the next time that name shows up.
+    final Map<String, Integer> seenColumnNames = new HashMap<>();
+    for (String columnName : columnNames) {
+      StringBuilder cleanColumnNameBuilder = new StringBuilder();
+
+      // Remove surrounding whitespace; a missing (null) name is treated as an empty one
+      columnName = columnName == null ? "" : columnName.trim();
+
+      // If it's an empty string replace it with BLANK
+      if (columnName.isEmpty()) {
+        cleanColumnNameBuilder.append("BLANK");
+      } else if ((columnName.charAt(0) >= '0') && (columnName.charAt(0) <= '9')) {
+        // Prepend a col_ if the first character is a number
+        cleanColumnNameBuilder.append("col_");
+      }
+
+      // Replace all invalid characters with the replacement char
+      cleanColumnNameBuilder.append(NOT_VALID_PATTERN.matcher(columnName).replaceAll(replacementChar));
+
+      // If the name was already used, append an index until it becomes unique.
+      // We use lowercase to match columns "A" and "a" to avoid issues with wrangler.
+      // Locale.ROOT keeps the comparison independent of the JVM default locale (e.g. Turkish "I").
+      String cleanColumnName = cleanColumnNameBuilder.toString();
+      String lowerCaseCleanColumnName = cleanColumnName.toLowerCase(java.util.Locale.ROOT);
+      while (seenColumnNames.containsKey(lowerCaseCleanColumnName)) {
+        int suffix = seenColumnNames.get(lowerCaseCleanColumnName);
+        cleanColumnNameBuilder.append(replacementChar).append(suffix);
+        seenColumnNames.put(lowerCaseCleanColumnName, suffix + 1);
+        cleanColumnName = cleanColumnNameBuilder.toString();
+        lowerCaseCleanColumnName = cleanColumnName.toLowerCase(java.util.Locale.ROOT);
+      }
+      // First sighting of this (possibly suffixed) name: its next duplicate gets the suffix 2.
+      seenColumnNames.put(lowerCaseCleanColumnName, 2);
+
+      cleanColumnNames.add(cleanColumnName);
+    }
+    return cleanColumnNames;
+  }
+}
diff --git a/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatProviderTest.java b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatProviderTest.java
new file mode 100644
index 000000000..ff94bf9c1
--- /dev/null
+++ b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatProviderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.validation.FormatContext;
+import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests how {@link XlsInputFormatProvider} validates the sheet value when the sheet is selected by number.
+ */
+public class XlsInputFormatProviderTest {
+  XlsInputFormatProvider xlsInputFormatProvider;
+  MockFailureCollector failureCollector;
+  FormatContext formatContext;
+
+  @Before
+  public void setup() throws IOException {
+    // Each test starts with its own collector so failures do not leak between tests.
+    failureCollector = new MockFailureCollector();
+    formatContext = new FormatContext(failureCollector, null);
+  }
+
+  @Test
+  public void testValidateInvalidSheetNumber() {
+    validateWithSheetNumber("A");
+    Assert.assertEquals(1, failureCollector.getValidationFailures().size());
+    Assert.assertEquals(
+      "Sheet number must be a number.",
+      failureCollector.getValidationFailures().get(0).getMessage());
+  }
+
+  @Test
+  public void testValidateValidSheetNumber() {
+    validateWithSheetNumber("0");
+    Assert.assertEquals(0, failureCollector.getValidationFailures().size());
+  }
+
+  /**
+   * Builds a provider that selects the sheet by number using the given sheet value and a
+   * single-string-field record schema, then validates it against the shared format context.
+   *
+   * @param sheetValue the raw sheet value to configure
+   */
+  private void validateWithSheetNumber(String sheetValue) {
+    Schema recordSchema = Schema.recordOf("test", Schema.Field.of("test", Schema.of(Schema.Type.STRING)));
+    xlsInputFormatProvider = new XlsInputFormatProvider(
+      XlsInputFormatConfig.builder()
+        .setSheet(XlsInputFormatConfig.SHEET_NUMBER)
+        .setSheetValue(sheetValue)
+        .setSchema(recordSchema.toString())
+        .build());
+    xlsInputFormatProvider.validate(formatContext);
+  }
+
+}
+
diff --git a/format-xls/src/test/resources/testdata.xlsx b/format-xls/src/test/resources/testdata.xlsx
new file mode 100644
index 000000000..15212a717
Binary files /dev/null and b/format-xls/src/test/resources/testdata.xlsx differ
diff --git a/pom.xml
b/pom.xml index 63c00f4b7..b375c18fb 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ solrsearch-plugins spark-plugins transform-plugins + format-xls