Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[lake] Fluss lake catalog supports pluggable #105

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.factories;

/**
* Base interface for all kind of factories that create object instances from a list of key-value
* pairs in Fluss's catalog.
*
* <p>A factory is uniquely identified by {@link Class} and {@link #identifier()}.
*
* <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI).
* Classes that implement this interface can be added to {@code
* META_INF/services/com.alibaba.fluss.factories.Factory} in JAR files.
*/
public interface Factory {
/**
* Returns a unique identifier among same factory interfaces.
*
* <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
* kafka}). If multiple factories exist for different versions, a version should be appended
* using "-" (e.g. {@code elasticsearch-7}).
*/
String identifier();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.factories;

/** Exception for {@link Factory}. */
public class FactoryException extends RuntimeException {

public FactoryException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.factories;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

/** Utility for working with {@link Factory}s. */
public class FactoryUtils {

private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class);

/** Discovers a factory using the given factory base class and identifier. */
@SuppressWarnings("unchecked")
public static <T extends Factory> T discoverFactory(
ClassLoader classLoader, Class<T> factoryClass, String identifier) {
final List<Factory> factories = discoverFactories(classLoader);

final List<Factory> foundFactories =
factories.stream()
.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
.collect(Collectors.toList());

if (foundFactories.isEmpty()) {
throw new FactoryException(
String.format(
"Could not find any factories that implement '%s' in the classpath.",
factoryClass.getName()));
}

final List<Factory> matchingFactories =
foundFactories.stream()
.filter(f -> f.identifier().equals(identifier))
.collect(Collectors.toList());

if (matchingFactories.isEmpty()) {
throw new FactoryException(
String.format(
"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
+ "Available factory identifiers are:\n\n"
+ "%s",
identifier,
factoryClass.getName(),
foundFactories.stream()
.map(Factory::identifier)
.collect(Collectors.joining("\n"))));
}
if (matchingFactories.size() > 1) {
throw new FactoryException(
String.format(
"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
+ "Ambiguous factory classes are:\n\n"
+ "%s",
identifier,
factoryClass.getName(),
matchingFactories.stream()
.map(f -> f.getClass().getName())
.sorted()
.collect(Collectors.joining("\n"))));
}

return (T) matchingFactories.get(0);
}

public static <T extends Factory> List<String> discoverIdentifiers(
ClassLoader classLoader, Class<T> factoryClass) {
final List<Factory> factories = discoverFactories(classLoader);

return factories.stream()
.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
.map(Factory::identifier)
.collect(Collectors.toList());
}

private static List<Factory> discoverFactories(ClassLoader classLoader) {
final Iterator<Factory> serviceLoaderIterator =
ServiceLoader.load(Factory.class, classLoader).iterator();

final List<Factory> loadResults = new ArrayList<>();
while (true) {
try {
// error handling should also be applied to the hasNext() call because service
// loading might cause problems here as well
if (!serviceLoaderIterator.hasNext()) {
break;
}

loadResults.add(serviceLoaderIterator.next());
} catch (Throwable t) {
if (t instanceof NoClassDefFoundError) {
LOG.debug(
"NoClassDefFoundError when loading a {}. This is expected when trying to load factory but no implementation is loaded.",
Factory.class.getCanonicalName(),
t);
} else {
throw new RuntimeException(
"Unexpected error when trying to load service provider.", t);
}
}
}

return loadResults;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.lakehouse.LakeCatalog;
import com.alibaba.fluss.connector.flink.lakehouse.LakeCatalogFactory;
import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtil;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
Expand Down Expand Up @@ -84,8 +84,6 @@ public class FlinkCatalog implements Catalog {
private Connection connection;
private Admin admin;

private volatile @Nullable LakeCatalog lakeCatalog;

public FlinkCatalog(
String name,
@Nullable String defaultDatabase,
Expand Down Expand Up @@ -274,7 +272,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath)

private CatalogBaseTable getLakeTable(String databaseName, String tableName)
throws TableNotExistException, CatalogException {
mayInitLakeCatalogCatalog();
LakeCatalog lakeCatalog = LakeCatalogFactory.getOrCreate(admin, catalogName, classLoader);
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
if (tableComponents.length == 1) {
// should be pattern like table_name$lake
Expand Down Expand Up @@ -517,20 +515,4 @@ public void alterPartitionColumnStatistics(
private TablePath toTablePath(ObjectPath objectPath) {
return TablePath.of(objectPath.getDatabaseName(), objectPath.getObjectName());
}

private void mayInitLakeCatalogCatalog() {
if (lakeCatalog == null) {
synchronized (this) {
if (lakeCatalog == null) {
try {
Map<String, String> catalogProperties =
admin.describeLakeStorage().get().getCatalogProperties();
lakeCatalog = new LakeCatalog(catalogName, catalogProperties, classLoader);
} catch (Exception e) {
throw new FlussRuntimeException("Failed to init paimon catalog.", e);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,10 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkFileIOLoader;
import org.apache.paimon.options.Options;

import java.util.Map;

/** A lake catalog to delegate the operations on lake table. */
public class LakeCatalog {

// currently, only support paimon
// todo make it pluggable
private final FlinkCatalog paimonFlinkCatalog;

public LakeCatalog(
String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader) {
CatalogContext catalogContext =
CatalogContext.create(
Options.fromMap(catalogProperties), null, new FlinkFileIOLoader());
paimonFlinkCatalog =
FlinkCatalogFactory.createCatalog(catalogName, catalogContext, classLoader);
}
public interface LakeCatalog {

public CatalogBaseTable getTable(ObjectPath objectPath)
throws TableNotExistException, CatalogException {
return paimonFlinkCatalog.getTable(objectPath);
}
CatalogBaseTable getTable(ObjectPath objectPath)
throws TableNotExistException, CatalogException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.lakehouse;

import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.factories.Factory;
import com.alibaba.fluss.factories.FactoryUtils;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;

import javax.annotation.Nullable;

import java.util.Map;

/** Factory to create {@link LakeCatalog}. */
public abstract class LakeCatalogFactory implements Factory {

protected abstract LakeCatalog create(
String catalogName, Map<String, String> catalogProperties, ClassLoader classLoader);

private static volatile @Nullable LakeCatalog lakeCatalog;

/** Get or create a {@link LakeCatalog} instance. */
public static LakeCatalog getOrCreate(
Admin admin, String catalogName, ClassLoader classLoader) {
if (lakeCatalog == null) {
synchronized (LakeCatalogFactory.class) {
if (lakeCatalog == null) {
lakeCatalog = create(admin, catalogName, classLoader);
}
}
}
return lakeCatalog;
}

/** Create a {@link LakeCatalog} instance. */
static LakeCatalog create(Admin admin, String catalogName, ClassLoader classLoader) {
String lakeStorage = null;
try {
LakeStorageInfo lakeStorageInfo = admin.describeLakeStorage().get();
lakeStorage = lakeStorageInfo.getLakeStorage();
LakeCatalogFactory lakeCatalogFactory =
FactoryUtils.discoverFactory(
classLoader, LakeCatalogFactory.class, lakeStorage);
return lakeCatalogFactory.create(
catalogName, lakeStorageInfo.getCatalogProperties(), classLoader);
} catch (Exception e) {
throw new FlussRuntimeException("Failed to init " + lakeStorage + " lake catalog.", e);
}
}
}
Loading