Skip to content
This repository has been archived by the owner on Dec 12, 2020. It is now read-only.

Commit

Permalink
Add parallel endpoints to flight source
Browse files Browse the repository at this point in the history
* add parallel source for executors
* add actions to enable usage of parallel path
* expand testing
* add feature flags for parallel and serial flight

There were also a number of bug fixes as part of this fix
* fix a number of memory leak issues
* fix cancellation issue
* simplify connectivity configuration #10
* expand data type support #5
  • Loading branch information
Ryan Murray committed Feb 18, 2020
1 parent f455a63 commit ba012ef
Show file tree
Hide file tree
Showing 19 changed files with 1,769 additions and 202 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@

## Configuration

### Enable Flight

* the property `-Ddremio.flight.enable=true` *MUST* be set to enable flight
* the property `-Ddremio.flight.parallel.enable=true` *MUST* be set on all executors to enable parallel flight

### Parallel Flight
The parallel Flight stream is now working in Dremio; however, it requires a patched dremio-oss to work correctly. Parallel Flight allows executors to stream
results directly to the Python/Spark connector in parallel. See: [Spark Connector](https://github.com/rymurr/flight-spark-source). This results in an
approximately linear performance increase over serial Flight (with properly configured parallelization).


### SSL
* ensure you have ssl set up in your `dremio.conf`. This plugin currently uses the same certificates as the webserver.
* set property `-Ddremio.flight.use-ssl=true`
Expand Down
203 changes: 136 additions & 67 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.dremio.flight</groupId>
Expand All @@ -34,20 +34,25 @@

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!--
Dependencies version for this module
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
Submodules should rely on dependencyManagement as much as possible
-->
<arrow.flight.version>0.15.0</arrow.flight.version>
<version.dremio>4.0.2-201910020123580864-a98a0b9</version.dremio>
<arrow.flight.version>0.15.1</arrow.flight.version>
<version.dremio>4.1.4-202001240912140359-a90eb503</version.dremio>
<junit.version>4.12</junit.version>
<lilith.version>0.9.44</lilith.version>
<slf4j.version>1.7.10</slf4j.version>
<logback.version>1.1.3</logback.version>
<surefire.extra.argLine>-Ddremio.flight.ssl=true</surefire.extra.argLine>
<jackson.version>2.10.2</jackson.version>
<protobuf.version>3.9.1</protobuf.version>
<guava.version>20.0</guava.version>
<netty.version>4.1.38.Final</netty.version>
<netty.boringssl.version>2.0.25.Final</netty.boringssl.version>
<surefire.extra.argLine>-Ddremio.flight.ssl=true -Ddremio.flight.enabled=true -Ddremio.flight.parallel.enabled=true</surefire.extra.argLine>

<target.gen.source.path>${project.basedir}/target/generated-sources</target.gen.source.path>

Expand Down Expand Up @@ -133,19 +138,36 @@
<executions>
<execution>
<id>avoid_bad_dependencies</id>
<phase>verify</phase>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<bannedDependencies>
<includes combine.children="append">
<!-- Needed by HBase test cluster -->
<include>javax.servlet:servlet-api:*:*:test</include>
<include>commons-logging:commons-logging:*:jar:provided</include>
</includes>
<excludes>
<exclude>commons-logging</exclude>
<exclude>javax.servlet:servlet-api</exclude>
<exclude>org.mortbay.jetty:servlet-api</exclude>
<exclude>org.mortbay.jetty:servlet-api-2.5</exclude>
<exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
<exclude>log4j:*</exclude>
<!-- <exclude>io.netty:*:4.0-->
<!-- </exclude> &lt;!&ndash; exclude netty versions 4.0 and above. some of our dependencies (e.g. ES) still depend on netty 3.x which is fine as versions < 4.0 have a completely different package names &ndash;&gt;-->
</excludes>
<!-- <includes>-->
<!-- <include>io.netty:*:${netty.version}</include>-->
<!-- </includes>-->
</bannedDependencies>
<bannedPlugins>
<excludes>
<exclude>io.protostuff:protostuff-maven-plugin</exclude>
</excludes>
</bannedPlugins>
<requireSameVersions>
<plugins>
<plugin>*:*</plugin>
</plugins>
</requireSameVersions>
</rules>
</configuration>
</execution>
Expand Down Expand Up @@ -376,73 +398,104 @@ limitations under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>org.apache.arrow:arrow-flight:shaded</include>
<include>com.google.flatbuffers:flatbuffers-java</include>
<!-- <include>io.grpc:*</include>-->
<!-- <include>io.netty:*</include>-->
<include>io.opencensus:*</include>
<include>com.google.code.gson:gson</include>
<include>com.google.code.findbugs:jsr305</include>
<include>com.google.code.errorprone:error_prone_annotations</include>
<include>com.google.api.grpc:proto-google-common-protos</include>
<include>com.google.protobuf:protobuf-java</include>
</includes>
<excludes>
<exclude>io.netty:netty-transport-native-unix-common</exclude>
<exclude>io.netty:netty-transport-native-epoll</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation><pattern>com.google.protobuf</pattern><shadedPattern>cdap.com.google.protobuf</shadedPattern></relocation>
<!-- Entries to relocate netty native libraries -->
<relocation><pattern>META-INF.native.libnetty_</pattern><shadedPattern>META-INF.native.libcdap_netty_</shadedPattern></relocation>
<relocation><pattern>META-INF.native.netty_</pattern><shadedPattern>META-INF.native.cdap_netty_</shadedPattern></relocation>

</relocations>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>org.apache.arrow:arrow-flight:shaded</include>
<include>com.google.flatbuffers:flatbuffers-java</include>
<!-- <include>io.grpc:*</include>-->
<!-- <include>io.netty:*</include>-->
<include>io.opencensus:*</include>
<include>com.google.code.gson:gson</include>
<include>com.google.code.findbugs:jsr305</include>
<include>com.google.code.errorprone:error_prone_annotations</include>
<include>com.google.api.grpc:proto-google-common-protos</include>
<include>com.google.protobuf:protobuf-java</include>
</includes>
<excludes>
<exclude>io.netty:netty-transport-native-unix-common</exclude>
<exclude>io.netty:netty-transport-native-epoll</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>cdap.com.google.protobuf</shadedPattern>
</relocation>
<!-- Entries to relocate netty native libraries -->
<relocation>
<pattern>META-INF.native.libnetty_</pattern>
<shadedPattern>META-INF.native.libcdap_netty_</shadedPattern>
</relocation>
<relocation>
<pattern>META-INF.native.netty_</pattern>
<shadedPattern>META-INF.native.cdap_netty_</shadedPattern>
</relocation>

</relocations>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>

<dependency>
<groupId>org.apache.arrow</groupId>
<!-- <artifactId>flight-grpc</artifactId>-->
<artifactId>arrow-flight</artifactId>
<version>${arrow.flight.version}</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-format</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-format</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.google.guava</groupId>-->
<!-- <artifactId>guava</artifactId>-->
<!-- <version>${guava.version}</version>-->
<!-- </dependency>-->


<dependency>
<groupId>com.dremio.sabot</groupId>
Expand All @@ -454,6 +507,22 @@ limitations under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -611,7 +680,7 @@ limitations under the License.
</profile>
</profiles>

<repositories>
<repositories>
<repository>
<id>dremio-public</id>
<url>http://maven.dremio.com/public/</url>
Expand Down
50 changes: 44 additions & 6 deletions src/main/java/com/dremio/flight/AuthValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.dremio.exec.proto.UserBitShared;
import com.dremio.exec.proto.UserProtos;
import com.dremio.exec.server.SabotContext;
import com.dremio.sabot.rpc.user.UserRpcUtils;
import com.dremio.sabot.rpc.user.UserSession;
import com.dremio.service.users.SystemUser;
import com.dremio.service.users.UserLoginException;
Expand All @@ -41,19 +42,25 @@
public class AuthValidator implements BasicServerAuthHandler.BasicAuthValidator {
private static final Logger logger = LoggerFactory.getLogger(AuthValidator.class);
private final Map<ByteArrayWrapper, UserSession> sessions = new HashMap<>();
private final Map<ByteArrayWrapper, String> passwords = new HashMap<>();
private final Map<ByteArrayWrapper, FlightSessionOptions> options = new HashMap<>();
private final Map<String, ByteArrayWrapper> tokens = new HashMap<>();
private final Provider<UserService> userService;
private final Provider<SabotContext> context;
private final UserService userService;
private final SabotContext context;

public AuthValidator(Provider<UserService> userService, Provider<SabotContext> context) {
this.userService = userService.get();
this.context = context.get();
}

/**
 * Creates a validator from already-resolved services.
 *
 * @param userService service used to authenticate users
 * @param context context whose option manager is attached to new sessions
 */
public AuthValidator(UserService userService, SabotContext context) {
  // The two assignments are independent of each other.
  this.context = context;
  this.userService = userService;
}


@Override
public byte[] getToken(String user, String password) throws Exception {
// UserSession.Builder.newBuilder()
UserService userService = this.userService.get();
try {
if (userService != null) {
userService.authenticate(user, password);
Expand All @@ -64,6 +71,8 @@ public byte[] getToken(String user, String password) throws Exception {
}
byte[] b = (user + ":" + password).getBytes();
sessions.put(new ByteArrayWrapper(b), build(user, password));
passwords.put(new ByteArrayWrapper(b), password);
options.put(new ByteArrayWrapper(b), new FlightSessionOptions());
tokens.put(user, new ByteArrayWrapper(b));
logger.info("authenticated {}", user);
return b;
Expand All @@ -75,24 +84,53 @@ public byte[] getToken(String user, String password) throws Exception {

/**
 * Resolves a previously issued token to the name of the user it belongs to.
 *
 * <p>The token is looked up in the session map. An unknown token yields an empty result instead
 * of failing, so a stale or forged token is simply rejected.
 *
 * @param bytes the token presented by the client
 * @return the user name recorded in the matching session, or an empty {@link Optional} when no
 *     session is registered for the token
 */
@Override
public Optional<String> isValid(byte[] bytes) {
  // Wrap once: byte[] has identity-based equality and cannot be used as a map key directly.
  final ByteArrayWrapper token = new ByteArrayWrapper(bytes);
  final UserSession session = sessions.get(token);

  // Diagnostic only. The session keys are derived from credentials, so report just the count
  // and the outcome rather than dumping the key set on every validation.
  logger.debug("Token lookup against {} known session(s): {}",
      sessions.size(), session != null ? "found" : "not found");

  return Optional.ofNullable(session)
      .map(s -> s.getCredentials().getUserName());
}

private UserSession build(String user, String password) {
return UserSession.Builder.newBuilder()
.withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName(user).build())
.withOptionManager(context.getOptionManager())
.withUserProperties(
UserProtos.UserProperties.newBuilder().addProperties(
UserProtos.Property.newBuilder().setKey("password").setValue(password).build()
).build())
.withOptionManager(context.get().getOptionManager()).build();
.withClientInfos(UserRpcUtils.getRpcEndpointInfos("Dremio Flight Client"))
.setSupportComplexTypes(true)
.build();
}

/**
 * Looks up the session of the peer making a Flight call.
 *
 * @param callContext call context whose peer identity selects the token, and through it the
 *     session
 * @return the session stored for that peer's token, or {@code null} when the map holds none
 */
public UserSession getUserSession(FlightProducer.CallContext callContext) {
  final String peer = callContext.peerIdentity();
  return sessions.get(tokens.get(peer));
}

/**
 * Looks up the password recorded for the peer making a Flight call.
 *
 * @param callContext call context whose peer identity selects the token, and through it the
 *     password entry
 * @return the password stored for that peer's token, or {@code null} when the map holds none
 */
public String getUserPassword(FlightProducer.CallContext callContext) {
  final String peer = callContext.peerIdentity();
  return passwords.get(tokens.get(peer));
}

/**
 * Looks up the Flight session options of the peer making a Flight call.
 *
 * @param callContext call context whose peer identity selects the token, and through it the
 *     options entry
 * @return the options stored for that peer's token, or {@code null} when the map holds none
 */
public FlightSessionOptions getSessionOptions(FlightProducer.CallContext callContext) {
  final String peer = callContext.peerIdentity();
  return options.get(tokens.get(peer));
}

/**
 * Mutable options kept alongside a Flight session.
 *
 * <p>Currently holds a single flag, which is {@code false} until {@link #setParallel(boolean)}
 * is called.
 */
public static class FlightSessionOptions {
  /** Parallel flag for the session; starts out {@code false}. */
  private boolean parallel;

  /**
   * @return the current value of the parallel flag
   */
  public boolean isParallel() {
    return this.parallel;
  }

  /**
   * @param parallel the new value of the parallel flag
   */
  public void setParallel(boolean parallel) {
    this.parallel = parallel;
  }
}

/**
* wrapper class to make byte[] a map key
*/
Expand Down
Loading

0 comments on commit ba012ef

Please sign in to comment.