DB-6045 Support compression for avro external table #2065

Closed · wants to merge 1 commit
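The change spans two files: the Spark session bootstrap, which gains a macOS workaround for snappy-java and drops the hard-coded "uncompressed" Avro codec, and the Avro write path, where the codec requested for the external table is now applied per write, with "zlib" normalized to Avro's "deflate" name. As a rough standalone sketch of that write-path idea, outside the Splice classes (the class name, sample data, and output path are illustrative; it assumes the com.databricks.spark.avro data source used by the patch is on the classpath):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AvroCompressionSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("avro-compression-sketch")
                .getOrCreate();

        Dataset<Row> df = spark.range(1000).toDF("id");

        // The table DDL supplies a codec name; Avro calls zlib "deflate", so normalize first.
        String compression = "zlib";
        if (compression.toLowerCase().equals("zlib")) {
            compression = "deflate";
        }

        // com.databricks.spark.avro reads its codec from the session configuration.
        spark.conf().set("spark.sql.avro.compression.codec", compression);

        df.write()
                .mode("append")
                .format("com.databricks.spark.avro")
                .save("/tmp/avro-compression-sketch");

        spark.stop();
    }
}

The same session-level setting is what writeAvroFile configures in the second file of the diff, which is why removing the fixed "uncompressed" value from session setup matters.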
@@ -40,6 +40,7 @@
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDDOperationScope;
import org.apache.spark.sql.SparkSession;
+import org.xerial.snappy.OSInfo;
import scala.Tuple2;
import com.splicemachine.EngineDriver;
import com.splicemachine.access.HConfiguration;
@@ -78,7 +79,7 @@ private SpliceSpark() {} // private constructor forbids creating instances
public static synchronized SparkSession getSession() {
String threadName = Thread.currentThread().getName();
if (!SpliceClient.isClient() && !threadName.startsWith("olap-worker-")) {
// Not running on the Olap Server... raise exception. Use getSessionUnsafe() if you know what you are doing.
throw new RuntimeException("Trying to get a SparkSession from outside the OlapServer");
}
return getSessionUnsafe();
@@ -91,7 +92,7 @@ public static synchronized void setCredentials(Broadcast<SerializableWritable<C
public static synchronized Broadcast<SerializableWritable<Credentials>> getCredentials() {
return credentials;
}

/** This method is unsafe, it should only be used in tests or as a convenience when trying to
* get a local Spark Context, it should never be used when implementing Splice operations or functions
*/
@@ -184,7 +185,7 @@ public Void run() throws Exception {
}
}
}

public static synchronized void setupSpliceStaticComponents() throws IOException {
try {
if (!spliceStaticComponentsSetup && isRunningOnSpark()) {
@@ -198,14 +199,14 @@ public static synchronized void setupSpliceStaticComponents() throws IOException
SIEnvironment env = SpliceClient.isClient() && tokenEnabled ?
AdapterSIEnvironment.loadEnvironment(new SystemClock(),ZkUtils.getRecoverableZooKeeper(),SpliceClient.getConnectionPool(debugConnections,maxConnections)) :
HBaseSIEnvironment.loadEnvironment(new SystemClock(),ZkUtils.getRecoverableZooKeeper());

SIDriver driver = env.getSIDriver();

//make sure the configuration is correct
SConfiguration config=driver.getConfiguration();

LOG.info("Splice Client in SpliceSpark "+SpliceClient.isClient());

//boot derby components
new EngineLifecycleService(new DistributedDerbyStartup(){
@Override public void distributedStart() throws IOException{ }
@@ -234,6 +235,11 @@ public static synchronized void setupSpliceStaticComponents() throws IOException
HBaseRegionLoads.INSTANCE.startWatching();

spliceStaticComponentsSetup = true;

+// SPLICE-2115, workaround for snappy-java-1.0.4.1 on Mac
+if (OSInfo.getOSName().equals("Mac")) {
+System.setProperty("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");
+}
}
} catch (RuntimeException e) {
LOG.error("Unexpected error setting up Splice components", e);
@@ -323,7 +329,6 @@ private static SparkSession initializeSparkSession() {

// TODO can this be set/overridden with system property, why do we use SpliceConstants?
conf.set("spark.io.compression.codec",HConfiguration.getConfiguration().getSparkIoCompressionCodec());
conf.set("spark.sql.avro.compression.codec","uncompressed");

/*
Application Properties
@@ -398,7 +403,7 @@ public static void popScope() {
SpliceSpark.getContext().setLocalProperty("spark.rdd.scope", null);
SpliceSpark.getContext().setLocalProperty("spark.rdd.scope.noOverride", null);
}

public synchronized static void setContext(JavaSparkContext sparkContext) {
session = SparkSession.builder().config(sparkContext.getConf()).getOrCreate(); // Claims this is a singleton from documentation
ctx = sparkContext;
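Beyond the new import, the functional changes to this first file are the snappy workaround and the removed "uncompressed" codec default, both shown above. A minimal sketch of the workaround on its own, assuming snappy-java 1.0.4.1 on the classpath (the class name is illustrative): the 1.0.4.1 jar apparently ships the Mac native library as libsnappyjava.jnilib, while newer JVMs map the library name to a .dylib, so the loader property is pointed at the bundled file before snappy is first used.

import org.xerial.snappy.OSInfo;
import org.xerial.snappy.Snappy;

public class SnappyMacWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        // Point the snappy-java loader at the library file actually bundled in the jar.
        if (OSInfo.getOSName().equals("Mac")) {
            System.setProperty("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");
        }

        // Trigger native loading with a small compress/uncompress round trip.
        byte[] input = "splice avro compression".getBytes("UTF-8");
        byte[] restored = Snappy.uncompress(Snappy.compress(input));
        System.out.println(new String(restored, "UTF-8"));
    }
}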
@@ -71,6 +71,7 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
@@ -203,13 +204,13 @@ public DataSet<V> distinct(String name, boolean isLast, OperationContext context
pushScopeIfNeeded(context, pushScope, scopeDetail);
try {
Dataset<Row> result = toSparkRow(this,context)
.distinct();

return toSpliceLocatedRow(result,context);

}
catch (Exception se){
throw new RuntimeException(se);

} finally {
if (pushScope) context.popScope();
@@ -218,7 +219,7 @@ public DataSet<V> distinct(String name, boolean isLast, OperationContext context

@Override
public <Op extends SpliceOperation, K,U> PairDataSet<K,U> index(SplicePairFunction<Op,V,K,U> function) {
return new SparkPairDataSet<>(rdd.mapToPair(new SparkSplittingFunction<>(function)), function.getSparkName());
}

@Override
@@ -346,7 +347,7 @@ public <Op extends SpliceOperation> DataSet< V> filter(SplicePredicateFunction<O
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <Op extends SpliceOperation> DataSet< V> filter(
SplicePredicateFunction<Op,V> f, boolean isLast, boolean pushScope, String scopeDetail) {
pushScopeIfNeeded(f, pushScope, scopeDetail);
try {
return new SparkDataSet(rdd.filter(new SparkSpliceFunctionWrapper<>(f)), planIfLast(f, isLast));
@@ -397,7 +398,7 @@ public DataSet<V> windows(WindowContext windowContext, OperationContext context,
dataset = dataset.withColumn(ValueRow.getNamedColumn(aggregator.getResultColumnId()-1),col);
}
//Convert back to Splice Row
return toSpliceLocatedRow(dataset, context);

} catch (Exception se){
throw new RuntimeException(se);
@@ -415,20 +416,20 @@ public DataSet< V> intersect(DataSet< V> dataSet, String name, OperationContext
//Convert this rdd backed iterator to a Spark untyped dataset
Dataset<Row> left = SpliceSpark.getSession()
.createDataFrame(
rdd.map(
new LocatedRowToRowFunction()),
context.getOperation()
.getExecRowDefinition()
.schema());

//Convert the left operand to an untyped dataset
Dataset<Row> right = SpliceSpark.getSession()
.createDataFrame(
((SparkDataSet)dataSet).rdd
.map(new LocatedRowToRowFunction()),
context.getOperation()
.getExecRowDefinition()
.schema());

//Do the intersect
Dataset<Row> result = left.intersect(right);
@@ -493,7 +494,7 @@ public <Op extends SpliceOperation, U> DataSet< U> flatMap(SpliceFlatMapFunction
public <Op extends SpliceOperation, U> DataSet< U> flatMap(SpliceFlatMapFunction<Op, V, U> f, boolean isLast) {
return new SparkDataSet<>(rdd.flatMap(new SparkFlatMapFunction<>(f)), planIfLast(f, isLast));
}

@Override
public void close() {

@@ -539,7 +540,7 @@ public RecordWriter<Void, ExecRow> getRecordWriter(TaskAttemptContext taskAttemp
Configuration conf = taskAttemptContext.getConfiguration();
String encoded = conf.get("exportFunction");
ByteDataInput bdi = new ByteDataInput(
Base64.decodeBase64(encoded));
SpliceFunction2<ExportOperation, OutputStream, Iterator<ExecRow>, Void> exportFunction;
try {
exportFunction = (SpliceFunction2<ExportOperation, OutputStream, Iterator<ExecRow>, Void>) bdi.readObject();
@@ -673,13 +674,13 @@ public DataSet<V> join(OperationContext context, DataSet<V> rightDataSet, JoinTy
JoinOperation op = (JoinOperation) context.getOperation();
Dataset<Row> leftDF = SpliceSpark.getSession().createDataFrame(
rdd.map(new LocatedRowToRowFunction()),
context.getOperation().getLeftOperation().getExecRowDefinition().schema());
Dataset<Row> rightDF = SpliceSpark.getSession().createDataFrame(
((SparkDataSet)rightDataSet).rdd.map(new LocatedRowToRowFunction()),
context.getOperation().getRightOperation().getExecRowDefinition().schema());
if (isBroadcast) {
rightDF = broadcast(rightDF);
}
Column expr = null;
int[] rightJoinKeys = ((JoinOperation)context.getOperation()).getRightHashKeys();
int[] leftJoinKeys = ((JoinOperation)context.getOperation()).getLeftHashKeys();
@@ -696,7 +697,7 @@ public DataSet<V> join(OperationContext context, DataSet<V> rightDataSet, JoinTy
new RowToLocatedRowFunction(context)));
else
joinedSet = new SparkDataSet(leftDF.join(rightDF,expr,joinType.strategy()).rdd().toJavaRDD().map(
new RowToLocatedRowFunction(context)));
return joinedSet;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -741,15 +742,15 @@ public static DataSet toSpliceLocatedRow(JavaRDD<Row> rdd, OperationContext cont

@SuppressWarnings({ "unchecked", "rawtypes" })
public DataSet<ExecRow> writeParquetFile(int[] baseColumnMap, int[] partitionBy, String location, String compression,
OperationContext context) {
try {
Dataset<Row> insertDF = SpliceSpark.getSession().createDataFrame(
rdd.map(new SparkSpliceFunctionWrapper<>(new CountWriteFunction(context))).map(new LocatedRowToRowFunction()),
context.getOperation().getExecRowDefinition().schema());

List<Column> cols = new ArrayList();
for (int i = 0; i < baseColumnMap.length; i++) {
cols.add(new Column(ValueRow.getNamedColumn(baseColumnMap[i])));
}
List<String> partitionByCols = new ArrayList();
for (int i = 0; i < partitionBy.length; i++) {
@@ -775,12 +776,16 @@ public DataSet<ExecRow> writeParquetFile(int[] baseColumnMap, int[] partitionBy,

@SuppressWarnings({ "unchecked", "rawtypes" })
public DataSet<ExecRow> writeAvroFile(int[] baseColumnMap, int[] partitionBy, String location, String compression,
OperationContext context) {
try {

StructType schema = AvroUtils.supportAvroDateType(context.getOperation().getExecRowDefinition().schema(),"a");

-Dataset<Row> insertDF = SpliceSpark.getSession().createDataFrame(
+SparkSession spliceSpark = SpliceSpark.getSession();
+if (compression.toLowerCase().equals("zlib"))
+compression = "deflate";
+spliceSpark.conf().set("spark.sql.avro.compression.codec",compression);
+Dataset<Row> insertDF = spliceSpark.createDataFrame(
rdd.map(new SparkSpliceFunctionWrapper<>(new CountWriteFunction(context))).map(new LocatedRowToRowAvroFunction()),
schema);

@@ -799,7 +804,7 @@ public DataSet<ExecRow> writeAvroFile(int[] baseColumnMap, int[] partitionBy, St
}
insertDF = insertDF.repartition(scala.collection.JavaConversions.asScalaBuffer(repartitionCols).toList());
}
-insertDF.write().option(SPARK_COMPRESSION_OPTION,compression).partitionBy(partitionByCols.toArray(new String[partitionByCols.size()]))
+insertDF.write().partitionBy(partitionByCols.toArray(new String[partitionByCols.size()]))
.mode(SaveMode.Append).format("com.databricks.spark.avro").save(location);
ValueRow valueRow=new ValueRow(1);
valueRow.setColumn(1,new SQLLongint(context.getRecordsWritten()));
@@ -812,7 +817,7 @@ public DataSet<ExecRow> writeAvroFile(int[] baseColumnMap, int[] partitionBy, St

@SuppressWarnings({ "unchecked", "rawtypes" })
public DataSet<ExecRow> writeORCFile(int[] baseColumnMap, int[] partitionBy, String location, String compression,
OperationContext context) {
try {
Dataset<Row> insertDF = SpliceSpark.getSession().createDataFrame(
rdd.map(new SparkSpliceFunctionWrapper<>(new CountWriteFunction(context))).map(new LocatedRowToRowFunction()),
@@ -845,8 +850,8 @@ public DataSet<ExecRow> writeORCFile(int[] baseColumnMap, int[] partitionBy, Str

@SuppressWarnings({ "unchecked", "rawtypes" })
public DataSet<ExecRow> writeTextFile(SpliceOperation op, String location, String characterDelimiter, String columnDelimiter,
int[] baseColumnMap,
OperationContext context) {

try {
Dataset<Row> insertDF = SpliceSpark.getSession().createDataFrame(
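One design note, hedged because the spark-avro internals are not part of this diff: the generic SPARK_COMPRESSION_OPTION write option that writeAvroFile used to pass appears to be ignored by com.databricks.spark.avro, which takes its codec from spark.sql.avro.compression.codec, and that would explain why the patch drops the option in favor of setting the session configuration. A quick local way to check the idea (output paths, row count, and the helper below are illustrative, not part of the patch):

import java.io.File;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AvroCodecCheck {

    // Sum the sizes of the files Spark wrote directly under a directory.
    private static long dirSize(String path) {
        long total = 0;
        File[] files = new File(path).listFiles();
        if (files != null) {
            for (File f : files) {
                total += f.length();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("avro-codec-check")
                .getOrCreate();

        Dataset<Row> df = spark.range(500000).toDF("id");

        spark.conf().set("spark.sql.avro.compression.codec", "uncompressed");
        df.write().format("com.databricks.spark.avro").save("/tmp/avro-uncompressed");

        // "zlib" in the external-table DDL maps to this codec name in the patch.
        spark.conf().set("spark.sql.avro.compression.codec", "deflate");
        df.write().format("com.databricks.spark.avro").save("/tmp/avro-deflate");

        System.out.println("uncompressed: " + dirSize("/tmp/avro-uncompressed") + " bytes");
        System.out.println("deflate:      " + dirSize("/tmp/avro-deflate") + " bytes");

        spark.stop();
    }
}

If the deflate directory is not noticeably smaller than the uncompressed one, the codec setting is not reaching the writer.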