Skip to content

Commit

Permalink
fix code style
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed May 16, 2024
1 parent 51b5651 commit 7b96137
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.arrow.dataset.file;

import java.util.Optional;

import org.apache.arrow.dataset.jni.NativeDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.FragmentScanOptions;
import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions;
import org.apache.arrow.memory.BufferAllocator;

import java.util.Optional;

/**
* Java binding of the C++ FileSystemDatasetFactory.
*/
Expand Down Expand Up @@ -55,7 +54,8 @@ private static long createNative(FileFormat format, String uri, Optional<Fragmen
fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
}

private static long createNative(FileFormat format, String[] uris, Optional<FragmentScanOptions> fragmentScanOptions) {
private static long createNative(FileFormat format, String[] uris,
Optional<FragmentScanOptions> fragmentScanOptions) {
return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id(),
fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.arrow.dataset.file;

import org.apache.arrow.dataset.jni.JniLoader;

import java.nio.ByteBuffer;

import org.apache.arrow.dataset.jni.JniLoader;

/**
* JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.arrow.dataset.jni;

import java.nio.ByteBuffer;

import org.apache.arrow.dataset.scanner.FragmentScanOptions;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions;
import org.apache.arrow.dataset.source.Dataset;

import java.nio.ByteBuffer;

/**
* Native implementation of {@link Dataset}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,28 @@

package org.apache.arrow.dataset.scanner;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import io.substrait.proto.AdvancedExtension;
import org.apache.arrow.dataset.substrait.StringMapNode;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.arrow.dataset.substrait.StringMapNode;

import com.google.protobuf.Any;

import io.substrait.proto.AdvancedExtension;

public interface FragmentScanOptions {
String typeName();

int fileFormatId();

ByteBuffer serialize();

/**
* serialize the map.
*
* @param config config map
* @return bufer to jni call argument, should be DirectByteBuffer
*/
default ByteBuffer serializeMap(Map<String, String> config) {
if (config.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public Builder substraitFilter(ByteBuffer substraitFilter) {
return this;
}

/**
* Set the FragmentScanOptions.
*
* @param fragmentScanOptions scan options
* @return the ScanOptions configured.
*/
public Builder fragmentScanOptions(FragmentScanOptions fragmentScanOptions) {
Preconditions.checkNotNull(fragmentScanOptions);
this.fragmentScanOptions = fragmentScanOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package org.apache.arrow.dataset.scanner.csv;

import org.apache.arrow.c.ArrowSchema;

import java.util.Map;
import java.util.Optional;

import org.apache.arrow.c.ArrowSchema;

public class CsvConvertOptions {

private final Map<String, String> configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,29 @@

package org.apache.arrow.dataset.scanner.csv;

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.scanner.FragmentScanOptions;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.scanner.FragmentScanOptions;

public class CsvFragmentScanOptions implements Serializable, FragmentScanOptions {
private final CsvConvertOptions convertOptions;
private final Map<String, String> readOptions;
private final Map<String, String> parseOptions;


/**
* csv scan options, map to CPP struct CsvFragmentScanOptions.
*
* @param convertOptions same struct in CPP
* @param readOptions same struct in CPP
* @param parseOptions same struct in CPP
*/
public CsvFragmentScanOptions(CsvConvertOptions convertOptions,
Map<String, String> readOptions,
Map<String, String> parseOptions) {
Expand All @@ -45,10 +52,20 @@ public String typeName() {
return FileFormat.CSV.name().toLowerCase(Locale.ROOT);
}

/**
* File format id.
*
* @return id
*/
public int fileFormatId() {
return FileFormat.CSV.id();
}

/**
* Serialize this class to ByteBuffer and then called by jni call.
*
* @return DirectByteBuffer
*/
public ByteBuffer serialize() {
Map<String, String> options = Stream.concat(Stream.concat(readOptions.entrySet().stream(),
parseOptions.entrySet().stream()),
Expand All @@ -57,6 +74,7 @@ public ByteBuffer serialize() {
options.put("column_type", Long.toString(convertOptions.getArrowSchemaAddress()));
return serializeMap(options);
}

public static CsvFragmentScanOptions deserialize(String serialized) {
throw new UnsupportedOperationException("Not implemented now");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.dataset.substrait;

import io.substrait.proto.Expression;
package org.apache.arrow.dataset.substrait;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import io.substrait.proto.Expression;

public class StringMapNode implements Serializable {
private final Map<String, String> values = new HashMap<>();

public StringMapNode(Map<String, String> values) {
this.values.putAll(values);
}

/**
* Serialize String map.
*
* @return Substrait Literal
*/
public Expression.Literal toProtobuf() {
Expression.Literal.Builder literalBuilder = Expression.Literal.newBuilder();
Expression.Literal.Map.KeyValue.Builder keyValueBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Map;
import java.util.Optional;

import com.google.common.collect.ImmutableMap;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CDataDictionaryProvider;
import org.apache.arrow.c.Data;
Expand All @@ -56,6 +55,8 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import com.google.common.collect.ImmutableMap;

public class TestAceroSubstraitConsumer extends TestDataset {

@ClassRule
Expand Down Expand Up @@ -474,11 +475,12 @@ public void testCsvConvertOptions() throws Exception {
), null);
String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv";
BufferAllocator allocator = rootAllocator();
try(ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
Data.exportSchema(allocator, schema, new CDataDictionaryProvider(), cSchema);
CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of("delimiter", ";"));
convertOptions.setArrowSchema(cSchema);
CsvFragmentScanOptions fragmentScanOptions = new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), ImmutableMap.of());
CsvFragmentScanOptions fragmentScanOptions = new CsvFragmentScanOptions(
convertOptions, ImmutableMap.of(), ImmutableMap.of());
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
.columns(Optional.empty())
.fragmentScanOptions(fragmentScanOptions)
Expand Down

0 comments on commit 7b96137

Please sign in to comment.