diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 000000000..8bb20115a --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,29 @@ +--- +name: Bug report +about: Create a bug report to help us improve +labels: "bug" +--- + +**Describe the bug** + + + +**To Reproduce** + +Steps to reproduce the behavior: + +1. ... +2. ... +3. ... + +**Expected behavior** + + + +**System [please complete the following information]:** + +- OS: e.g. [Ubuntu 18.04] + +**Additional context** + + diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 000000000..5065058d6 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,17 @@ +--- +name: Feature request +about: Suggest a new feature +labels: "enhancement" +--- + +**Is your feature request related to a problem? Please describe.** + + + +**Describe the solution you would like** + + + +**Additional context** + + diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 000000000..4a1217df6 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,10 @@ +**PR Checklist** + +- [ ] A description of the changes is added to the description of this PR. +- [ ] If there is a related issue, make sure it is linked to this PR. +- [ ] If you've fixed a bug or added code that should be tested, add tests! +- [ ] If you've added or modified a feature, documentation in `docs` is updated + +**Description of changes** + + diff --git a/.github/workflows/sbt-tests.yml b/.github/workflows/sbt-tests.yml index 26ff45d9f..4474091e0 100644 --- a/.github/workflows/sbt-tests.yml +++ b/.github/workflows/sbt-tests.yml @@ -29,6 +29,3 @@ jobs: cache: 'sbt' - name: Run tests run: sbt test - # Optional: This step uploads information to the GitHub dependency graph and unblocking Dependabot alerts for the repository - - name: Upload dependency graph - uses: scalacenter/sbt-dependency-submission@ab086b50c947c9774b70f39fc7f6e20ca2706c91 diff --git a/.gitignore b/.gitignore index 8be0399c4..8c5351828 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ etc/db/h2db.mv.db -.idea -*target* -build/sbt-launch-* -.bsp +target/ +.idea/ +.bsp/ diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 62b0937a6..c5962bbd4 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -106,7 +106,7 @@ Violating these terms may lead to a permanent ban. ### 4. Permanent Ban **Community Impact**: Demonstrating a pattern of violation of community -standards, including sustained inappropriate behavior, harassment of an +standards, including sustained inappropriate behavior, harassment of an individual, or aggression toward or disparagement of classes of individuals. **Consequence**: A permanent ban from any sort of public interaction within diff --git a/NOTICE b/NOTICE index 09015e99e..32650825d 100644 --- a/NOTICE +++ b/NOTICE @@ -55,7 +55,7 @@ Notice - https://github.com/delta-io/delta/blob/master/NOTICE.txt sbt/sbt-jupiter-interface - https://github.com/sbt/sbt-jupiter-interface/tree/main Copyright sbt-jupiter-interface authors -LIcense - https://github.com/sbt/sbt-jupiter-interface/blob/main/LICENSE +License - https://github.com/sbt/sbt-jupiter-interface/blob/main/LICENSE raphw/byte-buddy - https://github.com/raphw/byte-buddy Copyright byte-buddy authors diff --git a/README.md b/README.md index 2ed7c3948..83fd6e22d 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Let's take Unity Catalog for spin. In this guide, we are going to do the followi OR -You have to ensure that you local environment has the following: +You have to ensure that your local environment has the following: - Clone this repository. - Ensure the `JAVA_HOME` environment variable your terminal is configured to point to JDK11+. - Compile the project using `build/sbt package` @@ -38,7 +38,7 @@ docker run -d -i --name unitycatalog -p 8081:8081 datacatering/unitycatalog:0.1. In a terminal, in the cloned repository root directory, start the UC server. -``` +```sh bin/start-uc-server ``` @@ -46,15 +46,15 @@ For the rest of the steps, continue in a different terminal. ### Operate on Delta tables with the CLI Let's list the tables. -``` +```sh bin/uc table list --catalog unity --schema default ``` You should see a few tables. Some details are truncated because of the nested nature of the data. To see all the content, you can add `--output jsonPretty` to any command. -Next, let's get the metadata of one those tables. +Next, let's get the metadata of one of those tables. -``` +```sh bin/uc table get --full_name unity.default.numbers ``` @@ -62,7 +62,7 @@ You can see that it is a Delta table. Now, specifically for Delta tables, this C print snippet of the contents of a Delta table (powered by the [Delta Kernel Java](https://delta.io/blog/delta-kernel/) project). Let's try that. -``` +```sh bin/uc table read --full_name unity.default.numbers ``` @@ -71,7 +71,7 @@ bin/uc table read --full_name unity.default.numbers For trying with DuckDB, you will have to [install it](https://duckdb.org/docs/installation/) (at least version 1.0). Let's start DuckDB and install a couple of extensions. To start DuckDB, run the command `duckdb` in the terminal. Then, in the DuckDB shell, run the following commands: -```sh +```sql install uc_catalog from core_nightly; load uc_catalog; install delta; @@ -80,8 +80,8 @@ load delta; If you have installed these extensions before, you may have to run `update extensions` and restart DuckDB for the following steps to work. -Now that we have DuckDB all set up, let's trying connecting to UC by specifying a secret. -```sh +Now that we have DuckDB all set up, let's try connecting to UC by specifying a secret. +```sql CREATE SECRET ( TYPE UC, TOKEN 'not-used', @@ -90,10 +90,10 @@ CREATE SECRET ( ); ``` You should see it print a short table saying `Success` = `true`. Then we attach the `unity` catalog to DuckDB. -```sh +```sql ATTACH 'unity' AS unity (TYPE UC_CATALOG); ``` -Now we ready to query. Try the following +Now we are ready to query. Try the following ```sql SHOW ALL TABLES; @@ -114,19 +114,24 @@ See the full [tutorial](docs/tutorial.md) for more details. - Compatibility and stability: The APIs are currently evolving and should not be assumed to be stable. ## Compiling and testing -- Install JDK 11 by whatever mechanism that is appropriate for your system, and +- Install JDK 11 by whatever mechanism is appropriate for your system, and set that version to be the default Java version (e.g., by setting env variable JAVA_HOME) - To compile all the code without running tests, run the following: - ``` + ```sh build/sbt clean compile ``` - To compile and execute tests, run the following: - ``` + ```sh build/sbt clean test ``` - To update the API specification, just update the `api/all.yaml` and then run the following: - ``` + ```sh build/sbt generate ``` This will regenerate the OpenAPI data models in the UC server and data models + APIs in the client SDK. + +### Using more recent JDKs + +The build script [checks for a lower bound on the JDK](./build.sbt#L14) but the [current SBT version](./project/build.properties) +imposes an upper bound. Please check the [JDK compatibility](https://docs.scala-lang.org/overviews/jdk-compatibility/overview.html) documentation for more information diff --git a/dev/copybara.py b/dev/copybara.py deleted file mode 100644 index 380a9e70a..000000000 --- a/dev/copybara.py +++ /dev/null @@ -1,173 +0,0 @@ -# -# DATABRICKS CONFIDENTIAL & PROPRIETARY -# __________________ -# -# Copyright 2019 Databricks, Inc. -# All Rights Reserved. -# -# NOTICE: All information contained herein is, and remains the property of Databricks, Inc. -# and its suppliers, if any. The intellectual and technical concepts contained herein are -# proprietary to Databricks, Inc. and its suppliers and may be covered by U.S. and foreign Patents, -# patents in process, and are protected by trade secret and/or copyright law. Dissemination, use, -# or reproduction of this information is strictly forbidden unless prior written permission is -# obtained from Databricks, Inc. -# -# If you view or obtain a copy of this information and believe Databricks, Inc. may not have -# intended it to be made available, please promptly report it to Databricks Legal Department -# @ legal@databricks.com. -# - -import os -import subprocess -import sys -import random -import string -import tempfile - -def run_copybara( - origin_url, - origin_ref, - target_git_repo, - target_branch, - config_file, - workflow, - auto_commit -): - copybara_command = download_copybara(os.path.dirname(os.path.dirname(config_file))) - # - # ["java", "-Xmx6g", "-jar", "/Users/tdas/Projects/copybara/bazel-bin/java/com/google/copybara/copybara_deploy.jar"] - - copybara_args = ["migrate", config_file, workflow, "--force", "--init-history", - "--ignore-noop"] - locations_file = os.path.join(os.path.dirname(config_file), ".locations.bara.sky") - locations_file_content = """ -originUrl="{origin_url}" -originRef="{origin_ref}" -destinationUrl="{destination_url}" -destinationRef="{destination_ref}" -copybaraMode="{migration_mode}" -""".format( - origin_url=origin_url, - origin_ref=origin_ref, - destination_url=target_git_repo, - destination_ref=target_branch, - migration_mode="SQUASH") - - with WorkingDirectory(os.path.dirname(config_file)): - try: - with open(locations_file, "w") as location_file: - print("Writing Location File %s:\n%s" % (locations_file, locations_file_content)) - location_file.write(locations_file_content) - - if not os.path.exists(locations_file): - raise Exception("Failed to write Locations File %s" % locations_file) - - # If we are using the current HEAD, check if there are uncommitted changes - if origin_ref == "HEAD": - (exit_code, _, _) = run_cmd(["git", "diff-index", "--quiet", "HEAD", "--"], - throw_on_error=False) - has_uncommitted_changes = exit_code != 0 - if has_uncommitted_changes: - if auto_commit: - print("Committing uncommitted Changes") - run_cmd(["git", "commit", "-am", "Copybara Auto Commit"], - stream_output=True) - else: - print("================ WARNING ================") - print("Using HEAD as the origin commit will not sync uncommitted changes") - print("Use --auto-commit to automatically commit changes") - print("=========================================") - print("Running copybara command: %s" % str(copybara_command + copybara_args)) - run_cmd(copybara_command + copybara_args, stream_output=True) - finally: - if os.path.exists(locations_file): - os.remove(locations_file) - - -def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs): - """Runs a command as a child process. - - A convenience wrapper for running a command from a Python script. - Keyword arguments: - cmd -- the command to run, as a list of strings - throw_on_error -- if true, raises an Exception if the exit code of the program is nonzero - env -- additional environment variables to be defined when running the child process - stream_output -- if true, does not capture standard output and error; if false, captures these - streams and returns them - - Note on the return value: If stream_output is true, then only the exit code is returned. If - stream_output is false, then a tuple of the exit code, standard output and standard error is - returned. - """ - cmd_env = os.environ.copy() - if env: - cmd_env.update(env) - - if stream_output: - # Flush buffered output before running the command so that the output of the command will - # show up after the current buffered output - sys.stdout.flush() - sys.stderr.flush() - child = subprocess.Popen(cmd, env=cmd_env, **kwargs) - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception("Non-zero exitcode: %s" % (exit_code)) - return exit_code - else: - child = subprocess.Popen( - cmd, - env=cmd_env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs) - (stdout, stderr) = child.communicate() - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception( - "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" % - (exit_code, stdout, stderr)) - return (exit_code, stdout, stderr) - - -def download_copybara(download_dir): - download_path = os.path.join(download_dir, "copybara_deploy.jar") - if not os.path.exists(download_path): - # Download copybara source from https://github.com/tdas/copybara/tree/updated_by_td - # and build it. - print("Downloading and building copybara at %s" % download_path) - download_script = os.path.join(download_dir, "build_copybara.sh") - run_cmd(["sh", "-c", download_script]) - if not os.path.exists(download_path): - print("Run %s manually and ensure copybara JAR is present at %s" % - (download_script, download_path)) - - copybara_cmd = ["java", "-Xmx10g", "-jar", download_path] - return copybara_cmd - - -def create_random_branch_name(): - return ''.join( - random.choice(string.ascii_lowercase + string.digits) - for _ in range(8)) - - -def initialize_empty_git_repo(): - git_repo = tempfile.mkdtemp() - with WorkingDirectory(git_repo): - run_cmd(["git", "init"]) - branch_name = create_random_branch_name() - - return git_repo, branch_name - - -# pylint: disable=too-few-public-methods -class WorkingDirectory(object): - def __init__(self, working_directory): - self.working_directory = working_directory - self.old_workdir = os.getcwd() - - def __enter__(self): - os.chdir(self.working_directory) - - def __exit__(self, tpe, value, traceback): - os.chdir(self.old_workdir) diff --git a/docs/cli.md b/docs/cli.md index 2b1ef6580..95b79474c 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -330,7 +330,7 @@ This is an experimental feature and only supported for python functions that tak It runs the functions using the python engine script at `etc/data/function/python_engine.py`. Example: -Invoke a python sum function that take two integer inputs: +Invoke a python sum function that takes two integer inputs: ```sh bin/uc function call --full_name my_catalog.my_schema.my_function --input_params "1,2" ``` diff --git a/docs/server.md b/docs/server.md index 450bb5b10..a6be64c22 100644 --- a/docs/server.md +++ b/docs/server.md @@ -10,7 +10,7 @@ bin/start-uc-server ## Configuration -The server config file is at the location `etc/conf/server.properties` (relative to the project root). +The server config file is at the location `etc/conf/server.properties` (relative to the project root). - `server.env`: The environment in which the server is running. This can be set to `dev` or `test`. When set to `test` the server will instantiate an empty in-memory h2 database for storing metadata. diff --git a/docs/tutorial.md b/docs/tutorial.md index 106f6c4ab..7dc38db90 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -2,11 +2,11 @@ Let's take Unity Catalog for spin. In this tutorial, we are going to do the following: - In one terminal, run the UC server. - In another terminal, we will explore the contents of the UC server using the UC CLI, - which is example UC connector provided to demonstrate how to use the UC SDK for various assets, + which is an example UC connector provided to demonstrate how to use the UC SDK for various assets, as well as provide a convenient way to explore the content of any UC server implementation. ### Prerequisites -You have to ensure that you local environment has the following: +You have to ensure that your local environment has the following: - Clone this repository. - Ensure the `JAVA_HOME` environment variable your terminal is configured to point to JDK11+. - Compile the project running `build/sbt package` in the repository root directory. @@ -67,7 +67,7 @@ Let's try creating a new table. bin/uc table create --full_name unity.default.myTable --columns "col1 int, col2 double" --storage_location /tmp/uc/myTable ``` -If you list the tables again you should see this new table. Next, let's write to the table with +If you list the tables again, you should see this new table. Next, let's write to the table with some randomly generated data (again, powered by [Delta Kernel Java](https://delta.io/blog/delta-kernel/)] and read it back. ``` @@ -91,7 +91,7 @@ load delta; If you have installed these extensions before, you may have to run `update extensions` and restart DuckDB for the following steps to work. -Now that we have DuckDB all set up, let's trying connecting to UC by specifying a secret. +Now that we have DuckDB all set up, let's try connecting to UC by specifying a secret. ```sh CREATE SECRET ( TYPE UC, @@ -104,7 +104,7 @@ You should see it print a short table saying `Success` = `true`. Then we attach ```sh ATTACH 'unity' AS unity (TYPE UC_CATALOG); ``` -Now we ready to query. Try the following +Now we are ready to query. Try the following ```sql SHOW ALL TABLES; @@ -137,7 +137,7 @@ You should see two text files listed and one directory. Let's read the content o ``` bin/uc volume read --full_name unity.default.json_files --path c.json ``` -Voila! You have read the content of a file stored in a volume.We can also list the contents of any subdirectory. +Voila! You have read the content of a file stored in a volume. We can also list the contents of any subdirectory. For e.g.: ``` @@ -179,7 +179,7 @@ with the function name and arguments. bin/uc function call --full_name unity.default.sum --input_params "1,2,3" ``` -Voila! You have called a function stored in UC. Lets try and create a new function. +Voila! You have called a function stored in UC. Let's try and create a new function. ``` bin/uc function create --full_name unity.default.myFunction --data_type INT --input_params "a int, b int" --def "c=a*b\nreturn c" @@ -228,4 +228,4 @@ SELECT * FROM iceberg."unity.default".marksheet_uniform ## APIs and Compatibility - Open API specification: The Unity Catalog Rest API is documented [here](../api). -- Compatibility and stability: The APIs are currently evolving and should not be assumed to be stable. \ No newline at end of file +- Compatibility and stability: The APIs are currently evolving and should not be assumed to be stable. diff --git a/etc/db/h2db.mv.db b/etc/db/h2db.mv.db index 80b0435ab..0458fa978 100644 Binary files a/etc/db/h2db.mv.db and b/etc/db/h2db.mv.db differ diff --git a/project/build.properties b/project/build.properties index 3b06b0f4f..f716c53e0 100644 --- a/project/build.properties +++ b/project/build.properties @@ -33,4 +33,4 @@ # limitations under the License. # -sbt.version=1.6.1 +sbt.version=1.10.0 diff --git a/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java b/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java deleted file mode 100644 index fa30ad7c0..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java +++ /dev/null @@ -1,121 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CatalogInfo; -import io.unitycatalog.server.model.ListCatalogsResponse; -import io.unitycatalog.server.persist.dao.CatalogInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.query.Query; -import io.unitycatalog.server.model.CreateCatalog; -import io.unitycatalog.server.model.UpdateCatalog; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.stream.Collectors; - -public class CatalogOperations { - @Getter - private static final CatalogOperations instance = new CatalogOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(CatalogOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - private CatalogOperations() {} - - public CatalogInfo addCatalog(CreateCatalog createCatalog) { - ValidationUtils.validateSqlObjectName(createCatalog.getName()); - CatalogInfoDAO catalogInfo = new CatalogInfoDAO(); - catalogInfo.setId(java.util.UUID.randomUUID()); - catalogInfo.setName(createCatalog.getName()); - catalogInfo.setComment(createCatalog.getComment()); - catalogInfo.setCreatedAt(new Date()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - session.persist(catalogInfo); - tx.commit(); - System.out.println("Added catalog: " + catalogInfo.getName()); - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } catch(Exception e) { - LOGGER.error("Error adding catalog", e); - return null; - } - } - - public ListCatalogsResponse listCatalogs() { - ListCatalogsResponse response = new ListCatalogsResponse(); - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - response.setCatalogs(session - .createQuery("from CatalogInfoDAO ", CatalogInfoDAO.class).list() - .stream().map(CatalogInfoDAO::toCatalogInfo).collect(Collectors.toList())); - return response; - } catch(Exception e) { - LOGGER.error("Error listing catalogs", e); - return null; - } - } - - public CatalogInfo getCatalog(String name) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - return getCatalog(session, name); - } catch(Exception e) { - LOGGER.error("Error getting catalog", e); - return null; - } - } - - public CatalogInfo getCatalog(Session session, String name) { - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - if (catalogInfo == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); - } - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } - - public CatalogInfoDAO getCatalogInfoDAO(Session session, String name) { - Query query = session - .createQuery("FROM CatalogInfoDAO WHERE name = :value", CatalogInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - return query.uniqueResult(); - } - - public CatalogInfo updateCatalog(String name, UpdateCatalog updateCatalog) { - ValidationUtils.validateSqlObjectName(updateCatalog.getNewName()); - // cna make this just update once we have an identifier that is not the name - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - catalogInfo.setName(updateCatalog.getNewName()); - catalogInfo.setComment(updateCatalog.getComment()); - catalogInfo.setUpdatedAt(new Date()); - session.merge(catalogInfo); - tx.commit(); - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } catch(Exception e) { - LOGGER.error("Error updating catalog", e); - return null; - } - } - - public void deleteCatalog(String name) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - if (catalogInfo != null) { - session.remove(catalogInfo); - tx.commit(); - LOGGER.info("Deleted catalog: {}", catalogInfo.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); - } - } catch (Exception e) { - LOGGER.error("Error deleting catalog", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java b/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java new file mode 100644 index 000000000..dd78a44b7 --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java @@ -0,0 +1,146 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.CatalogInfo; +import io.unitycatalog.server.model.ListCatalogsResponse; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.query.Query; +import io.unitycatalog.server.model.CreateCatalog; +import io.unitycatalog.server.model.UpdateCatalog; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.stream.Collectors; + +public class CatalogRepository { + @Getter + private static final CatalogRepository instance = new CatalogRepository(); + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + private CatalogRepository() {} + + public CatalogInfo addCatalog(CreateCatalog createCatalog) { + ValidationUtils.validateSqlObjectName(createCatalog.getName()); + CatalogInfoDAO catalogInfo = new CatalogInfoDAO(); + catalogInfo.setId(java.util.UUID.randomUUID()); + catalogInfo.setName(createCatalog.getName()); + catalogInfo.setComment(createCatalog.getComment()); + catalogInfo.setCreatedAt(new Date()); + + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + if (getCatalogDAO(session, createCatalog.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Catalog already exists: " + createCatalog.getName()); + } + session.persist(catalogInfo); + tx.commit(); + System.out.println("Added catalog: " + catalogInfo.getName()); + return CatalogInfoDAO.toCatalogInfo(catalogInfo); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public ListCatalogsResponse listCatalogs() { + ListCatalogsResponse response = new ListCatalogsResponse(); + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + response.setCatalogs(session + .createQuery("from CatalogInfoDAO ", CatalogInfoDAO.class).list() + .stream().map(CatalogInfoDAO::toCatalogInfo).collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + return response; + } + } + + public CatalogInfo getCatalog(String name) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + CatalogInfoDAO catalogInfo = null; + try { + catalogInfo = getCatalogDAO(session, name); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + if (catalogInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + return CatalogInfoDAO. + toCatalogInfo(getCatalogDAO(session, name)); + } + } + + public CatalogInfoDAO getCatalogDAO(Session session, String name) { + Query query = session + .createQuery("FROM CatalogInfoDAO WHERE name = :value", CatalogInfoDAO.class); + query.setParameter("value", name); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public CatalogInfo updateCatalog(String name, UpdateCatalog updateCatalog) { + ValidationUtils.validateSqlObjectName(updateCatalog.getNewName()); + // cna make this just update once we have an identifier that is not the name + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + CatalogInfoDAO catalogInfo = getCatalogDAO(session, name); + if (catalogInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + if (getCatalogDAO(session, updateCatalog.getNewName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Catalog already exists: " + updateCatalog.getNewName()); + } + catalogInfo.setName(updateCatalog.getNewName()); + catalogInfo.setComment(updateCatalog.getComment()); + catalogInfo.setUpdatedAt(new Date()); + session.merge(catalogInfo); + tx.commit(); + return CatalogInfoDAO.toCatalogInfo(catalogInfo); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteCatalog(String name) { + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + CatalogInfoDAO catalogInfo = getCatalogDAO(session, name); + if (catalogInfo != null) { + session.remove(catalogInfo); + tx.commit(); + LOGGER.info("Deleted catalog: {}", catalogInfo.getName()); + } else { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} diff --git a/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java b/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java index da8c79410..58387a594 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java +++ b/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java @@ -2,11 +2,9 @@ import io.unitycatalog.server.exception.BaseException; import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CreateFunction; -import io.unitycatalog.server.model.CreateFunctionRequest; -import io.unitycatalog.server.model.FunctionInfo; -import io.unitycatalog.server.model.ListFunctionsResponse; +import io.unitycatalog.server.model.*; import io.unitycatalog.server.persist.dao.FunctionInfoDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; import io.unitycatalog.server.utils.ValidationUtils; import lombok.Getter; import org.hibernate.Session; @@ -24,6 +22,7 @@ public class FunctionRepository { @Getter private static final FunctionRepository instance = new FunctionRepository(); + private static final SchemaRepository schemaRepository = SchemaRepository.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRepository.class); private static final SessionFactory SESSION_FACTORY = HibernateUtil.getSessionFactory(); @@ -58,77 +57,142 @@ public FunctionInfo createFunction(CreateFunctionRequest createFunctionRequest) try (Session session = SESSION_FACTORY.openSession()) { Transaction tx = session.beginTransaction(); - FunctionInfoDAO dao = FunctionInfoDAO.from(functionInfo); - dao.getInputParams().forEach(p -> { - p.setId(UUID.randomUUID().toString()); - p.setFunction(dao); - }); - dao.getReturnParams().forEach(p -> { - p.setId(UUID.randomUUID().toString()); - p.setFunction(dao); - }); - session.persist(dao); - tx.commit(); - return functionInfo; - } catch(Exception e) { - LOGGER.error("Error adding function", e); - return null; + try { + String catalogName = createFunction.getCatalogName(); + String schemaName = createFunction.getSchemaName(); + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, + catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + if (getFunctionDAO(session, catalogName, schemaName, createFunction.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, "Function already exists: " + createFunction.getName()); + } + FunctionInfoDAO dao = FunctionInfoDAO.from(functionInfo); + dao.setSchemaId(schemaInfo.getId()); + dao.getInputParams().forEach(p -> { + p.setId(UUID.randomUUID().toString()); + p.setFunction(dao); + }); + dao.getReturnParams().forEach(p -> { + p.setId(UUID.randomUUID().toString()); + p.setFunction(dao); + }); + session.persist(dao); + tx.commit(); + return functionInfo; + } catch (Exception e) { + tx.rollback(); + throw e; + } } } public ListFunctionsResponse listFunctions(String catalogName, String schemaName, Optional maxResults, Optional nextPageToken) { ListFunctionsResponse response = new ListFunctionsResponse(); try (Session session = SESSION_FACTORY.openSession()) { - String queryString = "from FunctionInfoDAO f where f.catalogName = :catalogName and f.schemaName = :schemaName"; - Query query = session.createQuery(queryString, FunctionInfoDAO.class); - query.setParameter("catalogName", catalogName); - query.setParameter("schemaName", schemaName); - - maxResults.ifPresent(query::setMaxResults); - - if (nextPageToken.isPresent()) { - // Perform pagination logic here if needed - // Example: query.setFirstResult(startIndex); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName + "." + schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + String queryString = "from FunctionInfoDAO f where f.schemaId = :schemaId"; + Query query = session.createQuery(queryString, FunctionInfoDAO.class); + query.setParameter("schemaId", schemaInfo.getId()); + maxResults.ifPresent(query::setMaxResults); + if (nextPageToken.isPresent()) { + // Perform pagination logic here if needed + // Example: query.setFirstResult(startIndex); + } + List functions = query.list(); + response.setFunctions( + functions.stream().map(FunctionInfoDAO::toFunctionInfo) + .peek(f -> addNamespaceInfo(f, catalogName, schemaName)) + .collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; } - List functions = query.list(); - response.setFunctions(functions.stream().map(FunctionInfoDAO::toFunctionInfo).collect(Collectors.toList())); - return response; - } catch(Exception e) { - LOGGER.error("Error listing functions", e); - return null; } + return response; } public FunctionInfo getFunction(String name) { + FunctionInfo functionInfo = null; try (Session session = SESSION_FACTORY.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM FunctionInfoDAO WHERE fullName = :value", FunctionInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - FunctionInfoDAO functionInfoDAO = query.uniqueResult(); - return functionInfoDAO == null ? null : functionInfoDAO.toFunctionInfo(); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String[] parts = name.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid function name: " + name); + } + String catalogName = parts[0], schemaName = parts[1], functionName = parts[2]; + FunctionInfoDAO functionInfoDAO = getFunctionDAO(session, catalogName, schemaName, functionName); + if (functionInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } + functionInfo = functionInfoDAO.toFunctionInfo(); + addNamespaceInfo(functionInfo, catalogName, schemaName); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } } catch (Exception e) { LOGGER.error("Error getting function", e); return null; } + return functionInfo; + } + + public void addNamespaceInfo(FunctionInfo functionInfo, String catalogName, String schemaName) { + functionInfo.setCatalogName(catalogName); + functionInfo.setSchemaName(schemaName); + functionInfo.setFullName(catalogName + "." + schemaName + "." + functionInfo.getName()); + } + + public FunctionInfoDAO getFunctionDAO(Session session, String catalogName, String schemaName, String functionName) { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + Query query = session.createQuery( + "FROM FunctionInfoDAO WHERE name = :name and schemaId = :schemaId", FunctionInfoDAO.class); + query.setParameter("name", functionName); + query.setParameter("schemaId", schemaInfo.getId()); + query.setMaxResults(1); + return query.uniqueResult(); } public void deleteFunction(String name, Boolean force) { try (Session session = SESSION_FACTORY.openSession()) { Transaction tx = session.beginTransaction(); - Query query = session.createQuery("FROM FunctionInfoDAO WHERE fullName = :value", FunctionInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - FunctionInfoDAO functionInfoDAO = query.uniqueResult(); - if (functionInfoDAO != null) { + try { + String[] parts = name.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid function name: " + name); + } + String catalogName = parts[0], schemaName = parts[1], functionName = parts[2]; + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, + catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + FunctionInfoDAO functionInfoDAO = getFunctionDAO(session, catalogName, schemaName, functionName); + if (functionInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } session.remove(functionInfoDAO); tx.commit(); LOGGER.info("Deleted function: {}", functionInfoDAO.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } catch (Exception e) { + tx.rollback(); + throw e; } - } catch (Exception e) { - LOGGER.error("Error deleting function", e); } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java b/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java deleted file mode 100644 index 3e6da606b..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java +++ /dev/null @@ -1,133 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CreateSchema; -import io.unitycatalog.server.model.ListSchemasResponse; -import io.unitycatalog.server.model.SchemaInfo; -import io.unitycatalog.server.persist.dao.SchemaInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.query.Query; -import io.unitycatalog.server.model.UpdateSchema; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; - -public class SchemaOperations { - @Getter - public static final SchemaOperations instance = new SchemaOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(SchemaOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - - private SchemaOperations() {} - - public SchemaInfo createSchema(CreateSchema createSchema) { - ValidationUtils.validateSqlObjectName(createSchema.getName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = new SchemaInfoDAO(); - schemaInfo.setSchemaId(UUID.randomUUID()); - schemaInfo.setName(createSchema.getName()); - schemaInfo.setCatalogName(createSchema.getCatalogName()); - schemaInfo.setComment(createSchema.getComment()); - //schemaInfo.setProperties(createSchema.getProperties()); - schemaInfo.setFullName(createSchema.getCatalogName() + "." + createSchema.getName()); - schemaInfo.setCreatedAt(new Date()); - schemaInfo.setUpdatedAt(null); - session.persist(schemaInfo); - tx.commit(); - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } catch (Exception e) { - LOGGER.error("Error creating schema", e); - return null; - } - } - - public ListSchemasResponse listSchemas(String catalogName, Optional maxResults, Optional pageToken) { - try (Session session = sessionFactory.openSession()) { - // TODO: Implement pagination and filtering if required - // For now, returning all schemas without pagination - session.beginTransaction(); - ListSchemasResponse response = new ListSchemasResponse(); - Query query = session.createQuery("FROM SchemaInfoDAO WHERE catalogName = :value", SchemaInfoDAO.class); - query.setParameter("value", catalogName); - response.setSchemas(query.list().stream().map(SchemaInfoDAO::toSchemaInfo).collect(Collectors.toList())); - return response; - } catch (Exception e) { - LOGGER.error("Error listing schemas", e); - return null; - } - } - - public SchemaInfo getSchema(String fullName) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - return getSchema(session, fullName); - } catch (Exception e) { - LOGGER.error("Error getting schema", e); - return null; - } - } - - public SchemaInfoDAO getSchemaInfoDAO(Session session, String fullName) { - Query query = session.createQuery("FROM SchemaInfoDAO WHERE fullName = :value", SchemaInfoDAO.class); - query.setParameter("value", fullName); - query.setMaxResults(1); - return query.uniqueResult(); - } - - public SchemaInfo getSchema(Session session, String fullName) { - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - if (schemaInfo == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); - } - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } - - public SchemaInfo updateSchema(String fullName, UpdateSchema updateSchema) { - ValidationUtils.validateSqlObjectName(updateSchema.getNewName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - - // Update the schema with new values - if (updateSchema.getComment() != null) { - schemaInfo.setComment(updateSchema.getComment()); - } - if (updateSchema.getNewName() != null) { - schemaInfo.setName(updateSchema.getNewName()); - schemaInfo.setFullName(schemaInfo.getCatalogName() + "." + updateSchema.getNewName()); - } - schemaInfo.setUpdatedAt(new Date()); - session.merge(schemaInfo); - tx.commit(); - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } catch (Exception e) { - LOGGER.error("Error updating schema", e); - return null; - } - } - - public void deleteSchema(String fullName) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - if (schemaInfo != null) { - session.remove(schemaInfo); - tx.commit(); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); - } - } catch (Exception e) { - LOGGER.error("Error deleting schema", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java b/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java new file mode 100644 index 000000000..9017b8722 --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java @@ -0,0 +1,197 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.*; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.query.Query; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +public class SchemaRepository { + @Getter + public static final SchemaRepository instance = new SchemaRepository(); + @Getter + public static final CatalogRepository catalogRepository = CatalogRepository.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + + private SchemaRepository() {} + + public SchemaInfo createSchema(CreateSchema createSchema) { + ValidationUtils.validateSqlObjectName(createSchema.getName()); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + if (getSchemaDAO(session, createSchema.getCatalogName(), createSchema.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Schema already exists: " + createSchema.getName()); + } + CatalogInfoDAO catalogDAO = catalogRepository + .getCatalogDAO(session, createSchema.getCatalogName()); + SchemaInfoDAO schemaInfo = new SchemaInfoDAO(); + schemaInfo.setId(UUID.randomUUID()); + schemaInfo.setName(createSchema.getName()); + schemaInfo.setCatalogId(catalogDAO.getId()); + schemaInfo.setComment(createSchema.getComment()); + schemaInfo.setCreatedAt(new Date()); + schemaInfo.setUpdatedAt(null); + session.persist(schemaInfo); + tx.commit(); + SchemaInfo toReturn = SchemaInfoDAO.toSchemaInfo(schemaInfo); + addNamespaceData(toReturn, createSchema.getCatalogName()); + return toReturn; + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + private void addNamespaceData(SchemaInfo schemaInfo, String catalogName) { + schemaInfo.setCatalogName(catalogName); + schemaInfo.setFullName(catalogName + "." + schemaInfo.getName()); + } + + private SchemaInfo convertFromDAO(SchemaInfoDAO schemaInfoDAO, String fullName) { + String catalogName = fullName.split("\\.")[0]; + SchemaInfo schemaInfo = SchemaInfoDAO.toSchemaInfo(schemaInfoDAO); + addNamespaceData(schemaInfo, catalogName); + return schemaInfo; + } + + public SchemaInfoDAO getSchemaDAO(Session session, UUID catalogId, String schemaName) { + Query query = session + .createQuery("FROM SchemaInfoDAO WHERE name = :name and catalogId = :catalogId", SchemaInfoDAO.class); + query.setParameter("name", schemaName); + query.setParameter("catalogId", catalogId); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public SchemaInfoDAO getSchemaDAO(Session session, String catalogName, String schemaName) { + CatalogInfoDAO catalog = catalogRepository.getCatalogDAO(session, catalogName); + if (catalog == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); + } + return getSchemaDAO(session, catalog.getId(), schemaName); + } + + public SchemaInfoDAO getSchemaDAO(Session session, String fullName) { + String[] namespace = fullName.split("\\."); + return getSchemaDAO(session, namespace[0], namespace[1]); + } + + public ListSchemasResponse listSchemas(String catalogName, Optional maxResults, + Optional pageToken) { + try (Session session = sessionFactory.openSession()) { + ListSchemasResponse response = new ListSchemasResponse(); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + // TODO: Implement pagination and filtering if required + // For now, returning all schemas without pagination + try { + CatalogInfoDAO catalog = catalogRepository.getCatalogDAO(session, catalogName); + if (catalog == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); + } + + Query query = session + .createQuery("FROM SchemaInfoDAO WHERE catalogId = :value", SchemaInfoDAO.class); + query.setParameter("value", catalog.getId()); + response.setSchemas(query.list().stream().map(SchemaInfoDAO::toSchemaInfo) + .peek(x -> addNamespaceData(x, catalogName)) + .collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + return response; + } + } + + public SchemaInfo getSchema(String fullName) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + SchemaInfoDAO schemaInfo = null; + try { + schemaInfo = getSchemaDAO(session, fullName); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); + } + return convertFromDAO(schemaInfo, fullName); + } + } + + public SchemaInfo updateSchema(String fullName, UpdateSchema updateSchema) { + ValidationUtils.validateSqlObjectName(updateSchema.getNewName()); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = getSchemaDAO(session, fullName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + fullName); + } + if (updateSchema.getNewName() != null) { + if (getSchemaDAO(session, fullName.split("\\.")[0], updateSchema + .getNewName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Schema already exists: " + updateSchema.getNewName()); + } + } + // Update the schema with new values + if (updateSchema.getComment() != null) { + schemaInfo.setComment(updateSchema.getComment()); + } + if (updateSchema.getNewName() != null) { + schemaInfo.setName(updateSchema.getNewName()); + } + schemaInfo.setUpdatedAt(new Date()); + session.merge(schemaInfo); + tx.commit(); + return convertFromDAO(schemaInfo, fullName); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteSchema(String fullName) { + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = getSchemaDAO(session, fullName); + if (schemaInfo != null) { + session.remove(schemaInfo); + tx.commit(); + } else { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + fullName); + } + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} diff --git a/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java b/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java index 8e86449f0..f76ed6606 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java +++ b/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java @@ -3,7 +3,9 @@ import io.unitycatalog.server.exception.BaseException; import io.unitycatalog.server.model.*; import io.unitycatalog.server.persist.converters.TableInfoConverter; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; import io.unitycatalog.server.persist.dao.PropertyDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; import io.unitycatalog.server.persist.dao.TableInfoDAO; import io.unitycatalog.server.utils.ValidationUtils; import lombok.Getter; @@ -22,49 +24,66 @@ public class TableRepository { private static final TableRepository instance = new TableRepository(); private static final Logger LOGGER = LoggerFactory.getLogger(TableRepository.class); private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - private static final CatalogOperations catalogOperations = CatalogOperations.getInstance(); - private static final SchemaOperations schemaOperations = SchemaOperations.getInstance(); + private static final CatalogRepository catalogOperations = CatalogRepository.getInstance(); + private static final SchemaRepository schemaOperations = SchemaRepository.getInstance(); private TableRepository() {} public TableInfo getTableById(String tableId) { LOGGER.debug("Getting table by id: " + tableId); try (Session session = sessionFactory.openSession()) { - TableInfoDAO tableInfoDAO = session.get(TableInfoDAO.class, UUID.fromString(tableId)); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + tableId); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + TableInfoDAO tableInfoDAO = session.get(TableInfoDAO.class, UUID.fromString(tableId)); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + tableId); + } + TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); + tx.commit(); + return tableInfo; + } catch (Exception e) { + tx.rollback(); + throw e; } - - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); - tableInfo.setProperties(TableInfoConverter - .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); - return tableInfo; } } public TableInfo getTable(String fullName) { LOGGER.debug("Getting table: " + fullName); + TableInfo tableInfo = null; try (Session session = sessionFactory.openSession()) { - String[] parts = fullName.split("\\."); - if (parts.length != 3) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid table name: " + fullName); - } - String catalogName = parts[0]; - String schemaName = parts[1]; - String tableName = parts[2]; - TableInfoDAO tableInfoDAO = findTable(session, catalogName, schemaName, tableName); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String[] parts = fullName.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid table name: " + fullName); + } + String catalogName = parts[0]; + String schemaName = parts[1]; + String tableName = parts[2]; + TableInfoDAO tableInfoDAO = findTable(session, catalogName, schemaName, tableName); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + } + tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + tableInfo.setProperties(TableInfoConverter + .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); + tableInfo.setCatalogName(catalogName); + tableInfo.setSchemaName(schemaName); + tx.commit(); + } catch (Exception e) { + if (tx != null && tx.getStatus().canRollback()) { + tx.rollback(); + } + throw e; } - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); - tableInfo.setProperties(TableInfoConverter - .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); - tableInfo.setCatalogName(catalogName); - tableInfo.setSchemaName(schemaName); - return tableInfo; } + return tableInfo; } private List findProperties(Session session, UUID tableId) { @@ -161,18 +180,11 @@ private String getTableFullName(TableInfo tableInfo) { } public String getSchemaId(Session session, String catalogName, String schemaName) { - - CatalogInfo catalogInfoDAO = catalogOperations.getCatalog(session, catalogName); - if (catalogInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); - } - SchemaInfo schemaInfo = schemaOperations.getSchema(session, catalogName - + "." + schemaName); + SchemaInfoDAO schemaInfo = schemaOperations.getSchemaDAO(session, catalogName, schemaName); if (schemaInfo == null) { throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); } - return schemaInfo.getSchemaId(); - + return schemaInfo.getId().toString(); } public static Date convertMillisToDate(String millisString) { @@ -206,37 +218,45 @@ public static String getNextPageToken(List tables) { * @return */ public ListTablesResponse listTables(String catalogName, - String schemaName, - Integer maxResults, - String nextPageToken, - Boolean omitProperties, - Boolean omitColumns) { + String schemaName, + Integer maxResults, + String nextPageToken, + Boolean omitProperties, + Boolean omitColumns) { List result = new ArrayList<>(); String returnNextPageToken = null; String hql = "FROM TableInfoDAO t WHERE t.schemaId = :schemaId and " + "(t.updatedAt < :pageToken OR :pageToken is null) order by t.updatedAt desc"; try (Session session = sessionFactory.openSession()) { - String schemaId = getSchemaId(session, catalogName, schemaName); - Query query = session.createQuery(hql, TableInfoDAO.class); - query.setParameter("schemaId", UUID.fromString(schemaId)); - query.setParameter("pageToken", convertMillisToDate(nextPageToken)); - query.setMaxResults(maxResults); - List tableInfoDAOList = query.list(); - - returnNextPageToken = getNextPageToken(tableInfoDAOList); - - for (TableInfoDAO tableInfoDAO : tableInfoDAOList) { - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - if (!omitColumns) { - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String schemaId = getSchemaId(session, catalogName, schemaName); + Query query = session.createQuery(hql, TableInfoDAO.class); + query.setParameter("schemaId", UUID.fromString(schemaId)); + query.setParameter("pageToken", convertMillisToDate(nextPageToken)); + query.setMaxResults(maxResults); + List tableInfoDAOList = query.list(); + returnNextPageToken = getNextPageToken(tableInfoDAOList); + for (TableInfoDAO tableInfoDAO : tableInfoDAOList) { + TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + if (!omitColumns) { + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + } + if (!omitProperties) { + tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap( + findProperties(session, tableInfoDAO.getId()))); + } + tableInfo.setCatalogName(catalogName); + tableInfo.setSchemaName(schemaName); + result.add(tableInfo); } - if (!omitProperties) { - tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap( - findProperties(session, tableInfoDAO.getId()))); + tx.commit(); + } catch (Exception e) { + if (tx != null && tx.getStatus().canRollback()) { + tx.rollback(); } - tableInfo.setCatalogName(catalogName); - tableInfo.setSchemaName(schemaName); - result.add(tableInfo); + throw e; } } return new ListTablesResponse().tables(result).nextPageToken(returnNextPageToken); @@ -252,22 +272,20 @@ public void deleteTable(String fullName) { String catalogName = parts[0]; String schemaName = parts[1]; String tableName = parts[2]; - - String schemaId = getSchemaId(session, catalogName, schemaName); - - TableInfoDAO tableInfoDAO = findBySchemaIdAndName(session, schemaId, tableName); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); - } Transaction tx = session.beginTransaction(); try { + String schemaId = getSchemaId(session, catalogName, schemaName); + TableInfoDAO tableInfoDAO = findBySchemaIdAndName(session, schemaId, tableName); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + } if (TableType.MANAGED.getValue().equals(tableInfoDAO.getType())) { - FileUtils.deleteDirectory(tableInfoDAO.getUrl()); + try { + FileUtils.deleteDirectory(tableInfoDAO.getUrl()); + } catch (Throwable e) { + LOGGER.error("Error deleting table directory: " + tableInfoDAO.getUrl()); + } } - } catch (Throwable e) { - LOGGER.error("Error deleting table directory: " + tableInfoDAO.getUrl()); - } - try { findProperties(session, tableInfoDAO.getId()).forEach(session::remove); session.remove(tableInfoDAO); tx.commit(); @@ -280,4 +298,4 @@ public void deleteTable(String fullName) { } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java b/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java deleted file mode 100644 index 5be826884..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java +++ /dev/null @@ -1,163 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.*; -import io.unitycatalog.server.persist.converters.VolumeInfoConverter; -import io.unitycatalog.server.persist.dao.VolumeInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.hibernate.query.Query; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; - -public class VolumeOperations { - - @Getter - public static final VolumeOperations instance = new VolumeOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(VolumeOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - - private VolumeOperations() {} - - public VolumeInfo createVolume(CreateVolumeRequestContent createVolumeRequest) { - ValidationUtils.validateSqlObjectName(createVolumeRequest.getName()); - String volumeFullName = createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName() + "." + createVolumeRequest.getName(); - VolumeInfo volumeInfo = new VolumeInfo(); - volumeInfo.setVolumeId(UUID.randomUUID().toString()); - volumeInfo.setCatalogName(createVolumeRequest.getCatalogName()); - volumeInfo.setSchemaName(createVolumeRequest.getSchemaName()); - volumeInfo.setName(createVolumeRequest.getName()); - volumeInfo.setComment(createVolumeRequest.getComment()); - volumeInfo.setFullName(volumeFullName); - volumeInfo.setCreatedAt(System.currentTimeMillis()); - volumeInfo.setVolumeType(createVolumeRequest.getVolumeType()); - if (VolumeType.MANAGED.equals(createVolumeRequest.getVolumeType())) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Managed volume creation is not supported"); - } - if (createVolumeRequest.getStorageLocation() == null) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Storage location is required for external volume"); - } - volumeInfo.setStorageLocation(createVolumeRequest.getStorageLocation()); - VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - session.persist(volumeInfoDAO); - tx.commit(); - LOGGER.info("Added volume: {}", volumeInfo.getName()); - return VolumeInfoConverter.fromDAO(volumeInfoDAO); - } catch (Exception e) { - LOGGER.error("Error adding volume", e); - return null; - } - } - - public VolumeInfo getVolume(String fullName) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM VolumeInfoDAO WHERE fullName = :value", VolumeInfoDAO.class); - query.setParameter("value", fullName); - query.setMaxResults(1); - return VolumeInfoConverter.fromDAO(query.uniqueResult()); - } catch (Exception e) { - LOGGER.error("Error getting volume", e); - return null; - } - } - - public VolumeInfo getVolumeById(String volumeId) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM VolumeInfoDAO WHERE volumeId = :value", VolumeInfoDAO.class); - query.setParameter("value", volumeId); - query.setMaxResults(1); - return VolumeInfoConverter.fromDAO(query.uniqueResult()); - } catch (Exception e) { - throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + volumeId); - } - } - - public ListVolumesResponseContent listVolumes(String catalogName, String schemaName, Optional maxResults, Optional pageToken, Optional includeBrowse) { - ListVolumesResponseContent responseContent = new ListVolumesResponseContent(); - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - - String queryString = "from VolumeInfoDAO v where v.catalogName = :catalogName and v.schemaName = :schemaName"; - Query query = session.createQuery(queryString, VolumeInfoDAO.class); - query.setParameter("catalogName", catalogName); - query.setParameter("schemaName", schemaName); - - maxResults.ifPresent(query::setMaxResults); - - if (pageToken.isPresent()) { - // Perform pagination logic here if needed - // Example: query.setFirstResult(startIndex); - } - - responseContent.setVolumes(query.list().stream() - .map(VolumeInfoConverter::fromDAO).collect(Collectors.toList())); - return responseContent; - } catch (Exception e) { - LOGGER.error("Error listing volumes", e); - return null; - } - } - - public VolumeInfo updateVolume(String name, UpdateVolumeRequestContent updateVolumeRequest) { - ValidationUtils.validateSqlObjectName(updateVolumeRequest.getNewName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - VolumeInfoDAO volumeInfo = VolumeInfoConverter.toDAO(getVolume(name)); - if (volumeInfo != null) { - if (updateVolumeRequest.getNewName() != null) { - volumeInfo.setName(updateVolumeRequest.getNewName()); - String fullName = volumeInfo.getCatalogName() + "." + volumeInfo.getSchemaName() + "." + updateVolumeRequest.getNewName(); - volumeInfo.setFullName(fullName); - } - if (updateVolumeRequest.getComment() != null) { - volumeInfo.setComment(updateVolumeRequest.getComment()); - } - volumeInfo.setUpdatedAt(new Date()); - session.merge(volumeInfo); - tx.commit(); - LOGGER.info("Updated volume: {}", volumeInfo.getName()); - } - return VolumeInfoConverter.fromDAO(volumeInfo); - } catch (Exception e) { - LOGGER.error("Error updating volume", e); - return null; - } - } - - public void deleteVolume(String name) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - VolumeInfo volumeInfo = getVolume(name); - VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); - if (volumeInfoDAO != null) { - if (VolumeType.MANAGED.getValue().equals(volumeInfoDAO.getVolumeType())) { - try { - FileUtils.deleteDirectory(volumeInfo.getStorageLocation()); - } catch (Exception e) { - LOGGER.error("Error deleting volume directory", e); - } - } - session.remove(volumeInfoDAO); - tx.commit(); - LOGGER.info("Deleted volume: {}", volumeInfo.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); - } - } catch (Exception e) { - LOGGER.error("Error deleting volume", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java b/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java new file mode 100644 index 000000000..4edc2a6db --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java @@ -0,0 +1,229 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.*; +import io.unitycatalog.server.persist.converters.VolumeInfoConverter; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; +import io.unitycatalog.server.persist.dao.VolumeInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.hibernate.query.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +public class VolumeRepository { + + @Getter + public static final VolumeRepository instance = new VolumeRepository(); + public static final SchemaRepository schemaRepository = SchemaRepository.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(VolumeRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + + private VolumeRepository() {} + + public VolumeInfo createVolume(CreateVolumeRequestContent createVolumeRequest) { + ValidationUtils.validateSqlObjectName(createVolumeRequest.getName()); + String volumeFullName = createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName() + "." + createVolumeRequest.getName(); + VolumeInfo volumeInfo = new VolumeInfo(); + volumeInfo.setVolumeId(UUID.randomUUID().toString()); + volumeInfo.setCatalogName(createVolumeRequest.getCatalogName()); + volumeInfo.setSchemaName(createVolumeRequest.getSchemaName()); + volumeInfo.setName(createVolumeRequest.getName()); + volumeInfo.setComment(createVolumeRequest.getComment()); + volumeInfo.setFullName(volumeFullName); + volumeInfo.setCreatedAt(System.currentTimeMillis()); + volumeInfo.setVolumeType(createVolumeRequest.getVolumeType()); + if (VolumeType.MANAGED.equals(createVolumeRequest.getVolumeType())) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Managed volume creation is not supported"); + } + if (createVolumeRequest.getStorageLocation() == null) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Storage location is required for external volume"); + } + volumeInfo.setStorageLocation(createVolumeRequest.getStorageLocation()); + VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfoDAO = schemaRepository.getSchemaDAO(session, + createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName()); + if (schemaInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName()); + } + if (getVolumeDAO(session, createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName(), createVolumeRequest.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, "Volume already exists: " + volumeFullName); + } + volumeInfoDAO.setSchemaId(schemaInfoDAO.getId()); + session.persist(volumeInfoDAO); + tx.commit(); + LOGGER.info("Added volume: {}", volumeInfo.getName()); + return convertFromDAO(volumeInfoDAO, createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName()); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public VolumeInfo getVolume(String fullName) { + try (Session session = sessionFactory.openSession()) { + String[] namespace = fullName.split("\\."); + if (namespace.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid volume name: " + fullName); + } + String catalogName = namespace[0]; + String schemaName = namespace[1]; + String volumeName = namespace[2]; + return convertFromDAO(getVolumeDAO(session, catalogName, schemaName, volumeName), + catalogName, schemaName); + } catch (Exception e) { + LOGGER.error("Error getting volume", e); + return null; + } + } + + public VolumeInfoDAO getVolumeDAO(Session session, String catalogName, String schemaName, String volumeName) { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + catalogName + "." + schemaName); + } + Query query = session.createQuery( + "FROM VolumeInfoDAO WHERE name = :name and schemaId = :schemaId", VolumeInfoDAO.class); + query.setParameter("name", volumeName); + query.setParameter("schemaId", schemaInfo.getId()); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public VolumeInfo getVolumeById(String volumeId) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + Query query = session.createQuery("FROM VolumeInfoDAO WHERE id = :value", VolumeInfoDAO.class); + query.setParameter("value", UUID.fromString(volumeId)); + query.setMaxResults(1); + VolumeInfoDAO volumeInfoDAO = query.uniqueResult(); + tx.commit(); + return VolumeInfoConverter.fromDAO(volumeInfoDAO); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public ListVolumesResponseContent listVolumes(String catalogName, String schemaName, Optional maxResults, Optional pageToken, Optional includeBrowse) { + ListVolumesResponseContent responseContent = new ListVolumesResponseContent(); + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + catalogName + "." + schemaName); + } + String queryString = "from VolumeInfoDAO v where v.schemaId = :schemaId"; + Query query = session.createQuery(queryString, VolumeInfoDAO.class); + query.setParameter("schemaId", schemaInfo.getId()); + maxResults.ifPresent(query::setMaxResults); + if (pageToken.isPresent()) { + // Perform pagination logic here if needed + // Example: query.setFirstResult(startIndex); + } + responseContent.setVolumes(query.list().stream() + .map(x -> convertFromDAO(x, catalogName, schemaName)) + .collect(Collectors.toList())); + tx.commit(); + return responseContent; + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + private VolumeInfo convertFromDAO(VolumeInfoDAO volumeInfoDAO, String catalogName, String schemaName) { + VolumeInfo volumeInfo = VolumeInfoConverter.fromDAO(volumeInfoDAO); + volumeInfo.setCatalogName(catalogName); + volumeInfo.setSchemaName(schemaName); + volumeInfo.setFullName(catalogName + "." + schemaName + "." + volumeInfo.getName()); + return volumeInfo; + } + + public VolumeInfo updateVolume(String name, UpdateVolumeRequestContent updateVolumeRequest) { + ValidationUtils.validateSqlObjectName(updateVolumeRequest.getNewName()); + String[] namespace =name.split("\\."); + String catalog = namespace[0], schema = namespace[1], volume = namespace[2]; + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + VolumeInfoDAO volumeInfo = getVolumeDAO(session, catalog, schema, volume); + if (volumeInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); + } + if (updateVolumeRequest.getNewName() != null) { + VolumeInfoDAO existingVolume = getVolumeDAO(session, catalog, schema, + updateVolumeRequest.getNewName()); + if (existingVolume != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Volume already exists: " + updateVolumeRequest.getNewName()); + } + } + if (updateVolumeRequest.getNewName() != null) { + volumeInfo.setName(updateVolumeRequest.getNewName()); + } + if (updateVolumeRequest.getComment() != null) { + volumeInfo.setComment(updateVolumeRequest.getComment()); + } + volumeInfo.setUpdatedAt(new Date()); + session.merge(volumeInfo); + tx.commit(); + LOGGER.info("Updated volume: {}", volumeInfo.getName()); + return convertFromDAO(volumeInfo, catalog, schema); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteVolume(String name) { + try (Session session = sessionFactory.openSession()) { + String[] namespace = name.split("\\."); + if (namespace.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid volume name: " + name); + } + String catalog = namespace[0], schema = namespace[1], volume = namespace[2]; + Transaction tx = session.beginTransaction(); + try { + VolumeInfoDAO volumeInfoDAO = getVolumeDAO(session, catalog, schema, volume); + if (volumeInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); + } + if (VolumeType.MANAGED.getValue().equals(volumeInfoDAO.getVolumeType())) { + try { + FileUtils.deleteDirectory(volumeInfoDAO.getStorageLocation()); + } catch (Exception e) { + LOGGER.error("Error deleting volume directory", e); + } + } + session.remove(volumeInfoDAO); + tx.commit(); + LOGGER.info("Deleted volume: {}", volumeInfoDAO.getName()); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java b/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java index d681f292b..b5a6532e2 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java +++ b/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java @@ -6,6 +6,7 @@ import io.unitycatalog.server.persist.dao.VolumeInfoDAO; import java.util.Date; +import java.util.UUID; public class VolumeInfoConverter { @@ -14,15 +15,12 @@ public static VolumeInfoDAO toDAO(VolumeInfo volumeInfo) { return null; } return VolumeInfoDAO.builder() - .volumeId(volumeInfo.getVolumeId()) + .id(UUID.fromString(volumeInfo.getVolumeId())) .name(volumeInfo.getName()) - .catalogName(volumeInfo.getCatalogName()) - .schemaName(volumeInfo.getSchemaName()) .comment(volumeInfo.getComment()) .storageLocation(volumeInfo.getStorageLocation()) .createdAt(volumeInfo.getCreatedAt() != null? new Date(volumeInfo.getCreatedAt()) : new Date()) .updatedAt(volumeInfo.getUpdatedAt() != null ? new Date(volumeInfo.getUpdatedAt()) : new Date()) - .fullName(volumeInfo.getFullName()) .volumeType(volumeInfo.getVolumeType().getValue()) .build(); } @@ -32,16 +30,13 @@ public static VolumeInfo fromDAO(VolumeInfoDAO dao) { return null; } return new VolumeInfo() - .volumeId(dao.getVolumeId()) + .volumeId(dao.getId().toString()) .name(dao.getName()) - .catalogName(dao.getCatalogName()) - .schemaName(dao.getSchemaName()) .comment(dao.getComment()) .storageLocation(FileUtils.convertRelativePathToURI(dao.getStorageLocation())) .createdAt(dao.getCreatedAt().getTime()) .updatedAt(dao.getUpdatedAt().getTime()) - .fullName(dao.getFullName()) .volumeType(VolumeType.valueOf(dao.getVolumeType())); } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java index 2a7ea9abb..192a36ca5 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java +++ b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java @@ -7,6 +7,7 @@ import io.unitycatalog.server.model.ColumnTypeName; import java.util.List; +import java.util.UUID; // Hibernate annotations @Entity @@ -20,17 +21,14 @@ @Builder public class FunctionInfoDAO { @Id - @Column(name = "function_id") - private String functionId; + @Column(name = "id", columnDefinition = "BINARY(16)") + private UUID id; @Column(name = "name", nullable = false) private String name; - @Column(name = "catalog_name", nullable = false) - private String catalogName; - - @Column(name = "schema_name", nullable = false) - private String schemaName; + @Column(name = "schema_id", columnDefinition = "BINARY(16)") + private UUID schemaId; @Column(name = "comment") private String comment; @@ -44,9 +42,6 @@ public class FunctionInfoDAO { @Column(name = "data_type") private ColumnTypeName dataType; - @Column(name = "full_name") - private String fullName; - @Column(name = "full_data_type") private String fullDataType; @@ -88,15 +83,12 @@ public class FunctionInfoDAO { public static FunctionInfoDAO from(FunctionInfo functionInfo) { FunctionInfoDAO functionInfoDAO = FunctionInfoDAO.builder() - .functionId(functionInfo.getFunctionId()) + .id(functionInfo.getFunctionId()!= null? UUID.fromString(functionInfo.getFunctionId()) : null) .name(functionInfo.getName()) - .catalogName(functionInfo.getCatalogName()) - .schemaName(functionInfo.getSchemaName()) .comment(functionInfo.getComment()) .createdAt(functionInfo.getCreatedAt()) .updatedAt(functionInfo.getUpdatedAt()) .dataType(functionInfo.getDataType()) - .fullName(functionInfo.getFullName()) .fullDataType(functionInfo.getFullDataType()) .externalLanguage(functionInfo.getExternalLanguage()) .isDeterministic(functionInfo.getIsDeterministic()) @@ -122,15 +114,12 @@ public static FunctionInfoDAO from(FunctionInfo functionInfo) { public FunctionInfo toFunctionInfo() { FunctionInfo functionInfo = new FunctionInfo() - .functionId(functionId) + .functionId(id.toString()) .name(name) - .catalogName(catalogName) - .schemaName(schemaName) .comment(comment) .createdAt(createdAt) .updatedAt(updatedAt) .dataType(dataType) - .fullName(fullName) .fullDataType(fullDataType) .externalLanguage(externalLanguage) .isDeterministic(isDeterministic) @@ -147,4 +136,4 @@ public FunctionInfo toFunctionInfo() { functionInfo.returnParams(FunctionParameterInfoDAO.toFunctionParameterInfos(returnParams)); return functionInfo; } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java index 8846e24fc..e4894a475 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java +++ b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java @@ -28,7 +28,7 @@ public enum InputOrReturnEnum { private String id; @ManyToOne - @JoinColumn(name = "function_id") + @JoinColumn(name = "function_id", referencedColumnName = "id") private FunctionInfoDAO function; // Whether the parameter is an input or return parameter @@ -119,4 +119,4 @@ public static FunctionParameterInfos toFunctionParameterInfos(List includeBrowse) { - VolumeInfo volumeInfo = volumeOperations.getVolume(fullName); - if (volumeInfo != null) { - return HttpResponse.ofJson(volumeInfo); - } - return ValidationUtils.entityNotFoundResponse(ValidationUtils.VOLUME, fullName); + return HttpResponse.ofJson(volumeOperations.getVolume(fullName)); } @Patch("/{full_name}") public HttpResponse updateVolume(@Param("full_name") String fullName, UpdateVolumeRequestContent updateVolumeRequest) { - VolumeInfo updatedVolume = volumeOperations.updateVolume(fullName, updateVolumeRequest); - if (updatedVolume != null) { - return HttpResponse.ofJson(updatedVolume); - } - return ValidationUtils.entityNotFoundResponse(ValidationUtils.VOLUME, fullName); + return HttpResponse.ofJson(volumeOperations.updateVolume(fullName, updateVolumeRequest)); } @Delete("/{full_name}") diff --git a/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java b/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java index e6a964241..a5e4786f8 100644 --- a/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java +++ b/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java @@ -1,62 +1,22 @@ package io.unitycatalog.server.utils; import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.persist.FunctionRepository; -import io.unitycatalog.server.persist.SchemaOperations; +import io.unitycatalog.server.persist.*; import com.linecorp.armeria.common.HttpResponse; -import io.unitycatalog.server.persist.CatalogOperations; -import io.unitycatalog.server.persist.TableRepository; -import io.unitycatalog.server.persist.VolumeOperations; +import io.unitycatalog.server.persist.VolumeRepository; import io.unitycatalog.server.exception.ErrorCode; -import java.util.Optional; import java.util.regex.Pattern; public class ValidationUtils { public static final String CATALOG = "Catalog"; public static final String SCHEMA = "Schema"; - public static final String VOLUME = "Volume"; public static final String FUNCTION = "Function"; - private static final CatalogOperations CATALOG_OPERATIONS = CatalogOperations.getInstance(); - private static final SchemaOperations SCHEMA_OPERATIONS = SchemaOperations.getInstance(); - private static final VolumeOperations VOLUME_OPERATIONS = VolumeOperations.getInstance(); - private static final FunctionRepository FUNCTION_REPOSITORY = FunctionRepository.getInstance(); - private static final TableRepository TABLE_REPOSITORY = TableRepository.getInstance(); // Regex to reject names containing a period, space, forward-slash, C0 + DEL control characters private static final Pattern INVALID_FORMAT = Pattern.compile("[\\.\\ \\/\\x00-\\x1F\\x7F]"); private static final Integer MAX_NAME_LENGTH = 255; - public static HttpResponse entityNotFoundResponse(String entity, String... message) { - throw new BaseException(ErrorCode.NOT_FOUND, entity + " not found: " + String.join(".", message)); - } - - public static HttpResponse entityAlreadyExistsResponse(String entity, String... message) { - throw new BaseException(ErrorCode.ALREADY_EXISTS, entity + " already exists: " + String.join(".", message)); - } - - public static boolean catalogExists(String catalogName) { - return CATALOG_OPERATIONS.getCatalog(catalogName) != null; - } - - public static boolean schemaExists(String catalogName, String schemaName) { - if (!catalogExists(catalogName)) - return false; - return SCHEMA_OPERATIONS.getSchema(catalogName + "." + schemaName) != null; - } - - public static boolean volumeExists(String catalogName, String schemaName, String volumeName) { - if (!schemaExists(catalogName, schemaName)) - return false; - return VOLUME_OPERATIONS.getVolume(catalogName + "." + schemaName + "." + volumeName) != null; - } - - public static boolean functionExists(String catalogName, String schemaName, String functionName) { - if (!schemaExists(catalogName, schemaName)) - return false; - return FUNCTION_REPOSITORY.getFunction(catalogName + "." + schemaName + "." + functionName) != null; - } - public static void validateSqlObjectName(String name) { if (name == null || name.isEmpty()) { throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Name cannot be empty"); @@ -68,4 +28,4 @@ public static void validateSqlObjectName(String name) { throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Name cannot contain a period, space, forward-slash, or control characters"); } } -} +} \ No newline at end of file diff --git a/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java index d4cbab4f3..2d790f340 100644 --- a/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java @@ -18,6 +18,10 @@ protected void cleanUp() { if (catalogOperations.getCatalog(CATALOG_NAME) != null) { catalogOperations.deleteCatalog(CATALOG_NAME); } + } catch (Exception e) { + // Ignore + } + try { if (catalogOperations.getCatalog(CATALOG_NEW_NAME) != null) { catalogOperations.deleteCatalog(CATALOG_NEW_NAME); } diff --git a/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java index d10ed8767..caf43d055 100644 --- a/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java @@ -3,6 +3,7 @@ import io.unitycatalog.client.ApiException; import io.unitycatalog.client.model.*; import io.unitycatalog.server.base.BaseCRUDTest; +import io.unitycatalog.server.utils.TestUtils; import org.junit.*; import io.unitycatalog.server.base.ServerConfig; @@ -42,6 +43,20 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); } @@ -105,9 +120,17 @@ public void testFunctionCRUD() throws ApiException { FunctionInfo retrievedFunctionInfo = functionOperations.getFunction(FUNCTION_FULL_NAME); assertEquals(functionInfo, retrievedFunctionInfo); + // now update the parent catalog + catalogOperations.updateCatalog(CATALOG_NAME, CATALOG_NEW_NAME, ""); + // get the function again + FunctionInfo retrievedFunctionInfoAfterCatUpdate = functionOperations.getFunction( + CATALOG_NEW_NAME + "." + SCHEMA_NAME + "." + FUNCTION_NAME); + assertEquals(retrievedFunctionInfo.getFunctionId(), + retrievedFunctionInfoAfterCatUpdate.getFunctionId()); + // Delete function - functionOperations.deleteFunction(FUNCTION_FULL_NAME, true); - assertFalse(contains(functionOperations.listFunctions(CATALOG_NAME, SCHEMA_NAME), + functionOperations.deleteFunction(CATALOG_NEW_NAME + "." + SCHEMA_NAME + "." + FUNCTION_NAME, true); + assertFalse(contains(functionOperations.listFunctions(CATALOG_NEW_NAME, SCHEMA_NAME), functionInfo, f -> f.getFunctionId().equals(functionInfo.getFunctionId()))); } } diff --git a/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java index a53526007..3b1535621 100644 --- a/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java @@ -1,10 +1,7 @@ package io.unitycatalog.server.base.schema; import io.unitycatalog.client.ApiException; -import io.unitycatalog.client.model.CreateCatalog; -import io.unitycatalog.client.model.CreateSchema; -import io.unitycatalog.client.model.SchemaInfo; -import io.unitycatalog.client.model.UpdateSchema; +import io.unitycatalog.client.model.*; import io.unitycatalog.server.base.BaseCRUDTest; import io.unitycatalog.server.base.ServerConfig; import io.unitycatalog.server.utils.TestUtils; @@ -38,7 +35,22 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); + } protected void createCommonResources() throws ApiException { @@ -84,10 +96,16 @@ public void testSchemaCRUDL() throws ApiException { Assert.assertEquals(TestUtils.SCHEMA_NEW_FULL_NAME, updatedSchemaInfo.getFullName()); assertNotNull(updatedSchemaInfo.getUpdatedAt()); + //Now update the parent catalog name + catalogOperations.updateCatalog(TestUtils.CATALOG_NAME, TestUtils.CATALOG_NEW_NAME, ""); + SchemaInfo updatedSchemaInfo2 = schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + + "." + TestUtils.SCHEMA_NEW_NAME); + assertEquals(retrievedSchemaInfo.getSchemaId(), updatedSchemaInfo2.getSchemaId()); + // Delete schema System.out.println("Testing delete schema.."); - schemaOperations.deleteSchema(TestUtils.SCHEMA_NEW_FULL_NAME); - assertFalse(TestUtils.contains(schemaOperations.listSchemas(TestUtils.CATALOG_NAME), updatedSchemaInfo, (schema) -> + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + assertFalse(TestUtils.contains(schemaOperations.listSchemas(TestUtils.CATALOG_NEW_NAME), updatedSchemaInfo, (schema) -> schema.getName().equals(TestUtils.SCHEMA_NEW_NAME))); } } diff --git a/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java index d5bbf9c1f..031eab4c0 100644 --- a/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java @@ -54,6 +54,21 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } + super.cleanUp(); } @@ -205,11 +220,18 @@ public void testTableCRUD() throws IOException, ApiException { assertNotNull(managedListTable.getCreatedAt()); assertNotNull(managedListTable.getTableId()); + // Now update the parent schema name + schemaOperations.updateSchema(TestUtils.SCHEMA_FULL_NAME, new UpdateSchema().newName(TestUtils.SCHEMA_NEW_NAME).comment(TestUtils.SCHEMA_COMMENT)); + // now fetch the table again + TableInfo managedTableAfterSchemaUpdate = tableOperations.getTable(TestUtils.CATALOG_NAME + "." + TestUtils.SCHEMA_NEW_NAME + "." + TABLE_NAME); + assertEquals(managedTable.getTableId(), managedTableAfterSchemaUpdate.getTableId()); + // Delete managed table + String newTableFullName = TestUtils.CATALOG_NAME + "." + TestUtils.SCHEMA_NEW_NAME + "." + TABLE_NAME; System.out.println("Testing delete table.."); - tableOperations.deleteTable(TABLE_FULL_NAME); - assertThrows(Exception.class, () -> tableOperations.getTable(TABLE_FULL_NAME)); + tableOperations.deleteTable(newTableFullName); + assertThrows(Exception.class, () -> tableOperations.getTable(newTableFullName)); } } \ No newline at end of file diff --git a/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java index b3a327307..255f09ed4 100644 --- a/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java @@ -6,6 +6,7 @@ import io.unitycatalog.server.persist.FileUtils; import io.unitycatalog.server.persist.HibernateUtil; import io.unitycatalog.server.persist.dao.VolumeInfoDAO; +import io.unitycatalog.server.utils.TestUtils; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.junit.*; @@ -56,13 +57,29 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); } + private SchemaInfo schemaInfo; + protected void createCommonResources() throws ApiException { // Common setup operations such as creating a catalog and schema catalogOperations.createCatalog(CATALOG_NAME, "Common catalog for volumes"); - schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); + schemaInfo = schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); } @Test @@ -126,15 +143,13 @@ public void testVolumeCRUD() throws ApiException { try (Session session = sessionFactory.openSession()) { session.beginTransaction(); VolumeInfoDAO managedVolume = VolumeInfoDAO.builder() - .catalogName(CATALOG_NAME) - .schemaName(SCHEMA_NAME) .volumeType(VolumeType.MANAGED.getValue()) .storageLocation("/tmp/managed_volume") .name(VOLUME_NAME) - .fullName(VOLUME_FULL_NAME) .createdAt(new Date()) .updatedAt(new Date()) - .volumeId(UUID.randomUUID().toString()) + .id(UUID.randomUUID()) + .schemaId(UUID.fromString(schemaInfo.getSchemaId())) .build(); session.persist(managedVolume); session.getTransaction().commit(); @@ -159,9 +174,17 @@ public void testVolumeCRUD() throws ApiException { return volume.getName().equals(VOLUME_NAME); })); + //NOW Update the schema name + schemaOperations.updateSchema(SCHEMA_FULL_NAME, + new UpdateSchema().newName(SCHEMA_NEW_NAME).comment(SCHEMA_COMMENT)); + // get volume + VolumeInfo volumePostSchemaNameChange = volumeOperations.getVolume + (CATALOG_NAME + "." + SCHEMA_NEW_NAME + "." + VOLUME_NAME); + assertEquals(volumePostSchemaNameChange.getVolumeId(), managedVolumeInfo.getVolumeId()); + // Delete volume System.out.println("Testing delete volume.."); - volumeOperations.deleteVolume(VOLUME_FULL_NAME); - assertEquals(0, getSize(volumeOperations.listVolumes(CATALOG_NAME, SCHEMA_NAME))); + volumeOperations.deleteVolume(CATALOG_NAME + "." + SCHEMA_NEW_NAME + "." + VOLUME_NAME); + assertEquals(0, getSize(volumeOperations.listVolumes(CATALOG_NAME, SCHEMA_NEW_NAME))); } } \ No newline at end of file