Skip to content

Commit

Permalink
ExpediaGroup#197 Hive 3 migration: porting existing logics
Browse files Browse the repository at this point in the history
  • Loading branch information
rtotaro committed Apr 26, 2021
1 parent 5e64ae1 commit c73da63
Show file tree
Hide file tree
Showing 29 changed files with 1,058 additions and 308 deletions.
61 changes: 50 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<module>waggle-dance-boot</module>
<module>waggle-dance-integration-tests</module>
<module>waggle-dance</module>
<module>waggle-dance-rpm</module>
<!-- <module>waggle-dance-rpm</module>-->
</modules>

<properties>
Expand All @@ -43,7 +43,7 @@
<dropwizard.metrics.version>3.1.5</dropwizard.metrics.version>
<aspectj-maven-plugin.version>1.9</aspectj-maven-plugin.version>
<aspectj.version>1.8.9</aspectj.version>
<beeju.version>4.0.0</beeju.version>
<beeju.version>5.0.0</beeju.version>
<guava.version>23.0</guava.version>
<guice.version>4.0</guice.version>
<hcommon-hive-metastore.version>1.2.3</hcommon-hive-metastore.version>
Expand Down Expand Up @@ -81,6 +81,38 @@
<artifactId>javax.el</artifactId>
<version>2.2.6</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<!-- spring uses a newer version of byte-buddy but Mockito requires
an older version -->
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.10.15</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
<version>1.10.15</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
Expand Down Expand Up @@ -118,6 +150,12 @@
<version>${hive.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
Expand All @@ -126,7 +164,8 @@
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<!-- we should use whatever hive-metastore is using. Cannot rely on the transitive dep because we want it test scoped. -->
<!-- we should use whatever hive-metastore is using. Cannot rely
on the transitive dep because we want it test scoped. -->
<version>10.10.2.0</version>
<scope>test</scope>
</dependency>
Expand All @@ -135,6 +174,12 @@
<artifactId>beeju</artifactId>
<version>${beeju.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-runner</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.hotels</groupId>
Expand All @@ -153,6 +198,8 @@
<version>${maven.release.plugin.version}</version>
</plugin>
<plugin>
<!-- NOTE: we are blocked on building waggle dance using Java 11
on the below, need to wait for https://github.com/mojohaus/aspectj-maven-plugin/pull/45 -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>${aspectj-maven-plugin.version}</version>
Expand Down Expand Up @@ -224,15 +271,7 @@
<configuration>
<!-- excluding files that don't need a header update -->
<excludes>
<exclude>src/main/java/com/hotels/bdp/waggledance/api/validation/constraint/TunnelRoute.java</exclude>
<exclude>src/main/java/com/hotels/bdp/waggledance/api/validation/validator/TunnelRouteValidator.java
</exclude>
<exclude>src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClient.java</exclude>
<exclude>src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java</exclude>
<exclude>src/main/java/com/hotels/bdp/waggledance/validation/Preconditions.java</exclude>
<exclude>src/main/java/com/hotels/bdp/waggledance/parse/ASTConverter.java</exclude>
<exclude>src/main/java/com/hotels/bdp/waggledance/parse/ASTNodeUtils.java</exclude>
<exclude>src/main/java/com/hotels/bdp/waggledance/parse/Rule.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
@Type(value = PrimaryMetaStore.class, name = "PRIMARY"),
@Type(value = FederatedMetaStore.class, name = "FEDERATED") })
public abstract class AbstractMetaStore {
private String catalog = "hive";
private String databasePrefix;
private String hiveMetastoreFilterHook;
private List<String> writableDatabaseWhitelist;
Expand Down Expand Up @@ -91,6 +92,16 @@ public static PrimaryMetaStore newPrimaryInstance(String name, String remoteMeta
return new PrimaryMetaStore(name, remoteMetaStoreUris, AccessControlType.READ_ONLY);
}

public String getCatalog()
{
return catalog;
}

public void setCatalog(String catalog)
{
this.catalog = catalog;
}

public String getDatabasePrefix() {
return databasePrefix;
}
Expand Down Expand Up @@ -212,7 +223,7 @@ public void setStatus(MetaStoreStatus status) {

@Override
public int hashCode() {
return Objects.hashCode(name);
return Objects.hashCode(name, catalog);
}

@Override
Expand All @@ -224,7 +235,7 @@ public boolean equals(Object obj) {
return false;
}
final AbstractMetaStore other = (AbstractMetaStore) obj;
return Objects.equal(name, other.name);
return Objects.equal(name, other.name) && Objects.equal(catalog, other.catalog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void nullDatabasePrefix() {

@Test
public void toJson() throws Exception {
String expected = "{\"accessControlType\":\"READ_ONLY\",\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"name_\",\"federationType\":\"FEDERATED\",\"hiveMetastoreFilterHook\":null,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
String expected = "{\"accessControlType\":\"READ_ONLY\",\"catalog\":\"hive\",\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"name_\",\"federationType\":\"FEDERATED\",\"hiveMetastoreFilterHook\":null,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
ObjectMapper mapper = new ObjectMapper();
// Sorting to get deterministic test behaviour
mapper.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void nonEmptyDatabasePrefix() {

@Test
public void toJson() throws Exception {
String expected = "{\"accessControlType\":\"READ_ONLY\",\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"\",\"federationType\":\"PRIMARY\",\"hiveMetastoreFilterHook\":null,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
String expected = "{\"accessControlType\":\"READ_ONLY\",\"catalog\":\"hive\",\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"\",\"federationType\":\"PRIMARY\",\"hiveMetastoreFilterHook\":null,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
ObjectMapper mapper = new ObjectMapper();
// Sorting to get deterministic test behaviour
mapper.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@

public interface DatabaseMapping extends MetaStoreMapping {


Partition transformInboundPartition(Partition partition);

Table transformInboundTable(Table table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public DatabaseMappingImpl(MetaStoreMapping metaStoreMapping, QueryMapping query
this.queryMapping = queryMapping;
}

@Override
public String getCatalog()
{
return metaStoreMapping.getCatalog();
}

@Override
public MetaStoreFilterHook getMetastoreFilter() {
return metaStoreMapping.getMetastoreFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

public interface MetaStoreMapping extends Closeable {

String getCatalog();

/**
* Outbound means parameter coming from the Hive Metastore and return result will be sent to user client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public MetaStoreMappingDecorator(MetaStoreMapping metaStoreMapping) {
this.metaStoreMapping = metaStoreMapping;
}

@Override
public String getCatalog()
{
return metaStoreMapping.getCatalog();
}

@Override
public String transformOutboundDatabaseName(String databaseName) {
if (databaseName == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public MetaStoreMapping newInstance(AbstractMetaStore metaStore) {
LOG
.info("Mapping databases with name '{}' to metastore: {}", metaStore.getName(),
metaStore.getRemoteMetaStoreUris());
MetaStoreMapping metaStoreMapping = new MetaStoreMappingImpl(prefixNameFor(metaStore), metaStore.getName(),
MetaStoreMapping metaStoreMapping = new MetaStoreMappingImpl(prefixNameFor(metaStore), metaStore.getName(),metaStore.getCatalog(),
createClient(metaStore), accessControlHandlerFactory.newInstance(metaStore), metaStore.getConnectionType(),
metaStore.getLatency(), loadMetastoreFilterHook(metaStore));
if (waggleDanceConfiguration.getDatabaseResolution() == DatabaseResolution.PREFIXED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.hotels.bdp.waggledance.mapping.model;

import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.*;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand All @@ -32,6 +34,7 @@

import com.hotels.bdp.waggledance.api.model.ConnectionType;
import com.hotels.bdp.waggledance.client.CloseableThriftHiveMetastoreIface;
import com.hotels.bdp.waggledance.server.FederatedHMSHandler;
import com.hotels.bdp.waggledance.server.security.AccessControlHandler;
import com.hotels.bdp.waggledance.server.security.NotAllowedException;

Expand All @@ -43,6 +46,7 @@ class MetaStoreMappingImpl implements MetaStoreMapping {
private final CloseableThriftHiveMetastoreIface client;
private final AccessControlHandler accessControlHandler;
private final String name;
private final String catalog;
private final long latency;
private final MetaStoreFilterHook metastoreFilter;

Expand All @@ -51,20 +55,27 @@ class MetaStoreMappingImpl implements MetaStoreMapping {
MetaStoreMappingImpl(
String databasePrefix,
String name,
String catalog,
CloseableThriftHiveMetastoreIface client,
AccessControlHandler accessControlHandler,
ConnectionType connectionType,
long latency,
MetaStoreFilterHook metastoreFilter) {
this.databasePrefix = databasePrefix;
this.name = name;
this.catalog = catalog;
this.client = client;
this.accessControlHandler = accessControlHandler;
this.connectionType = connectionType;
this.latency = latency;
this.metastoreFilter = metastoreFilter;
}

public String getCatalog()
{
return catalog;
}

@Override
public String transformOutboundDatabaseName(String databaseName) {
return databaseName.toLowerCase(Locale.ROOT);
Expand Down Expand Up @@ -121,10 +132,11 @@ public boolean isAvailable() {
}

@Override
public MetaStoreMapping checkWritePermissions(String databaseName) {
if (!accessControlHandler.hasWritePermission(databaseName)) {
public MetaStoreMapping checkWritePermissions(String databaseName){
String internal_name = FederatedHMSHandler.getDbInternalName(databaseName);
if (!accessControlHandler.hasWritePermission(internal_name)) {
throw new NotAllowedException(
"You cannot perform this operation on the virtual database '" + databaseName + "'.");
"You cannot perform this operation on the virtual database '" + databaseName + "'.");
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
* Copyright (C) 2016-2021 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,32 +15,76 @@
*/
package com.hotels.bdp.waggledance.mapping.model;

import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.CAT_NAME;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.DB_NAME;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.parseDbName;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.prependNotNullCatToDbName;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.MetaException;

public class PrefixMapping extends MetaStoreMappingDecorator {

public PrefixMapping(MetaStoreMapping metaStoreMapping) {
super(metaStoreMapping);
}

@Override
public String transformOutboundDatabaseName(String databaseName) {
return getDatabasePrefix() + super.transformOutboundDatabaseName(databaseName);
public String transformOutboundDatabaseName(String databaseName) {
if(hasCatalogName(databaseName)) {
String[] catalogAndDatabase = new String[0];
try {
catalogAndDatabase = parseDbName(databaseName, null);
}
catch (MetaException e) {
//FIXME
throw new RuntimeException(e);
}
String dbName = catalogAndDatabase[DB_NAME];
String catalogName = catalogAndDatabase[CAT_NAME];
String prefixedDbName = getDatabasePrefix() + super.transformOutboundDatabaseName(dbName);
return prependNotNullCatToDbName(catalogName, prefixedDbName);
}else {
return getDatabasePrefix() + super.transformOutboundDatabaseName(databaseName);
}
}

private static boolean hasCatalogName(String dbName) {
return dbName != null && dbName.length() > 0 &&
dbName.charAt(0) == '@';
}

@Override
public List<String> transformOutboundDatabaseNameMultiple(String databaseName) {
List<String> outbound = super.transformOutboundDatabaseNameMultiple(databaseName);
List<String> result = new ArrayList<>(outbound.size());
for (String outboundDatabase : outbound) {
result.add(getDatabasePrefix() + outboundDatabase);
result.add(transformOutboundDatabaseName(outboundDatabase));
}
return result;
}

@Override
public String transformInboundDatabaseName(String databaseName) {
if(hasCatalogName(databaseName)) {
String[] catalogAndDatabase = new String[0];
try {
catalogAndDatabase = parseDbName(databaseName, null);
}
catch (MetaException e) {
//FIXME
throw new RuntimeException(e);
}
String dbName = catalogAndDatabase[DB_NAME];
String catalogName = catalogAndDatabase[CAT_NAME];
return prependNotNullCatToDbName(catalogName, internalTransformInboundDatabaseName(dbName));
}
return internalTransformInboundDatabaseName(databaseName);
}

private String internalTransformInboundDatabaseName(String databaseName) {
String result = super.transformInboundDatabaseName(databaseName);
if (result.startsWith(getDatabasePrefix())) {
return result.substring(getDatabasePrefix().length());
Expand Down
Loading

0 comments on commit c73da63

Please sign in to comment.