Skip to content

Commit

Permalink
Added format xls
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasrathee-cs authored and psainics committed Dec 21, 2023
1 parent de7a6d6 commit a1f6456
Show file tree
Hide file tree
Showing 11 changed files with 1,078 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public enum FileFormat {
ORC(false, true),
PARQUET(true, true),
TEXT(true, false),
TSV(true, true);
TSV(true, true),
XLS(true,false);
private final boolean canRead;
private final boolean canWrite;

Expand Down
90 changes: 90 additions & 0 deletions format-xls/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
~ Copyright © 2023 Cask Data, Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
-->

<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.cdap.plugin</groupId>
<artifactId>hydrator-plugins</artifactId>
<version>2.13.0-SNAPSHOT</version>
</parent>
<artifactId>format-xls</artifactId>
<name>XLS format plugins</name>
<packaging>jar</packaging>
<properties>
<poi.version>5.2.4</poi.version>
</properties>
<dependencies>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-etl-api</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-formats</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>hydrator-test</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.plugin</groupId>
<artifactId>format-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
<_exportcontents>
io.cdap.plugin.format.xls.*
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
<Embed-Directory>lib</Embed-Directory>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>io.cdap</groupId>
<artifactId>cdap-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.format.xls.input;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellType;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;

import java.io.IOException;
import java.util.List;


/**
* {@link XlsInputFormat} is {@link TextInputFormat} implementation for reading Excel files.
* <p>
* The {@link XlsInputFormat.XlsRecordReader} reads a given sheet, and within a sheet reads
* all columns and all rows.
*/
public class XlsInputFormat extends CombineFileInputFormat<LongWritable, StructuredRecord> {

public static final String SHEET_NO = "Sheet Number";
public static final String SHEET_VALUE = "sheetValue";
public static final String NAME_SKIP_HEADER = "skipHeader";
public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";

@Override
public RecordReader<LongWritable, StructuredRecord> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new XlsRecordReader();
}

/**
* Reads excel spread sheet, where the keys are the offset in the excel file and the text is the complete record.
*/
public static class XlsRecordReader extends RecordReader<LongWritable, StructuredRecord> {
// DataFormatter to format and get each cell's value as String
XlsInputFormatDataFormatter formatter;
FormulaEvaluator formulaEvaluator;
// Map key that represents the row index.
private LongWritable key;
// Map value that represents an excel row
private StructuredRecord value;
private Sheet workSheet;
// InputStream handler for Excel files.
private FSDataInputStream fileIn;
// Specifies the row index.
private int rowIndex;
// Specifies last row num.
private int lastRowNum;
private Schema outputSchema;
private boolean terminateIfEmptyRow;
private boolean isRowNull;

@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {

CombineFileSplit split = (CombineFileSplit) genericSplit;
Configuration jobConf = context.getConfiguration();
// Path of input file.
Path file = split.getPath(0);
String schema = context.getConfiguration().get("schema");
outputSchema = schema != null ? Schema.parseJson(schema) : null;
FileSystem fs = file.getFileSystem(jobConf);
fileIn = fs.open(split.getPath(0));

String sheet = jobConf.get(SHEET_NO);
String sheetValue = jobConf.get(SHEET_VALUE, "0");
terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false);

try (Workbook workbook = WorkbookFactory.create(fileIn)) {
formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
formulaEvaluator.setIgnoreMissingWorkbooks(true);
formatter = new XlsInputFormatDataFormatter(formulaEvaluator);
// Check if user wants to access with name or number
if (sheet.equals(XlsInputFormatConfig.SHEET_NUMBER)) {
workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue));
} else {
workSheet = workbook.getSheet(sheetValue);
}
} catch (Exception e) {
throw new IllegalArgumentException("Exception while reading excel sheet. " + e.getMessage(), e);
}

lastRowNum = workSheet.getLastRowNum();
rowIndex = 0;
isRowNull = false;

boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, true);
if (skipFirstRow) {
Preconditions.checkArgument(lastRowNum != -1, "No rows found on sheet %s", sheetValue);
rowIndex = 1;
}
}

@Override
public boolean nextKeyValue() {
// If any is true, then we stop processing.
if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) {
return false;
}
// Get the next row.
Row row = workSheet.getRow(rowIndex);
if (row == null) {
// Row is empty, stop processing if terminateIfEmptyRow is true.
if (terminateIfEmptyRow) {
return false;
} else {
// set empty row in the output
value = StructuredRecord.builder(outputSchema).build();
rowIndex++;
return true;
}
}
key = new LongWritable(rowIndex);

StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
List<Schema.Field> fields = outputSchema.getFields();

isRowNull = true;
for (int cellIndex = 0; cellIndex < row.getLastCellNum(); cellIndex++) {
if (cellIndex >= fields.size()) {
throw new IllegalArgumentException(
String.format("Schema contains less fields than the number of columns in the excel file. " +
"Schema fields: %s, Excel columns: %s", fields.size(), row.getLastCellNum()));
}
Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
if (cell == null) {
// Blank cells are skipped, builder will set null for the field, no processing needed.
continue;
}
Schema.Field field = fields.get(cellIndex);
Schema.Type type = field.getSchema().isNullable() ?
field.getSchema().getNonNullable().getType() : field.getSchema().getType();
String result = formatter.formatCellValue(cell, type);
if (result == null) {
continue;
}
isRowNull = false;
builder.convertAndSet(field.getName(), result);
}
value = builder.build();
rowIndex++;

// Stop processing if the row is null and terminateIfEmptyRow is true.
return !isRowNull || !terminateIfEmptyRow;
}

@Override
public float getProgress() {
return (float) rowIndex / lastRowNum;
}

@Override
public void close() throws IOException {
if (fileIn != null) {
fileIn.close();
}
}

@Override
public LongWritable getCurrentKey() {
return key;
}

@Override
public StructuredRecord getCurrentValue() {
return value;
}
}
}
Loading

0 comments on commit a1f6456

Please sign in to comment.