From fcfd0eaf470f2518abd163911a1626a6413133c4 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 14 Jun 2024 10:59:51 -0700 Subject: [PATCH 01/14] docs: Add syntax highlighting (#5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First of all, it's great to see this being open-sourced 🙌 While going over the docs, I noticed some of the syntax highlighting was missing. --- README.md | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 1fdfe8fee..fd43a8628 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ You have to ensure that you local environment has the following: ### Run the UC Server In a terminal, in the cloned repository root directory, start the UC server. -``` +```sh bin/start-uc-server ``` @@ -38,15 +38,15 @@ For the rest of the steps, continue in a different terminal. ### Operate on Delta tables with the CLI Let's list the tables. -``` +```sh bin/uc table list --catalog unity --schema default ``` You should see a few tables. Some details are truncated because of the nested nature of the data. To see all the content, you can add `--format jsonPretty` to any command. -Next, let's get the metadata of one those tables. +Next, let's get the metadata of one of those tables. -``` +```sh bin/uc table get --full_name unity.default.numbers ``` @@ -54,7 +54,7 @@ You can see that it is a Delta table. Now, specifically for Delta tables, this C print snippet of the contents of a Delta table (powered by the [Delta Kernel Java](https://delta.io/blog/delta-kernel/) project). Let's try that. -``` +```sh bin/uc table read --full_name unity.default.numbers ``` @@ -63,7 +63,7 @@ bin/uc table read --full_name unity.default.numbers For trying with DuckDB, you will have to [install it](https://duckdb.org/docs/installation/) (at least version 1.0). Let's start DuckDB and install a couple of extensions. To start DuckDB, run the command `duckdb` in the terminal. Then, in the DuckDB shell, run the following commands: -```sh +```sql install uc_catalog from core_nightly; load uc_catalog; install delta; @@ -72,8 +72,8 @@ load delta; If you have installed these extensions before, you may have to run `update extensions` and restart DuckDB for the following steps to work. -Now that we have DuckDB all set up, let's trying connecting to UC by specifying a secret. -```sh +Now that we have DuckDB all set up, let's try connecting to UC by specifying a secret. +```sql CREATE SECRET ( TYPE UC, TOKEN 'not-used', @@ -82,7 +82,7 @@ CREATE SECRET ( ); ``` You should see it print a short table saying `Success` = `true`. Then we attach the `unity` catalog to DuckDB. -```sh +```sql ATTACH 'unity' AS unity (TYPE UC_CATALOG); ``` Now we ready to query. Try the following @@ -106,19 +106,19 @@ See the full [tutorial](docs/tutorial.md) for more details. - Compatibility and stability: The APIs are currently evolving and should not be assumed to be stable. ## Compiling and testing -- Install JDK 11 by whatever mechanism that is appropriate for your system, and +- Install JDK 11 by whatever mechanism is appropriate for your system, and set that version to be the default Java version (e.g., by setting env variable JAVA_HOME) - To compile all the code without running tests, run the following: - ``` + ```sh build/sbt clean compile ``` - To compile and execute tests, run the following: - ``` + ```sh build/sbt clean test ``` - To update the API specification, just update the `api/all.yaml` and then run the following: - ``` + ```sh build/sbt generate ``` This will regenerate the OpenAPI data models in the UC server and data models + APIs in the client SDK. From 5b11c279e1357a1d9984764f62325f54d1ff770a Mon Sep 17 00:00:00 2001 From: Florian Maas Date: Fri, 14 Jun 2024 20:01:40 +0200 Subject: [PATCH 02/14] Add issue templates (#8) Add some initial [issue templates](https://docs.github.com/en/communities/using-templates-to-encourage-useful-issues-and-pull-requests/configuring-issue-templates-for-your-repository). image --- .github/ISSUE_TEMPLATE/bug_report.md | 29 +++++++++++++++++++++++ .github/ISSUE_TEMPLATE/feature_request.md | 17 +++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.md create mode 100644 .github/ISSUE_TEMPLATE/feature_request.md diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 000000000..8bb20115a --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,29 @@ +--- +name: Bug report +about: Create a bug report to help us improve +labels: "bug" +--- + +**Describe the bug** + + + +**To Reproduce** + +Steps to reproduce the behavior: + +1. ... +2. ... +3. ... + +**Expected behavior** + + + +**System [please complete the following information]:** + +- OS: e.g. [Ubuntu 18.04] + +**Additional context** + + diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 000000000..5065058d6 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,17 @@ +--- +name: Feature request +about: Suggest a new feature +labels: "enhancement" +--- + +**Is your feature request related to a problem? Please describe.** + + + +**Describe the solution you would like** + + + +**Additional context** + + From aef5bef9be8d1dc9f9b75f8e7ddcff1f58bcb426 Mon Sep 17 00:00:00 2001 From: sullis Date: Fri, 14 Jun 2024 11:02:02 -0700 Subject: [PATCH 03/14] git ignore target and .idea (#13) --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index f64e0dc9b..19e8a1057 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ etc/db/h2db.mv.db +target/ +.idea/ From cc053a51df615b283840a7d89f114ef46560ab3c Mon Sep 17 00:00:00 2001 From: hsm207 Date: Fri, 14 Jun 2024 20:02:23 +0200 Subject: [PATCH 04/14] docs: fix typo (#23) --- docs/tutorial.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tutorial.md b/docs/tutorial.md index 87b24b0d6..44e25e3ce 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -91,7 +91,7 @@ load delta; If you have installed these extensions before, you may have to run `update extensions` and restart DuckDB for the following steps to work. -Now that we have DuckDB all set up, let's trying connecting to UC by specifying a secret. +Now that we have DuckDB all set up, let's try connecting to UC by specifying a secret. ```sh CREATE SECRET ( TYPE UC, From d59f616127dc5dee92c431d988e776abb11646c4 Mon Sep 17 00:00:00 2001 From: Huanming Fang Date: Fri, 14 Jun 2024 11:03:08 -0700 Subject: [PATCH 05/14] Change the cmd flag from --format to --output (#14) Change the cmd flag from --format to --output. Otherwise following error will raise: ``` $ bin/uc table list --catalog unity --schema default --format jsonPretty Some of the provided parameters are not valid. Usage: bin/uc table list [options] Required Params: --catalog The name of the catalog. --schema The name of the schema. Optional Params: --max_results The maximum number of results to return. ``` --- README.md | 2 +- docs/tutorial.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index fd43a8628..3fba6d1cf 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Let's list the tables. bin/uc table list --catalog unity --schema default ``` You should see a few tables. Some details are truncated because of the nested nature of the data. -To see all the content, you can add `--format jsonPretty` to any command. +To see all the content, you can add `--output jsonPretty` to any command. Next, let's get the metadata of one of those tables. diff --git a/docs/tutorial.md b/docs/tutorial.md index 44e25e3ce..e399c9a34 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -45,7 +45,7 @@ Let's list the tables. bin/uc table list --catalog unity --schema default ``` You should see a few tables. Some details are truncated because of the nested nature of the data. -To see all the content, you can add `--format jsonPretty` to any command. +To see all the content, you can add `--output jsonPretty` to any command. Next, let's get the metadata of one those tables. @@ -228,4 +228,4 @@ SELECT * FROM iceberg."unity.default".marksheet_uniform ## APIs and Compatibility - Open API specification: The Unity Catalog Rest API is documented [here](../api). -- Compatibility and stability: The APIs are currently evolving and should not be assumed to be stable. \ No newline at end of file +- Compatibility and stability: The APIs are currently evolving and should not be assumed to be stable. From 34be7dd07224eb560117f2b6638f2a0df3a6964c Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Fri, 14 Jun 2024 14:03:51 -0400 Subject: [PATCH 06/14] Update README.md to provide more info about JDKs (#11) Helping people save time instead of trying to figure out what that unreadable message of SBT means when you have JDK 21 enabled :) --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 3fba6d1cf..7a8797fe6 100644 --- a/README.md +++ b/README.md @@ -122,3 +122,8 @@ See the full [tutorial](docs/tutorial.md) for more details. build/sbt generate ``` This will regenerate the OpenAPI data models in the UC server and data models + APIs in the client SDK. + +### Using more recent JDKs + +The build script [checks for a lower bound on the JDK](./build.sbt#L14) but the [current SBT version](./project/build.properties) +imposes an upper bound. Please check the [JDK compatibility](https://docs.scala-lang.org/overviews/jdk-compatibility/overview.html) documentation for more information From 29c83127074a1c7c33ebd96ad786572fc25f94c9 Mon Sep 17 00:00:00 2001 From: Florian Maas Date: Fri, 14 Jun 2024 20:04:13 +0200 Subject: [PATCH 07/14] Add a PR template (#7) Probably useful to have a PR template, similar to [here](https://github.com/fpgmaas/deptry/blob/main/.github/pull_request_template.md) image --- .github/pull_request_template.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 .github/pull_request_template.md diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 000000000..4a1217df6 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,10 @@ +**PR Checklist** + +- [ ] A description of the changes is added to the description of this PR. +- [ ] If there is a related issue, make sure it is linked to this PR. +- [ ] If you've fixed a bug or added code that should be tested, add tests! +- [ ] If you've added or modified a feature, documentation in `docs` is updated + +**Description of changes** + + From ca88b1bfb8a52454960c5e219603c476d161021b Mon Sep 17 00:00:00 2001 From: Florian Maas Date: Fri, 14 Jun 2024 20:07:17 +0200 Subject: [PATCH 08/14] Remove outdated license in `copybara.py` file (#6) Since the repository now falls under the Apache 2.0 license, this can probably be removed. --- dev/copybara.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/dev/copybara.py b/dev/copybara.py index 380a9e70a..f430c8b43 100644 --- a/dev/copybara.py +++ b/dev/copybara.py @@ -1,22 +1,3 @@ -# -# DATABRICKS CONFIDENTIAL & PROPRIETARY -# __________________ -# -# Copyright 2019 Databricks, Inc. -# All Rights Reserved. -# -# NOTICE: All information contained herein is, and remains the property of Databricks, Inc. -# and its suppliers, if any. The intellectual and technical concepts contained herein are -# proprietary to Databricks, Inc. and its suppliers and may be covered by U.S. and foreign Patents, -# patents in process, and are protected by trade secret and/or copyright law. Dissemination, use, -# or reproduction of this information is strictly forbidden unless prior written permission is -# obtained from Databricks, Inc. -# -# If you view or obtain a copy of this information and believe Databricks, Inc. may not have -# intended it to be made available, please promptly report it to Databricks Legal Department -# @ legal@databricks.com. -# - import os import subprocess import sys From 49bf1a57e14559aa02857308a3bb358354766c43 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 14 Jun 2024 14:23:05 -0400 Subject: [PATCH 09/14] removed unnecessary file (#27) --- dev/copybara.py | 154 ------------------------------------------------ 1 file changed, 154 deletions(-) delete mode 100644 dev/copybara.py diff --git a/dev/copybara.py b/dev/copybara.py deleted file mode 100644 index f430c8b43..000000000 --- a/dev/copybara.py +++ /dev/null @@ -1,154 +0,0 @@ -import os -import subprocess -import sys -import random -import string -import tempfile - -def run_copybara( - origin_url, - origin_ref, - target_git_repo, - target_branch, - config_file, - workflow, - auto_commit -): - copybara_command = download_copybara(os.path.dirname(os.path.dirname(config_file))) - # - # ["java", "-Xmx6g", "-jar", "/Users/tdas/Projects/copybara/bazel-bin/java/com/google/copybara/copybara_deploy.jar"] - - copybara_args = ["migrate", config_file, workflow, "--force", "--init-history", - "--ignore-noop"] - locations_file = os.path.join(os.path.dirname(config_file), ".locations.bara.sky") - locations_file_content = """ -originUrl="{origin_url}" -originRef="{origin_ref}" -destinationUrl="{destination_url}" -destinationRef="{destination_ref}" -copybaraMode="{migration_mode}" -""".format( - origin_url=origin_url, - origin_ref=origin_ref, - destination_url=target_git_repo, - destination_ref=target_branch, - migration_mode="SQUASH") - - with WorkingDirectory(os.path.dirname(config_file)): - try: - with open(locations_file, "w") as location_file: - print("Writing Location File %s:\n%s" % (locations_file, locations_file_content)) - location_file.write(locations_file_content) - - if not os.path.exists(locations_file): - raise Exception("Failed to write Locations File %s" % locations_file) - - # If we are using the current HEAD, check if there are uncommitted changes - if origin_ref == "HEAD": - (exit_code, _, _) = run_cmd(["git", "diff-index", "--quiet", "HEAD", "--"], - throw_on_error=False) - has_uncommitted_changes = exit_code != 0 - if has_uncommitted_changes: - if auto_commit: - print("Committing uncommitted Changes") - run_cmd(["git", "commit", "-am", "Copybara Auto Commit"], - stream_output=True) - else: - print("================ WARNING ================") - print("Using HEAD as the origin commit will not sync uncommitted changes") - print("Use --auto-commit to automatically commit changes") - print("=========================================") - print("Running copybara command: %s" % str(copybara_command + copybara_args)) - run_cmd(copybara_command + copybara_args, stream_output=True) - finally: - if os.path.exists(locations_file): - os.remove(locations_file) - - -def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs): - """Runs a command as a child process. - - A convenience wrapper for running a command from a Python script. - Keyword arguments: - cmd -- the command to run, as a list of strings - throw_on_error -- if true, raises an Exception if the exit code of the program is nonzero - env -- additional environment variables to be defined when running the child process - stream_output -- if true, does not capture standard output and error; if false, captures these - streams and returns them - - Note on the return value: If stream_output is true, then only the exit code is returned. If - stream_output is false, then a tuple of the exit code, standard output and standard error is - returned. - """ - cmd_env = os.environ.copy() - if env: - cmd_env.update(env) - - if stream_output: - # Flush buffered output before running the command so that the output of the command will - # show up after the current buffered output - sys.stdout.flush() - sys.stderr.flush() - child = subprocess.Popen(cmd, env=cmd_env, **kwargs) - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception("Non-zero exitcode: %s" % (exit_code)) - return exit_code - else: - child = subprocess.Popen( - cmd, - env=cmd_env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs) - (stdout, stderr) = child.communicate() - exit_code = child.wait() - if throw_on_error and exit_code != 0: - raise Exception( - "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" % - (exit_code, stdout, stderr)) - return (exit_code, stdout, stderr) - - -def download_copybara(download_dir): - download_path = os.path.join(download_dir, "copybara_deploy.jar") - if not os.path.exists(download_path): - # Download copybara source from https://github.com/tdas/copybara/tree/updated_by_td - # and build it. - print("Downloading and building copybara at %s" % download_path) - download_script = os.path.join(download_dir, "build_copybara.sh") - run_cmd(["sh", "-c", download_script]) - if not os.path.exists(download_path): - print("Run %s manually and ensure copybara JAR is present at %s" % - (download_script, download_path)) - - copybara_cmd = ["java", "-Xmx10g", "-jar", download_path] - return copybara_cmd - - -def create_random_branch_name(): - return ''.join( - random.choice(string.ascii_lowercase + string.digits) - for _ in range(8)) - - -def initialize_empty_git_repo(): - git_repo = tempfile.mkdtemp() - with WorkingDirectory(git_repo): - run_cmd(["git", "init"]) - branch_name = create_random_branch_name() - - return git_repo, branch_name - - -# pylint: disable=too-few-public-methods -class WorkingDirectory(object): - def __init__(self, working_directory): - self.working_directory = working_directory - self.old_workdir = os.getcwd() - - def __enter__(self): - os.chdir(self.working_directory) - - def __exit__(self, tpe, value, traceback): - os.chdir(self.old_workdir) From 5c91eaf294350ea6eb90efa444cc2aa394b6e115 Mon Sep 17 00:00:00 2001 From: Ravi Vijay Date: Fri, 14 Jun 2024 11:47:29 -0700 Subject: [PATCH 10/14] Remove Upload Dependency Graph Step (#29) Remove Failing Step in workflow --- .github/workflows/sbt-tests.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/sbt-tests.yml b/.github/workflows/sbt-tests.yml index 26ff45d9f..4474091e0 100644 --- a/.github/workflows/sbt-tests.yml +++ b/.github/workflows/sbt-tests.yml @@ -29,6 +29,3 @@ jobs: cache: 'sbt' - name: Run tests run: sbt test - # Optional: This step uploads information to the GitHub dependency graph and unblocking Dependabot alerts for the repository - - name: Upload dependency graph - uses: scalacenter/sbt-dependency-submission@ab086b50c947c9774b70f39fc7f6e20ca2706c91 From c4e35966ed121f1a34ba317627d39605a0df0781 Mon Sep 17 00:00:00 2001 From: sullis Date: Fri, 14 Jun 2024 13:00:53 -0700 Subject: [PATCH 11/14] Upgrade to sbt 1.10.0 (#12) https://www.scala-sbt.org --- project/build.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build.properties b/project/build.properties index 3b06b0f4f..f716c53e0 100644 --- a/project/build.properties +++ b/project/build.properties @@ -33,4 +33,4 @@ # limitations under the License. # -sbt.version=1.6.1 +sbt.version=1.10.0 From d0098b55eb0da2122ef95cb78f96b8f3182b669d Mon Sep 17 00:00:00 2001 From: sullis Date: Fri, 14 Jun 2024 16:36:10 -0700 Subject: [PATCH 12/14] git ignore .bsp (#30) **Description of changes** update .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 19e8a1057..8c5351828 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ etc/db/h2db.mv.db target/ .idea/ +.bsp/ From 35daabc2d2d8035d62590fcc725908ccbd9806a2 Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Sat, 15 Jun 2024 07:36:52 +0800 Subject: [PATCH 13/14] Fix some typos in docs (#34) Fix some typos in docs. --- CODE_OF_CONDUCT.md | 2 +- NOTICE | 2 +- README.md | 4 ++-- docs/cli.md | 2 +- docs/server.md | 2 +- docs/tutorial.md | 12 ++++++------ 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 62b0937a6..c5962bbd4 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -106,7 +106,7 @@ Violating these terms may lead to a permanent ban. ### 4. Permanent Ban **Community Impact**: Demonstrating a pattern of violation of community -standards, including sustained inappropriate behavior, harassment of an +standards, including sustained inappropriate behavior, harassment of an individual, or aggression toward or disparagement of classes of individuals. **Consequence**: A permanent ban from any sort of public interaction within diff --git a/NOTICE b/NOTICE index 09015e99e..32650825d 100644 --- a/NOTICE +++ b/NOTICE @@ -55,7 +55,7 @@ Notice - https://github.com/delta-io/delta/blob/master/NOTICE.txt sbt/sbt-jupiter-interface - https://github.com/sbt/sbt-jupiter-interface/tree/main Copyright sbt-jupiter-interface authors -LIcense - https://github.com/sbt/sbt-jupiter-interface/blob/main/LICENSE +License - https://github.com/sbt/sbt-jupiter-interface/blob/main/LICENSE raphw/byte-buddy - https://github.com/raphw/byte-buddy Copyright byte-buddy authors diff --git a/README.md b/README.md index 7a8797fe6..e5cb16c39 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Let's take Unity Catalog for spin. In this guide, we are going to do the followi as well as provide a convenient way to explore the content of any UC server implementation. ### Prerequisites -You have to ensure that you local environment has the following: +You have to ensure that your local environment has the following: - Clone this repository. - Ensure the `JAVA_HOME` environment variable your terminal is configured to point to JDK11+. - Compile the project using `build/sbt package` @@ -85,7 +85,7 @@ You should see it print a short table saying `Success` = `true`. Then we attach ```sql ATTACH 'unity' AS unity (TYPE UC_CATALOG); ``` -Now we ready to query. Try the following +Now we are ready to query. Try the following ```sql SHOW ALL TABLES; diff --git a/docs/cli.md b/docs/cli.md index 2b1ef6580..95b79474c 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -330,7 +330,7 @@ This is an experimental feature and only supported for python functions that tak It runs the functions using the python engine script at `etc/data/function/python_engine.py`. Example: -Invoke a python sum function that take two integer inputs: +Invoke a python sum function that takes two integer inputs: ```sh bin/uc function call --full_name my_catalog.my_schema.my_function --input_params "1,2" ``` diff --git a/docs/server.md b/docs/server.md index 450bb5b10..a6be64c22 100644 --- a/docs/server.md +++ b/docs/server.md @@ -10,7 +10,7 @@ bin/start-uc-server ## Configuration -The server config file is at the location `etc/conf/server.properties` (relative to the project root). +The server config file is at the location `etc/conf/server.properties` (relative to the project root). - `server.env`: The environment in which the server is running. This can be set to `dev` or `test`. When set to `test` the server will instantiate an empty in-memory h2 database for storing metadata. diff --git a/docs/tutorial.md b/docs/tutorial.md index e399c9a34..7dc38db90 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -2,11 +2,11 @@ Let's take Unity Catalog for spin. In this tutorial, we are going to do the following: - In one terminal, run the UC server. - In another terminal, we will explore the contents of the UC server using the UC CLI, - which is example UC connector provided to demonstrate how to use the UC SDK for various assets, + which is an example UC connector provided to demonstrate how to use the UC SDK for various assets, as well as provide a convenient way to explore the content of any UC server implementation. ### Prerequisites -You have to ensure that you local environment has the following: +You have to ensure that your local environment has the following: - Clone this repository. - Ensure the `JAVA_HOME` environment variable your terminal is configured to point to JDK11+. - Compile the project running `build/sbt package` in the repository root directory. @@ -67,7 +67,7 @@ Let's try creating a new table. bin/uc table create --full_name unity.default.myTable --columns "col1 int, col2 double" --storage_location /tmp/uc/myTable ``` -If you list the tables again you should see this new table. Next, let's write to the table with +If you list the tables again, you should see this new table. Next, let's write to the table with some randomly generated data (again, powered by [Delta Kernel Java](https://delta.io/blog/delta-kernel/)] and read it back. ``` @@ -104,7 +104,7 @@ You should see it print a short table saying `Success` = `true`. Then we attach ```sh ATTACH 'unity' AS unity (TYPE UC_CATALOG); ``` -Now we ready to query. Try the following +Now we are ready to query. Try the following ```sql SHOW ALL TABLES; @@ -137,7 +137,7 @@ You should see two text files listed and one directory. Let's read the content o ``` bin/uc volume read --full_name unity.default.json_files --path c.json ``` -Voila! You have read the content of a file stored in a volume.We can also list the contents of any subdirectory. +Voila! You have read the content of a file stored in a volume. We can also list the contents of any subdirectory. For e.g.: ``` @@ -179,7 +179,7 @@ with the function name and arguments. bin/uc function call --full_name unity.default.sum --input_params "1,2,3" ``` -Voila! You have called a function stored in UC. Lets try and create a new function. +Voila! You have called a function stored in UC. Let's try and create a new function. ``` bin/uc function create --full_name unity.default.myFunction --data_type INT --input_params "a int, b int" --def "c=a*b\nreturn c" From af5c34244b513a7a34001da3c9297e3892204f9b Mon Sep 17 00:00:00 2001 From: Ravi Vijay Date: Sat, 15 Jun 2024 07:43:00 -0700 Subject: [PATCH 14/14] Storing Parent Entity Id's in All Child Entities (#38) * Storing Parent Id's instead of names in all child entities. * This keeps parent child relationships consistent on parent entity name updates. * Added test cases to test out similar scenarios. * Move all validation (entity not found, entity already existing) under a single transaction. ### Testing Ran All Tests by running `build/sbt clean test` and they succeed. --- etc/db/h2db.mv.db | Bin 36864 -> 36864 bytes .../server/persist/CatalogOperations.java | 121 --------- .../server/persist/CatalogRepository.java | 146 +++++++++++ .../server/persist/FunctionRepository.java | 166 +++++++++---- .../server/persist/SchemaOperations.java | 133 ---------- .../server/persist/SchemaRepository.java | 197 +++++++++++++++ .../server/persist/TableRepository.java | 166 +++++++------ .../server/persist/VolumeOperations.java | 163 ------------- .../server/persist/VolumeRepository.java | 229 ++++++++++++++++++ .../converters/VolumeInfoConverter.java | 13 +- .../server/persist/dao/FunctionInfoDAO.java | 27 +-- .../persist/dao/FunctionParameterInfoDAO.java | 4 +- .../server/persist/dao/SchemaInfoDAO.java | 35 +-- .../server/persist/dao/VolumeInfoDAO.java | 16 +- .../server/service/CatalogService.java | 22 +- .../server/service/FunctionService.java | 18 +- .../server/service/SchemaService.java | 25 +- .../TemporaryVolumeCredentialsService.java | 4 +- .../server/service/VolumeService.java | 25 +- .../server/utils/ValidationUtils.java | 46 +--- .../server/base/BaseCRUDTest.java | 4 + .../base/function/BaseFunctionCRUDTest.java | 27 ++- .../base/schema/BaseSchemaCRUDTest.java | 30 ++- .../server/base/table/BaseTableCRUDTest.java | 26 +- .../base/volume/BaseVolumeCRUDTest.java | 37 ++- 25 files changed, 937 insertions(+), 743 deletions(-) delete mode 100644 server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java create mode 100644 server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java delete mode 100644 server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java create mode 100644 server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java delete mode 100644 server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java create mode 100644 server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java diff --git a/etc/db/h2db.mv.db b/etc/db/h2db.mv.db index 2226ef7e9b16240b168f2d5a3d91505ec21d8ae0..0458fa978a67ee4ab263fdff15348e9dc2f0d97a 100644 GIT binary patch delta 4387 zcma)Adw3I77QZujblN5*(4o*GZ3+eCkvfx^Br`lTDJ^ZFDYTS_P#0z zMoI^A0m3dT630+Zg0YKE*}*Ub%PEX3vK;JlPblI`>H!D7k1qBs{#4Q~a`vh1H&Ni{um-r{r=FewyQ0PJ-`Q73i%{aAOv{;#rOl zAF-z`Ki*VUX@YlEARxiQzA*1WZR{1Nfz<2^y;_;a^MXDU-6k z#tb+H0O$kQ_h%ARKWkZn8ta}io5;K(0ElbD@VK$YWWdkq;rV5yaKS1|yvk?Avn#xH zo`$<`r=9Wp=I_FERioo^Q~gE^|E6D#fH#{a!%klrj4n69#S6DUtnn;7chAT;vFHkp z)xsw4WN2MllH;u$H(5X0I>8)+Z-_<60s+S^nF^m-Ivl>#R2o0C)QsuPU8EaczWXZv zC5GHsY(_fP0S{pPavv=6&Wh`rR_L(b!M`sX2e+*l28nyDn*9>Q9uH$HvsC-AcV#DB z?wtZ_h2o^~GGu%`ydW3CM}<2RMz0Ig4cK|jpCeJ#MQh#s*)>(O=b;GAnE0?E7XujR zwjA{IRxVsx*}b)6!B2Xgn0jh~PiPKUB_S-d1Xtz0yYH?Gy>Ar)ZzRwh?n0WNc;Z!~ z38|)QBBuU<@egL134q}`y}_780P?nR)!*-bH^(#M`Nu-nPM>{xEd1GOx%6C@-qSzQC9eeHSOtAO`(EKi;uWB{c>b)@Z5U- z1Gr*sFkQ;Cy}jf)zWUR{FY}vUJhi(mVKG0FC))nzx%|u##rxl{v%AO*PapnvU!$+> z*oXNy8=d>+CoHB(+Il6?!G^r7>8>e?!+-0w9TK8S^@6RuX zU#)E}o!9V)bn_uk@y<;pZ(e%j;Ope!qc^$rZ+ouo-a5YX?2aQ2*wxYzO_f4Hd~#|o zkw<_mG8NWHrn-Y^@f{4Wo=jc7j!X@kVQ}$+wt6DXJ3K8X$TgfVC$op?6Zr+^GTy;( zE?Q!2E>YlZtlcl#1ksL0tkdpNIG2OxY4~1i1J`ZYx#b^!xio&xzE_^w^?B(XmL=vg zyTni;!*Vtrbq1uuQ#RhOxNHJrmw85VDU`y)Kp?X+!^xB)$PSxRq9hwDIvE@1cd6!C zg_RWEjxeApm|0noWQUWIc$-6%6dUVckWpD-Y_e=uB)`kf$r2BHgN@N{3-fW|i@OUd zR&77^a?h0f-is&(&y^p~vFHK*<5r+Em;D%Du}r~?rbkn6F8eXwBR4U!yfAk-j#7m- zd1~|*dHnv6HQW)j1|qGZ+#V`7yms^KQ}^3(-Q0Cu;nl%FwYRPj&TJcJ>$cnsPQJSQ z6<_?px|L@pfIlz8L3Jq7imSM(;ek)Kl@=UJBIRgEa~C!?c)P;KKLUX=R>p6g2YVNpi|D3vxgP-r1Rk(Q0JLF)q znV#%6-moCH$Te%_E_&|0LQ6!Z$<=1@j6HYW=7;ThPc)odQ948^sRQdn`HO09D+EKVV=+E*)v&k;x+`wFbH^(bQ9@p;A_aNK4onl2*&Df_e@;^8z}D z=J}KJDq^v{%TkayiBbU(6Sq)%hL6&zRe|bsumqCX1ob{4Agq%8)Z(eh`lRZ>;s`q_wGcr4qCm3}Y;V;JAlYB5vR#oe zXbWTtZJz9)FvEG zsF=&)m`*EnN3!5+_vS{~(<4Uf)cuya2wl1xXbYyr5^cdO$NTw(TCe#Rb-h+PJLf}b-v)_=3fzfN7Tjh8y0ktuqyy=HfN3|}+J~a(>N7;u zcq{i7rX5RbyF)bbA7HAD4NP`z8#NX!vwvd(TpdN#eFt>vFj4%fC2F{47p0CCwj|8cW7q%qo1A-)yE~=9fc)&wc=6_S%jN<3?kUnDU|2L6g%Y7rQ(j` zt`=cf^4XL(Yc!ha1H;~ae{6{XJr>Iq0F2C?C*t3(pMnj(`6h0h(Eq5b-*|A~Q3#Ej zCK?j&KS*ynNc6w;;3sH0@fwT|kCHlk)~NWT%`}czbjFu$bzmlTS|kt&$^OI(^vP|| z#N0YD1}@rO5=A#dCP#g020YQf^>9lkbS%W(HO|N_zt3dAUAg}dY=K1`#AcocZ1yHI}wcm6R{Q1 zK42pM2!H_q(<%Z{cNCbmzJ&%z;BkN`66iqyMu4vU+YxmGvjYJbW5C=|iD(}%Z`pzV zK|uVY4Qxm zJ^uBi#fqL^z8-zz*U;R+lW1&t7g^%E=ejxQ3^QR$5i4va z@{|$Xw4yRahiDSr(mpq}xnnnB_{+TaQfF8Fh#LNG(RWGTG!^asCrR`m zf7=pt``U#i=V_{1Vk~oCY9B$@byT6RUr2@w$##fcEadrDE$EWu0BIP zLZ&gTxrR^c|8#>Dy}2Pc$)W|#?Ol|zUc1vk9xsh9u+7;%=asrl2#Uo~CzTus$wt>zzXSrvl)5w{HEa3H#= z`1cPl`|Mn*473bK`r|wCN>EYVWVYd@g)6bpczf+Ri`@zc(qJ^1EhSdqdfI>Z_{l(U z4AJ}FQs5#BqBp6PfsN1p<{5LVj;k&smRp`-ci= zeelr}i!12BqCL*aeU|y}pF}Nv(ZX4K{g=PueXr%W&yBqNpmeVkOk2Ed-kI*7*N5tk zeRj5Q$*+>{y58_g_U`zQ4+^NiW6Ct5;cLQ1YDWJf(%6ghOti{qmxBW zBOs@@dt!QCH(>kI*}D-(W*R%fFo_M!eZ{j#9C{r1i$~L^3YnVG<5!fJJH9>Yjtmcm z)uC9O>7^&m{UXVdhLu})#`~g?=9c#Eo^*YwQ&bHO55((Y!-J?RT9s(cGDy&jS)^`y zI$q6KmIKkqHgzZ-b3=D5KGYxC$(1JUj3kPKsLooOtCqbQ4GWmk7+OMCORIVg4= z%ftfoZnXM5M4<@*#)W@ZvoYIjQjg zLxw%U(}{(jEUR(lSJRd_3=gO)wx~OqmD}LJu*$d`r__VH?g}_}e@-5L`NZB^FR+Hq ztYKZxY<=b9w|jjL?)u`RnIo@!cGQLIUSd)aox)Jn2DKVGS|0xidiU_g=M%!Mv&#&?P-qBW zck+EmjyMZ;jo;RF!upbV*)j<|qKuCTeCSE~amFYfDlAAU6{bZ|gMuW}o@|v&!#GCk z(9f$sdija(i1%v!18OWZF&eLmb{c&;v=Ch}JUf9MbxdD89v!_1u@dxZEaicUL~)9S z&nO-rS+XR{kk{}hhoWU~b$TSztJhCTwz|!+Xe491JvW>%&Z=>Fse^8ZcjnsW)hB;`Vt7{6#409XlQz5S86H7Cspi4Ybl z(P3O7r-&+E+|;k&*ye{Dze+12a8Sj`Q79r?24MofGseQ4$S5>TCCDf=PbJb7Vtx8f zk4WWn=bM4)0tJpSV_{giKo*^$FWDOT6-;8mxFoAE?=NZ+ZFR71}F2If( zFBO2)N9i}(ppVk8sy^|N`l_;uNzojS*AyX~B5HcP5_#VgQv_;?@JtbyDFUKne5~Q7 zc?*pI-cGEoI@pWN3_W;Air&0P9S_fLyaJrlzGT-Emvkzd-C~)4Bg}f?^+TEg42ZvGykZ3J1V813w#mz{b9gX%g6WHDNjmYzKA&V8ZgxU&Yjy0CwkX zc#s74OMRG*0(&i1UX pU6^9|YbyZ&n12SN)M+{aeR=}oX8>l+aiwYI@2MM|7L?qR{2#Wre@g%W diff --git a/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java b/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java deleted file mode 100644 index fa30ad7c0..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java +++ /dev/null @@ -1,121 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CatalogInfo; -import io.unitycatalog.server.model.ListCatalogsResponse; -import io.unitycatalog.server.persist.dao.CatalogInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.query.Query; -import io.unitycatalog.server.model.CreateCatalog; -import io.unitycatalog.server.model.UpdateCatalog; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.stream.Collectors; - -public class CatalogOperations { - @Getter - private static final CatalogOperations instance = new CatalogOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(CatalogOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - private CatalogOperations() {} - - public CatalogInfo addCatalog(CreateCatalog createCatalog) { - ValidationUtils.validateSqlObjectName(createCatalog.getName()); - CatalogInfoDAO catalogInfo = new CatalogInfoDAO(); - catalogInfo.setId(java.util.UUID.randomUUID()); - catalogInfo.setName(createCatalog.getName()); - catalogInfo.setComment(createCatalog.getComment()); - catalogInfo.setCreatedAt(new Date()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - session.persist(catalogInfo); - tx.commit(); - System.out.println("Added catalog: " + catalogInfo.getName()); - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } catch(Exception e) { - LOGGER.error("Error adding catalog", e); - return null; - } - } - - public ListCatalogsResponse listCatalogs() { - ListCatalogsResponse response = new ListCatalogsResponse(); - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - response.setCatalogs(session - .createQuery("from CatalogInfoDAO ", CatalogInfoDAO.class).list() - .stream().map(CatalogInfoDAO::toCatalogInfo).collect(Collectors.toList())); - return response; - } catch(Exception e) { - LOGGER.error("Error listing catalogs", e); - return null; - } - } - - public CatalogInfo getCatalog(String name) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - return getCatalog(session, name); - } catch(Exception e) { - LOGGER.error("Error getting catalog", e); - return null; - } - } - - public CatalogInfo getCatalog(Session session, String name) { - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - if (catalogInfo == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); - } - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } - - public CatalogInfoDAO getCatalogInfoDAO(Session session, String name) { - Query query = session - .createQuery("FROM CatalogInfoDAO WHERE name = :value", CatalogInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - return query.uniqueResult(); - } - - public CatalogInfo updateCatalog(String name, UpdateCatalog updateCatalog) { - ValidationUtils.validateSqlObjectName(updateCatalog.getNewName()); - // cna make this just update once we have an identifier that is not the name - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - catalogInfo.setName(updateCatalog.getNewName()); - catalogInfo.setComment(updateCatalog.getComment()); - catalogInfo.setUpdatedAt(new Date()); - session.merge(catalogInfo); - tx.commit(); - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } catch(Exception e) { - LOGGER.error("Error updating catalog", e); - return null; - } - } - - public void deleteCatalog(String name) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - if (catalogInfo != null) { - session.remove(catalogInfo); - tx.commit(); - LOGGER.info("Deleted catalog: {}", catalogInfo.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); - } - } catch (Exception e) { - LOGGER.error("Error deleting catalog", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java b/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java new file mode 100644 index 000000000..dd78a44b7 --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java @@ -0,0 +1,146 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.CatalogInfo; +import io.unitycatalog.server.model.ListCatalogsResponse; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.query.Query; +import io.unitycatalog.server.model.CreateCatalog; +import io.unitycatalog.server.model.UpdateCatalog; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.stream.Collectors; + +public class CatalogRepository { + @Getter + private static final CatalogRepository instance = new CatalogRepository(); + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + private CatalogRepository() {} + + public CatalogInfo addCatalog(CreateCatalog createCatalog) { + ValidationUtils.validateSqlObjectName(createCatalog.getName()); + CatalogInfoDAO catalogInfo = new CatalogInfoDAO(); + catalogInfo.setId(java.util.UUID.randomUUID()); + catalogInfo.setName(createCatalog.getName()); + catalogInfo.setComment(createCatalog.getComment()); + catalogInfo.setCreatedAt(new Date()); + + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + if (getCatalogDAO(session, createCatalog.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Catalog already exists: " + createCatalog.getName()); + } + session.persist(catalogInfo); + tx.commit(); + System.out.println("Added catalog: " + catalogInfo.getName()); + return CatalogInfoDAO.toCatalogInfo(catalogInfo); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public ListCatalogsResponse listCatalogs() { + ListCatalogsResponse response = new ListCatalogsResponse(); + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + response.setCatalogs(session + .createQuery("from CatalogInfoDAO ", CatalogInfoDAO.class).list() + .stream().map(CatalogInfoDAO::toCatalogInfo).collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + return response; + } + } + + public CatalogInfo getCatalog(String name) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + CatalogInfoDAO catalogInfo = null; + try { + catalogInfo = getCatalogDAO(session, name); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + if (catalogInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + return CatalogInfoDAO. + toCatalogInfo(getCatalogDAO(session, name)); + } + } + + public CatalogInfoDAO getCatalogDAO(Session session, String name) { + Query query = session + .createQuery("FROM CatalogInfoDAO WHERE name = :value", CatalogInfoDAO.class); + query.setParameter("value", name); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public CatalogInfo updateCatalog(String name, UpdateCatalog updateCatalog) { + ValidationUtils.validateSqlObjectName(updateCatalog.getNewName()); + // cna make this just update once we have an identifier that is not the name + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + CatalogInfoDAO catalogInfo = getCatalogDAO(session, name); + if (catalogInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + if (getCatalogDAO(session, updateCatalog.getNewName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Catalog already exists: " + updateCatalog.getNewName()); + } + catalogInfo.setName(updateCatalog.getNewName()); + catalogInfo.setComment(updateCatalog.getComment()); + catalogInfo.setUpdatedAt(new Date()); + session.merge(catalogInfo); + tx.commit(); + return CatalogInfoDAO.toCatalogInfo(catalogInfo); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteCatalog(String name) { + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + CatalogInfoDAO catalogInfo = getCatalogDAO(session, name); + if (catalogInfo != null) { + session.remove(catalogInfo); + tx.commit(); + LOGGER.info("Deleted catalog: {}", catalogInfo.getName()); + } else { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} diff --git a/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java b/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java index da8c79410..58387a594 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java +++ b/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java @@ -2,11 +2,9 @@ import io.unitycatalog.server.exception.BaseException; import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CreateFunction; -import io.unitycatalog.server.model.CreateFunctionRequest; -import io.unitycatalog.server.model.FunctionInfo; -import io.unitycatalog.server.model.ListFunctionsResponse; +import io.unitycatalog.server.model.*; import io.unitycatalog.server.persist.dao.FunctionInfoDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; import io.unitycatalog.server.utils.ValidationUtils; import lombok.Getter; import org.hibernate.Session; @@ -24,6 +22,7 @@ public class FunctionRepository { @Getter private static final FunctionRepository instance = new FunctionRepository(); + private static final SchemaRepository schemaRepository = SchemaRepository.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRepository.class); private static final SessionFactory SESSION_FACTORY = HibernateUtil.getSessionFactory(); @@ -58,77 +57,142 @@ public FunctionInfo createFunction(CreateFunctionRequest createFunctionRequest) try (Session session = SESSION_FACTORY.openSession()) { Transaction tx = session.beginTransaction(); - FunctionInfoDAO dao = FunctionInfoDAO.from(functionInfo); - dao.getInputParams().forEach(p -> { - p.setId(UUID.randomUUID().toString()); - p.setFunction(dao); - }); - dao.getReturnParams().forEach(p -> { - p.setId(UUID.randomUUID().toString()); - p.setFunction(dao); - }); - session.persist(dao); - tx.commit(); - return functionInfo; - } catch(Exception e) { - LOGGER.error("Error adding function", e); - return null; + try { + String catalogName = createFunction.getCatalogName(); + String schemaName = createFunction.getSchemaName(); + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, + catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + if (getFunctionDAO(session, catalogName, schemaName, createFunction.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, "Function already exists: " + createFunction.getName()); + } + FunctionInfoDAO dao = FunctionInfoDAO.from(functionInfo); + dao.setSchemaId(schemaInfo.getId()); + dao.getInputParams().forEach(p -> { + p.setId(UUID.randomUUID().toString()); + p.setFunction(dao); + }); + dao.getReturnParams().forEach(p -> { + p.setId(UUID.randomUUID().toString()); + p.setFunction(dao); + }); + session.persist(dao); + tx.commit(); + return functionInfo; + } catch (Exception e) { + tx.rollback(); + throw e; + } } } public ListFunctionsResponse listFunctions(String catalogName, String schemaName, Optional maxResults, Optional nextPageToken) { ListFunctionsResponse response = new ListFunctionsResponse(); try (Session session = SESSION_FACTORY.openSession()) { - String queryString = "from FunctionInfoDAO f where f.catalogName = :catalogName and f.schemaName = :schemaName"; - Query query = session.createQuery(queryString, FunctionInfoDAO.class); - query.setParameter("catalogName", catalogName); - query.setParameter("schemaName", schemaName); - - maxResults.ifPresent(query::setMaxResults); - - if (nextPageToken.isPresent()) { - // Perform pagination logic here if needed - // Example: query.setFirstResult(startIndex); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName + "." + schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + String queryString = "from FunctionInfoDAO f where f.schemaId = :schemaId"; + Query query = session.createQuery(queryString, FunctionInfoDAO.class); + query.setParameter("schemaId", schemaInfo.getId()); + maxResults.ifPresent(query::setMaxResults); + if (nextPageToken.isPresent()) { + // Perform pagination logic here if needed + // Example: query.setFirstResult(startIndex); + } + List functions = query.list(); + response.setFunctions( + functions.stream().map(FunctionInfoDAO::toFunctionInfo) + .peek(f -> addNamespaceInfo(f, catalogName, schemaName)) + .collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; } - List functions = query.list(); - response.setFunctions(functions.stream().map(FunctionInfoDAO::toFunctionInfo).collect(Collectors.toList())); - return response; - } catch(Exception e) { - LOGGER.error("Error listing functions", e); - return null; } + return response; } public FunctionInfo getFunction(String name) { + FunctionInfo functionInfo = null; try (Session session = SESSION_FACTORY.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM FunctionInfoDAO WHERE fullName = :value", FunctionInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - FunctionInfoDAO functionInfoDAO = query.uniqueResult(); - return functionInfoDAO == null ? null : functionInfoDAO.toFunctionInfo(); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String[] parts = name.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid function name: " + name); + } + String catalogName = parts[0], schemaName = parts[1], functionName = parts[2]; + FunctionInfoDAO functionInfoDAO = getFunctionDAO(session, catalogName, schemaName, functionName); + if (functionInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } + functionInfo = functionInfoDAO.toFunctionInfo(); + addNamespaceInfo(functionInfo, catalogName, schemaName); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } } catch (Exception e) { LOGGER.error("Error getting function", e); return null; } + return functionInfo; + } + + public void addNamespaceInfo(FunctionInfo functionInfo, String catalogName, String schemaName) { + functionInfo.setCatalogName(catalogName); + functionInfo.setSchemaName(schemaName); + functionInfo.setFullName(catalogName + "." + schemaName + "." + functionInfo.getName()); + } + + public FunctionInfoDAO getFunctionDAO(Session session, String catalogName, String schemaName, String functionName) { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + Query query = session.createQuery( + "FROM FunctionInfoDAO WHERE name = :name and schemaId = :schemaId", FunctionInfoDAO.class); + query.setParameter("name", functionName); + query.setParameter("schemaId", schemaInfo.getId()); + query.setMaxResults(1); + return query.uniqueResult(); } public void deleteFunction(String name, Boolean force) { try (Session session = SESSION_FACTORY.openSession()) { Transaction tx = session.beginTransaction(); - Query query = session.createQuery("FROM FunctionInfoDAO WHERE fullName = :value", FunctionInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - FunctionInfoDAO functionInfoDAO = query.uniqueResult(); - if (functionInfoDAO != null) { + try { + String[] parts = name.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid function name: " + name); + } + String catalogName = parts[0], schemaName = parts[1], functionName = parts[2]; + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, + catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + FunctionInfoDAO functionInfoDAO = getFunctionDAO(session, catalogName, schemaName, functionName); + if (functionInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } session.remove(functionInfoDAO); tx.commit(); LOGGER.info("Deleted function: {}", functionInfoDAO.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } catch (Exception e) { + tx.rollback(); + throw e; } - } catch (Exception e) { - LOGGER.error("Error deleting function", e); } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java b/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java deleted file mode 100644 index 3e6da606b..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java +++ /dev/null @@ -1,133 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CreateSchema; -import io.unitycatalog.server.model.ListSchemasResponse; -import io.unitycatalog.server.model.SchemaInfo; -import io.unitycatalog.server.persist.dao.SchemaInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.query.Query; -import io.unitycatalog.server.model.UpdateSchema; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; - -public class SchemaOperations { - @Getter - public static final SchemaOperations instance = new SchemaOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(SchemaOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - - private SchemaOperations() {} - - public SchemaInfo createSchema(CreateSchema createSchema) { - ValidationUtils.validateSqlObjectName(createSchema.getName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = new SchemaInfoDAO(); - schemaInfo.setSchemaId(UUID.randomUUID()); - schemaInfo.setName(createSchema.getName()); - schemaInfo.setCatalogName(createSchema.getCatalogName()); - schemaInfo.setComment(createSchema.getComment()); - //schemaInfo.setProperties(createSchema.getProperties()); - schemaInfo.setFullName(createSchema.getCatalogName() + "." + createSchema.getName()); - schemaInfo.setCreatedAt(new Date()); - schemaInfo.setUpdatedAt(null); - session.persist(schemaInfo); - tx.commit(); - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } catch (Exception e) { - LOGGER.error("Error creating schema", e); - return null; - } - } - - public ListSchemasResponse listSchemas(String catalogName, Optional maxResults, Optional pageToken) { - try (Session session = sessionFactory.openSession()) { - // TODO: Implement pagination and filtering if required - // For now, returning all schemas without pagination - session.beginTransaction(); - ListSchemasResponse response = new ListSchemasResponse(); - Query query = session.createQuery("FROM SchemaInfoDAO WHERE catalogName = :value", SchemaInfoDAO.class); - query.setParameter("value", catalogName); - response.setSchemas(query.list().stream().map(SchemaInfoDAO::toSchemaInfo).collect(Collectors.toList())); - return response; - } catch (Exception e) { - LOGGER.error("Error listing schemas", e); - return null; - } - } - - public SchemaInfo getSchema(String fullName) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - return getSchema(session, fullName); - } catch (Exception e) { - LOGGER.error("Error getting schema", e); - return null; - } - } - - public SchemaInfoDAO getSchemaInfoDAO(Session session, String fullName) { - Query query = session.createQuery("FROM SchemaInfoDAO WHERE fullName = :value", SchemaInfoDAO.class); - query.setParameter("value", fullName); - query.setMaxResults(1); - return query.uniqueResult(); - } - - public SchemaInfo getSchema(Session session, String fullName) { - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - if (schemaInfo == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); - } - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } - - public SchemaInfo updateSchema(String fullName, UpdateSchema updateSchema) { - ValidationUtils.validateSqlObjectName(updateSchema.getNewName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - - // Update the schema with new values - if (updateSchema.getComment() != null) { - schemaInfo.setComment(updateSchema.getComment()); - } - if (updateSchema.getNewName() != null) { - schemaInfo.setName(updateSchema.getNewName()); - schemaInfo.setFullName(schemaInfo.getCatalogName() + "." + updateSchema.getNewName()); - } - schemaInfo.setUpdatedAt(new Date()); - session.merge(schemaInfo); - tx.commit(); - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } catch (Exception e) { - LOGGER.error("Error updating schema", e); - return null; - } - } - - public void deleteSchema(String fullName) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - if (schemaInfo != null) { - session.remove(schemaInfo); - tx.commit(); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); - } - } catch (Exception e) { - LOGGER.error("Error deleting schema", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java b/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java new file mode 100644 index 000000000..9017b8722 --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java @@ -0,0 +1,197 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.*; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.query.Query; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +public class SchemaRepository { + @Getter + public static final SchemaRepository instance = new SchemaRepository(); + @Getter + public static final CatalogRepository catalogRepository = CatalogRepository.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + + private SchemaRepository() {} + + public SchemaInfo createSchema(CreateSchema createSchema) { + ValidationUtils.validateSqlObjectName(createSchema.getName()); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + if (getSchemaDAO(session, createSchema.getCatalogName(), createSchema.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Schema already exists: " + createSchema.getName()); + } + CatalogInfoDAO catalogDAO = catalogRepository + .getCatalogDAO(session, createSchema.getCatalogName()); + SchemaInfoDAO schemaInfo = new SchemaInfoDAO(); + schemaInfo.setId(UUID.randomUUID()); + schemaInfo.setName(createSchema.getName()); + schemaInfo.setCatalogId(catalogDAO.getId()); + schemaInfo.setComment(createSchema.getComment()); + schemaInfo.setCreatedAt(new Date()); + schemaInfo.setUpdatedAt(null); + session.persist(schemaInfo); + tx.commit(); + SchemaInfo toReturn = SchemaInfoDAO.toSchemaInfo(schemaInfo); + addNamespaceData(toReturn, createSchema.getCatalogName()); + return toReturn; + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + private void addNamespaceData(SchemaInfo schemaInfo, String catalogName) { + schemaInfo.setCatalogName(catalogName); + schemaInfo.setFullName(catalogName + "." + schemaInfo.getName()); + } + + private SchemaInfo convertFromDAO(SchemaInfoDAO schemaInfoDAO, String fullName) { + String catalogName = fullName.split("\\.")[0]; + SchemaInfo schemaInfo = SchemaInfoDAO.toSchemaInfo(schemaInfoDAO); + addNamespaceData(schemaInfo, catalogName); + return schemaInfo; + } + + public SchemaInfoDAO getSchemaDAO(Session session, UUID catalogId, String schemaName) { + Query query = session + .createQuery("FROM SchemaInfoDAO WHERE name = :name and catalogId = :catalogId", SchemaInfoDAO.class); + query.setParameter("name", schemaName); + query.setParameter("catalogId", catalogId); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public SchemaInfoDAO getSchemaDAO(Session session, String catalogName, String schemaName) { + CatalogInfoDAO catalog = catalogRepository.getCatalogDAO(session, catalogName); + if (catalog == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); + } + return getSchemaDAO(session, catalog.getId(), schemaName); + } + + public SchemaInfoDAO getSchemaDAO(Session session, String fullName) { + String[] namespace = fullName.split("\\."); + return getSchemaDAO(session, namespace[0], namespace[1]); + } + + public ListSchemasResponse listSchemas(String catalogName, Optional maxResults, + Optional pageToken) { + try (Session session = sessionFactory.openSession()) { + ListSchemasResponse response = new ListSchemasResponse(); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + // TODO: Implement pagination and filtering if required + // For now, returning all schemas without pagination + try { + CatalogInfoDAO catalog = catalogRepository.getCatalogDAO(session, catalogName); + if (catalog == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); + } + + Query query = session + .createQuery("FROM SchemaInfoDAO WHERE catalogId = :value", SchemaInfoDAO.class); + query.setParameter("value", catalog.getId()); + response.setSchemas(query.list().stream().map(SchemaInfoDAO::toSchemaInfo) + .peek(x -> addNamespaceData(x, catalogName)) + .collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + return response; + } + } + + public SchemaInfo getSchema(String fullName) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + SchemaInfoDAO schemaInfo = null; + try { + schemaInfo = getSchemaDAO(session, fullName); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); + } + return convertFromDAO(schemaInfo, fullName); + } + } + + public SchemaInfo updateSchema(String fullName, UpdateSchema updateSchema) { + ValidationUtils.validateSqlObjectName(updateSchema.getNewName()); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = getSchemaDAO(session, fullName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + fullName); + } + if (updateSchema.getNewName() != null) { + if (getSchemaDAO(session, fullName.split("\\.")[0], updateSchema + .getNewName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Schema already exists: " + updateSchema.getNewName()); + } + } + // Update the schema with new values + if (updateSchema.getComment() != null) { + schemaInfo.setComment(updateSchema.getComment()); + } + if (updateSchema.getNewName() != null) { + schemaInfo.setName(updateSchema.getNewName()); + } + schemaInfo.setUpdatedAt(new Date()); + session.merge(schemaInfo); + tx.commit(); + return convertFromDAO(schemaInfo, fullName); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteSchema(String fullName) { + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = getSchemaDAO(session, fullName); + if (schemaInfo != null) { + session.remove(schemaInfo); + tx.commit(); + } else { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + fullName); + } + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} diff --git a/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java b/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java index 8e86449f0..f76ed6606 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java +++ b/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java @@ -3,7 +3,9 @@ import io.unitycatalog.server.exception.BaseException; import io.unitycatalog.server.model.*; import io.unitycatalog.server.persist.converters.TableInfoConverter; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; import io.unitycatalog.server.persist.dao.PropertyDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; import io.unitycatalog.server.persist.dao.TableInfoDAO; import io.unitycatalog.server.utils.ValidationUtils; import lombok.Getter; @@ -22,49 +24,66 @@ public class TableRepository { private static final TableRepository instance = new TableRepository(); private static final Logger LOGGER = LoggerFactory.getLogger(TableRepository.class); private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - private static final CatalogOperations catalogOperations = CatalogOperations.getInstance(); - private static final SchemaOperations schemaOperations = SchemaOperations.getInstance(); + private static final CatalogRepository catalogOperations = CatalogRepository.getInstance(); + private static final SchemaRepository schemaOperations = SchemaRepository.getInstance(); private TableRepository() {} public TableInfo getTableById(String tableId) { LOGGER.debug("Getting table by id: " + tableId); try (Session session = sessionFactory.openSession()) { - TableInfoDAO tableInfoDAO = session.get(TableInfoDAO.class, UUID.fromString(tableId)); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + tableId); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + TableInfoDAO tableInfoDAO = session.get(TableInfoDAO.class, UUID.fromString(tableId)); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + tableId); + } + TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); + tx.commit(); + return tableInfo; + } catch (Exception e) { + tx.rollback(); + throw e; } - - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); - tableInfo.setProperties(TableInfoConverter - .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); - return tableInfo; } } public TableInfo getTable(String fullName) { LOGGER.debug("Getting table: " + fullName); + TableInfo tableInfo = null; try (Session session = sessionFactory.openSession()) { - String[] parts = fullName.split("\\."); - if (parts.length != 3) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid table name: " + fullName); - } - String catalogName = parts[0]; - String schemaName = parts[1]; - String tableName = parts[2]; - TableInfoDAO tableInfoDAO = findTable(session, catalogName, schemaName, tableName); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String[] parts = fullName.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid table name: " + fullName); + } + String catalogName = parts[0]; + String schemaName = parts[1]; + String tableName = parts[2]; + TableInfoDAO tableInfoDAO = findTable(session, catalogName, schemaName, tableName); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + } + tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + tableInfo.setProperties(TableInfoConverter + .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); + tableInfo.setCatalogName(catalogName); + tableInfo.setSchemaName(schemaName); + tx.commit(); + } catch (Exception e) { + if (tx != null && tx.getStatus().canRollback()) { + tx.rollback(); + } + throw e; } - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); - tableInfo.setProperties(TableInfoConverter - .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); - tableInfo.setCatalogName(catalogName); - tableInfo.setSchemaName(schemaName); - return tableInfo; } + return tableInfo; } private List findProperties(Session session, UUID tableId) { @@ -161,18 +180,11 @@ private String getTableFullName(TableInfo tableInfo) { } public String getSchemaId(Session session, String catalogName, String schemaName) { - - CatalogInfo catalogInfoDAO = catalogOperations.getCatalog(session, catalogName); - if (catalogInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); - } - SchemaInfo schemaInfo = schemaOperations.getSchema(session, catalogName - + "." + schemaName); + SchemaInfoDAO schemaInfo = schemaOperations.getSchemaDAO(session, catalogName, schemaName); if (schemaInfo == null) { throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); } - return schemaInfo.getSchemaId(); - + return schemaInfo.getId().toString(); } public static Date convertMillisToDate(String millisString) { @@ -206,37 +218,45 @@ public static String getNextPageToken(List tables) { * @return */ public ListTablesResponse listTables(String catalogName, - String schemaName, - Integer maxResults, - String nextPageToken, - Boolean omitProperties, - Boolean omitColumns) { + String schemaName, + Integer maxResults, + String nextPageToken, + Boolean omitProperties, + Boolean omitColumns) { List result = new ArrayList<>(); String returnNextPageToken = null; String hql = "FROM TableInfoDAO t WHERE t.schemaId = :schemaId and " + "(t.updatedAt < :pageToken OR :pageToken is null) order by t.updatedAt desc"; try (Session session = sessionFactory.openSession()) { - String schemaId = getSchemaId(session, catalogName, schemaName); - Query query = session.createQuery(hql, TableInfoDAO.class); - query.setParameter("schemaId", UUID.fromString(schemaId)); - query.setParameter("pageToken", convertMillisToDate(nextPageToken)); - query.setMaxResults(maxResults); - List tableInfoDAOList = query.list(); - - returnNextPageToken = getNextPageToken(tableInfoDAOList); - - for (TableInfoDAO tableInfoDAO : tableInfoDAOList) { - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - if (!omitColumns) { - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String schemaId = getSchemaId(session, catalogName, schemaName); + Query query = session.createQuery(hql, TableInfoDAO.class); + query.setParameter("schemaId", UUID.fromString(schemaId)); + query.setParameter("pageToken", convertMillisToDate(nextPageToken)); + query.setMaxResults(maxResults); + List tableInfoDAOList = query.list(); + returnNextPageToken = getNextPageToken(tableInfoDAOList); + for (TableInfoDAO tableInfoDAO : tableInfoDAOList) { + TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + if (!omitColumns) { + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + } + if (!omitProperties) { + tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap( + findProperties(session, tableInfoDAO.getId()))); + } + tableInfo.setCatalogName(catalogName); + tableInfo.setSchemaName(schemaName); + result.add(tableInfo); } - if (!omitProperties) { - tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap( - findProperties(session, tableInfoDAO.getId()))); + tx.commit(); + } catch (Exception e) { + if (tx != null && tx.getStatus().canRollback()) { + tx.rollback(); } - tableInfo.setCatalogName(catalogName); - tableInfo.setSchemaName(schemaName); - result.add(tableInfo); + throw e; } } return new ListTablesResponse().tables(result).nextPageToken(returnNextPageToken); @@ -252,22 +272,20 @@ public void deleteTable(String fullName) { String catalogName = parts[0]; String schemaName = parts[1]; String tableName = parts[2]; - - String schemaId = getSchemaId(session, catalogName, schemaName); - - TableInfoDAO tableInfoDAO = findBySchemaIdAndName(session, schemaId, tableName); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); - } Transaction tx = session.beginTransaction(); try { + String schemaId = getSchemaId(session, catalogName, schemaName); + TableInfoDAO tableInfoDAO = findBySchemaIdAndName(session, schemaId, tableName); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + } if (TableType.MANAGED.getValue().equals(tableInfoDAO.getType())) { - FileUtils.deleteDirectory(tableInfoDAO.getUrl()); + try { + FileUtils.deleteDirectory(tableInfoDAO.getUrl()); + } catch (Throwable e) { + LOGGER.error("Error deleting table directory: " + tableInfoDAO.getUrl()); + } } - } catch (Throwable e) { - LOGGER.error("Error deleting table directory: " + tableInfoDAO.getUrl()); - } - try { findProperties(session, tableInfoDAO.getId()).forEach(session::remove); session.remove(tableInfoDAO); tx.commit(); @@ -280,4 +298,4 @@ public void deleteTable(String fullName) { } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java b/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java deleted file mode 100644 index 5be826884..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java +++ /dev/null @@ -1,163 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.*; -import io.unitycatalog.server.persist.converters.VolumeInfoConverter; -import io.unitycatalog.server.persist.dao.VolumeInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.hibernate.query.Query; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; - -public class VolumeOperations { - - @Getter - public static final VolumeOperations instance = new VolumeOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(VolumeOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - - private VolumeOperations() {} - - public VolumeInfo createVolume(CreateVolumeRequestContent createVolumeRequest) { - ValidationUtils.validateSqlObjectName(createVolumeRequest.getName()); - String volumeFullName = createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName() + "." + createVolumeRequest.getName(); - VolumeInfo volumeInfo = new VolumeInfo(); - volumeInfo.setVolumeId(UUID.randomUUID().toString()); - volumeInfo.setCatalogName(createVolumeRequest.getCatalogName()); - volumeInfo.setSchemaName(createVolumeRequest.getSchemaName()); - volumeInfo.setName(createVolumeRequest.getName()); - volumeInfo.setComment(createVolumeRequest.getComment()); - volumeInfo.setFullName(volumeFullName); - volumeInfo.setCreatedAt(System.currentTimeMillis()); - volumeInfo.setVolumeType(createVolumeRequest.getVolumeType()); - if (VolumeType.MANAGED.equals(createVolumeRequest.getVolumeType())) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Managed volume creation is not supported"); - } - if (createVolumeRequest.getStorageLocation() == null) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Storage location is required for external volume"); - } - volumeInfo.setStorageLocation(createVolumeRequest.getStorageLocation()); - VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - session.persist(volumeInfoDAO); - tx.commit(); - LOGGER.info("Added volume: {}", volumeInfo.getName()); - return VolumeInfoConverter.fromDAO(volumeInfoDAO); - } catch (Exception e) { - LOGGER.error("Error adding volume", e); - return null; - } - } - - public VolumeInfo getVolume(String fullName) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM VolumeInfoDAO WHERE fullName = :value", VolumeInfoDAO.class); - query.setParameter("value", fullName); - query.setMaxResults(1); - return VolumeInfoConverter.fromDAO(query.uniqueResult()); - } catch (Exception e) { - LOGGER.error("Error getting volume", e); - return null; - } - } - - public VolumeInfo getVolumeById(String volumeId) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM VolumeInfoDAO WHERE volumeId = :value", VolumeInfoDAO.class); - query.setParameter("value", volumeId); - query.setMaxResults(1); - return VolumeInfoConverter.fromDAO(query.uniqueResult()); - } catch (Exception e) { - throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + volumeId); - } - } - - public ListVolumesResponseContent listVolumes(String catalogName, String schemaName, Optional maxResults, Optional pageToken, Optional includeBrowse) { - ListVolumesResponseContent responseContent = new ListVolumesResponseContent(); - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - - String queryString = "from VolumeInfoDAO v where v.catalogName = :catalogName and v.schemaName = :schemaName"; - Query query = session.createQuery(queryString, VolumeInfoDAO.class); - query.setParameter("catalogName", catalogName); - query.setParameter("schemaName", schemaName); - - maxResults.ifPresent(query::setMaxResults); - - if (pageToken.isPresent()) { - // Perform pagination logic here if needed - // Example: query.setFirstResult(startIndex); - } - - responseContent.setVolumes(query.list().stream() - .map(VolumeInfoConverter::fromDAO).collect(Collectors.toList())); - return responseContent; - } catch (Exception e) { - LOGGER.error("Error listing volumes", e); - return null; - } - } - - public VolumeInfo updateVolume(String name, UpdateVolumeRequestContent updateVolumeRequest) { - ValidationUtils.validateSqlObjectName(updateVolumeRequest.getNewName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - VolumeInfoDAO volumeInfo = VolumeInfoConverter.toDAO(getVolume(name)); - if (volumeInfo != null) { - if (updateVolumeRequest.getNewName() != null) { - volumeInfo.setName(updateVolumeRequest.getNewName()); - String fullName = volumeInfo.getCatalogName() + "." + volumeInfo.getSchemaName() + "." + updateVolumeRequest.getNewName(); - volumeInfo.setFullName(fullName); - } - if (updateVolumeRequest.getComment() != null) { - volumeInfo.setComment(updateVolumeRequest.getComment()); - } - volumeInfo.setUpdatedAt(new Date()); - session.merge(volumeInfo); - tx.commit(); - LOGGER.info("Updated volume: {}", volumeInfo.getName()); - } - return VolumeInfoConverter.fromDAO(volumeInfo); - } catch (Exception e) { - LOGGER.error("Error updating volume", e); - return null; - } - } - - public void deleteVolume(String name) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - VolumeInfo volumeInfo = getVolume(name); - VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); - if (volumeInfoDAO != null) { - if (VolumeType.MANAGED.getValue().equals(volumeInfoDAO.getVolumeType())) { - try { - FileUtils.deleteDirectory(volumeInfo.getStorageLocation()); - } catch (Exception e) { - LOGGER.error("Error deleting volume directory", e); - } - } - session.remove(volumeInfoDAO); - tx.commit(); - LOGGER.info("Deleted volume: {}", volumeInfo.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); - } - } catch (Exception e) { - LOGGER.error("Error deleting volume", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java b/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java new file mode 100644 index 000000000..4edc2a6db --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java @@ -0,0 +1,229 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.*; +import io.unitycatalog.server.persist.converters.VolumeInfoConverter; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; +import io.unitycatalog.server.persist.dao.VolumeInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.hibernate.query.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +public class VolumeRepository { + + @Getter + public static final VolumeRepository instance = new VolumeRepository(); + public static final SchemaRepository schemaRepository = SchemaRepository.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(VolumeRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + + private VolumeRepository() {} + + public VolumeInfo createVolume(CreateVolumeRequestContent createVolumeRequest) { + ValidationUtils.validateSqlObjectName(createVolumeRequest.getName()); + String volumeFullName = createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName() + "." + createVolumeRequest.getName(); + VolumeInfo volumeInfo = new VolumeInfo(); + volumeInfo.setVolumeId(UUID.randomUUID().toString()); + volumeInfo.setCatalogName(createVolumeRequest.getCatalogName()); + volumeInfo.setSchemaName(createVolumeRequest.getSchemaName()); + volumeInfo.setName(createVolumeRequest.getName()); + volumeInfo.setComment(createVolumeRequest.getComment()); + volumeInfo.setFullName(volumeFullName); + volumeInfo.setCreatedAt(System.currentTimeMillis()); + volumeInfo.setVolumeType(createVolumeRequest.getVolumeType()); + if (VolumeType.MANAGED.equals(createVolumeRequest.getVolumeType())) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Managed volume creation is not supported"); + } + if (createVolumeRequest.getStorageLocation() == null) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Storage location is required for external volume"); + } + volumeInfo.setStorageLocation(createVolumeRequest.getStorageLocation()); + VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfoDAO = schemaRepository.getSchemaDAO(session, + createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName()); + if (schemaInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName()); + } + if (getVolumeDAO(session, createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName(), createVolumeRequest.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, "Volume already exists: " + volumeFullName); + } + volumeInfoDAO.setSchemaId(schemaInfoDAO.getId()); + session.persist(volumeInfoDAO); + tx.commit(); + LOGGER.info("Added volume: {}", volumeInfo.getName()); + return convertFromDAO(volumeInfoDAO, createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName()); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public VolumeInfo getVolume(String fullName) { + try (Session session = sessionFactory.openSession()) { + String[] namespace = fullName.split("\\."); + if (namespace.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid volume name: " + fullName); + } + String catalogName = namespace[0]; + String schemaName = namespace[1]; + String volumeName = namespace[2]; + return convertFromDAO(getVolumeDAO(session, catalogName, schemaName, volumeName), + catalogName, schemaName); + } catch (Exception e) { + LOGGER.error("Error getting volume", e); + return null; + } + } + + public VolumeInfoDAO getVolumeDAO(Session session, String catalogName, String schemaName, String volumeName) { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + catalogName + "." + schemaName); + } + Query query = session.createQuery( + "FROM VolumeInfoDAO WHERE name = :name and schemaId = :schemaId", VolumeInfoDAO.class); + query.setParameter("name", volumeName); + query.setParameter("schemaId", schemaInfo.getId()); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public VolumeInfo getVolumeById(String volumeId) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + Query query = session.createQuery("FROM VolumeInfoDAO WHERE id = :value", VolumeInfoDAO.class); + query.setParameter("value", UUID.fromString(volumeId)); + query.setMaxResults(1); + VolumeInfoDAO volumeInfoDAO = query.uniqueResult(); + tx.commit(); + return VolumeInfoConverter.fromDAO(volumeInfoDAO); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public ListVolumesResponseContent listVolumes(String catalogName, String schemaName, Optional maxResults, Optional pageToken, Optional includeBrowse) { + ListVolumesResponseContent responseContent = new ListVolumesResponseContent(); + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + catalogName + "." + schemaName); + } + String queryString = "from VolumeInfoDAO v where v.schemaId = :schemaId"; + Query query = session.createQuery(queryString, VolumeInfoDAO.class); + query.setParameter("schemaId", schemaInfo.getId()); + maxResults.ifPresent(query::setMaxResults); + if (pageToken.isPresent()) { + // Perform pagination logic here if needed + // Example: query.setFirstResult(startIndex); + } + responseContent.setVolumes(query.list().stream() + .map(x -> convertFromDAO(x, catalogName, schemaName)) + .collect(Collectors.toList())); + tx.commit(); + return responseContent; + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + private VolumeInfo convertFromDAO(VolumeInfoDAO volumeInfoDAO, String catalogName, String schemaName) { + VolumeInfo volumeInfo = VolumeInfoConverter.fromDAO(volumeInfoDAO); + volumeInfo.setCatalogName(catalogName); + volumeInfo.setSchemaName(schemaName); + volumeInfo.setFullName(catalogName + "." + schemaName + "." + volumeInfo.getName()); + return volumeInfo; + } + + public VolumeInfo updateVolume(String name, UpdateVolumeRequestContent updateVolumeRequest) { + ValidationUtils.validateSqlObjectName(updateVolumeRequest.getNewName()); + String[] namespace =name.split("\\."); + String catalog = namespace[0], schema = namespace[1], volume = namespace[2]; + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + VolumeInfoDAO volumeInfo = getVolumeDAO(session, catalog, schema, volume); + if (volumeInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); + } + if (updateVolumeRequest.getNewName() != null) { + VolumeInfoDAO existingVolume = getVolumeDAO(session, catalog, schema, + updateVolumeRequest.getNewName()); + if (existingVolume != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Volume already exists: " + updateVolumeRequest.getNewName()); + } + } + if (updateVolumeRequest.getNewName() != null) { + volumeInfo.setName(updateVolumeRequest.getNewName()); + } + if (updateVolumeRequest.getComment() != null) { + volumeInfo.setComment(updateVolumeRequest.getComment()); + } + volumeInfo.setUpdatedAt(new Date()); + session.merge(volumeInfo); + tx.commit(); + LOGGER.info("Updated volume: {}", volumeInfo.getName()); + return convertFromDAO(volumeInfo, catalog, schema); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteVolume(String name) { + try (Session session = sessionFactory.openSession()) { + String[] namespace = name.split("\\."); + if (namespace.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid volume name: " + name); + } + String catalog = namespace[0], schema = namespace[1], volume = namespace[2]; + Transaction tx = session.beginTransaction(); + try { + VolumeInfoDAO volumeInfoDAO = getVolumeDAO(session, catalog, schema, volume); + if (volumeInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); + } + if (VolumeType.MANAGED.getValue().equals(volumeInfoDAO.getVolumeType())) { + try { + FileUtils.deleteDirectory(volumeInfoDAO.getStorageLocation()); + } catch (Exception e) { + LOGGER.error("Error deleting volume directory", e); + } + } + session.remove(volumeInfoDAO); + tx.commit(); + LOGGER.info("Deleted volume: {}", volumeInfoDAO.getName()); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java b/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java index d681f292b..b5a6532e2 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java +++ b/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java @@ -6,6 +6,7 @@ import io.unitycatalog.server.persist.dao.VolumeInfoDAO; import java.util.Date; +import java.util.UUID; public class VolumeInfoConverter { @@ -14,15 +15,12 @@ public static VolumeInfoDAO toDAO(VolumeInfo volumeInfo) { return null; } return VolumeInfoDAO.builder() - .volumeId(volumeInfo.getVolumeId()) + .id(UUID.fromString(volumeInfo.getVolumeId())) .name(volumeInfo.getName()) - .catalogName(volumeInfo.getCatalogName()) - .schemaName(volumeInfo.getSchemaName()) .comment(volumeInfo.getComment()) .storageLocation(volumeInfo.getStorageLocation()) .createdAt(volumeInfo.getCreatedAt() != null? new Date(volumeInfo.getCreatedAt()) : new Date()) .updatedAt(volumeInfo.getUpdatedAt() != null ? new Date(volumeInfo.getUpdatedAt()) : new Date()) - .fullName(volumeInfo.getFullName()) .volumeType(volumeInfo.getVolumeType().getValue()) .build(); } @@ -32,16 +30,13 @@ public static VolumeInfo fromDAO(VolumeInfoDAO dao) { return null; } return new VolumeInfo() - .volumeId(dao.getVolumeId()) + .volumeId(dao.getId().toString()) .name(dao.getName()) - .catalogName(dao.getCatalogName()) - .schemaName(dao.getSchemaName()) .comment(dao.getComment()) .storageLocation(FileUtils.convertRelativePathToURI(dao.getStorageLocation())) .createdAt(dao.getCreatedAt().getTime()) .updatedAt(dao.getUpdatedAt().getTime()) - .fullName(dao.getFullName()) .volumeType(VolumeType.valueOf(dao.getVolumeType())); } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java index 2a7ea9abb..192a36ca5 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java +++ b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java @@ -7,6 +7,7 @@ import io.unitycatalog.server.model.ColumnTypeName; import java.util.List; +import java.util.UUID; // Hibernate annotations @Entity @@ -20,17 +21,14 @@ @Builder public class FunctionInfoDAO { @Id - @Column(name = "function_id") - private String functionId; + @Column(name = "id", columnDefinition = "BINARY(16)") + private UUID id; @Column(name = "name", nullable = false) private String name; - @Column(name = "catalog_name", nullable = false) - private String catalogName; - - @Column(name = "schema_name", nullable = false) - private String schemaName; + @Column(name = "schema_id", columnDefinition = "BINARY(16)") + private UUID schemaId; @Column(name = "comment") private String comment; @@ -44,9 +42,6 @@ public class FunctionInfoDAO { @Column(name = "data_type") private ColumnTypeName dataType; - @Column(name = "full_name") - private String fullName; - @Column(name = "full_data_type") private String fullDataType; @@ -88,15 +83,12 @@ public class FunctionInfoDAO { public static FunctionInfoDAO from(FunctionInfo functionInfo) { FunctionInfoDAO functionInfoDAO = FunctionInfoDAO.builder() - .functionId(functionInfo.getFunctionId()) + .id(functionInfo.getFunctionId()!= null? UUID.fromString(functionInfo.getFunctionId()) : null) .name(functionInfo.getName()) - .catalogName(functionInfo.getCatalogName()) - .schemaName(functionInfo.getSchemaName()) .comment(functionInfo.getComment()) .createdAt(functionInfo.getCreatedAt()) .updatedAt(functionInfo.getUpdatedAt()) .dataType(functionInfo.getDataType()) - .fullName(functionInfo.getFullName()) .fullDataType(functionInfo.getFullDataType()) .externalLanguage(functionInfo.getExternalLanguage()) .isDeterministic(functionInfo.getIsDeterministic()) @@ -122,15 +114,12 @@ public static FunctionInfoDAO from(FunctionInfo functionInfo) { public FunctionInfo toFunctionInfo() { FunctionInfo functionInfo = new FunctionInfo() - .functionId(functionId) + .functionId(id.toString()) .name(name) - .catalogName(catalogName) - .schemaName(schemaName) .comment(comment) .createdAt(createdAt) .updatedAt(updatedAt) .dataType(dataType) - .fullName(fullName) .fullDataType(fullDataType) .externalLanguage(externalLanguage) .isDeterministic(isDeterministic) @@ -147,4 +136,4 @@ public FunctionInfo toFunctionInfo() { functionInfo.returnParams(FunctionParameterInfoDAO.toFunctionParameterInfos(returnParams)); return functionInfo; } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java index 8846e24fc..e4894a475 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java +++ b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java @@ -28,7 +28,7 @@ public enum InputOrReturnEnum { private String id; @ManyToOne - @JoinColumn(name = "function_id") + @JoinColumn(name = "function_id", referencedColumnName = "id") private FunctionInfoDAO function; // Whether the parameter is an input or return parameter @@ -119,4 +119,4 @@ public static FunctionParameterInfos toFunctionParameterInfos(List includeBrowse) { - VolumeInfo volumeInfo = volumeOperations.getVolume(fullName); - if (volumeInfo != null) { - return HttpResponse.ofJson(volumeInfo); - } - return ValidationUtils.entityNotFoundResponse(ValidationUtils.VOLUME, fullName); + return HttpResponse.ofJson(volumeOperations.getVolume(fullName)); } @Patch("/{full_name}") public HttpResponse updateVolume(@Param("full_name") String fullName, UpdateVolumeRequestContent updateVolumeRequest) { - VolumeInfo updatedVolume = volumeOperations.updateVolume(fullName, updateVolumeRequest); - if (updatedVolume != null) { - return HttpResponse.ofJson(updatedVolume); - } - return ValidationUtils.entityNotFoundResponse(ValidationUtils.VOLUME, fullName); + return HttpResponse.ofJson(volumeOperations.updateVolume(fullName, updateVolumeRequest)); } @Delete("/{full_name}") diff --git a/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java b/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java index e6a964241..a5e4786f8 100644 --- a/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java +++ b/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java @@ -1,62 +1,22 @@ package io.unitycatalog.server.utils; import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.persist.FunctionRepository; -import io.unitycatalog.server.persist.SchemaOperations; +import io.unitycatalog.server.persist.*; import com.linecorp.armeria.common.HttpResponse; -import io.unitycatalog.server.persist.CatalogOperations; -import io.unitycatalog.server.persist.TableRepository; -import io.unitycatalog.server.persist.VolumeOperations; +import io.unitycatalog.server.persist.VolumeRepository; import io.unitycatalog.server.exception.ErrorCode; -import java.util.Optional; import java.util.regex.Pattern; public class ValidationUtils { public static final String CATALOG = "Catalog"; public static final String SCHEMA = "Schema"; - public static final String VOLUME = "Volume"; public static final String FUNCTION = "Function"; - private static final CatalogOperations CATALOG_OPERATIONS = CatalogOperations.getInstance(); - private static final SchemaOperations SCHEMA_OPERATIONS = SchemaOperations.getInstance(); - private static final VolumeOperations VOLUME_OPERATIONS = VolumeOperations.getInstance(); - private static final FunctionRepository FUNCTION_REPOSITORY = FunctionRepository.getInstance(); - private static final TableRepository TABLE_REPOSITORY = TableRepository.getInstance(); // Regex to reject names containing a period, space, forward-slash, C0 + DEL control characters private static final Pattern INVALID_FORMAT = Pattern.compile("[\\.\\ \\/\\x00-\\x1F\\x7F]"); private static final Integer MAX_NAME_LENGTH = 255; - public static HttpResponse entityNotFoundResponse(String entity, String... message) { - throw new BaseException(ErrorCode.NOT_FOUND, entity + " not found: " + String.join(".", message)); - } - - public static HttpResponse entityAlreadyExistsResponse(String entity, String... message) { - throw new BaseException(ErrorCode.ALREADY_EXISTS, entity + " already exists: " + String.join(".", message)); - } - - public static boolean catalogExists(String catalogName) { - return CATALOG_OPERATIONS.getCatalog(catalogName) != null; - } - - public static boolean schemaExists(String catalogName, String schemaName) { - if (!catalogExists(catalogName)) - return false; - return SCHEMA_OPERATIONS.getSchema(catalogName + "." + schemaName) != null; - } - - public static boolean volumeExists(String catalogName, String schemaName, String volumeName) { - if (!schemaExists(catalogName, schemaName)) - return false; - return VOLUME_OPERATIONS.getVolume(catalogName + "." + schemaName + "." + volumeName) != null; - } - - public static boolean functionExists(String catalogName, String schemaName, String functionName) { - if (!schemaExists(catalogName, schemaName)) - return false; - return FUNCTION_REPOSITORY.getFunction(catalogName + "." + schemaName + "." + functionName) != null; - } - public static void validateSqlObjectName(String name) { if (name == null || name.isEmpty()) { throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Name cannot be empty"); @@ -68,4 +28,4 @@ public static void validateSqlObjectName(String name) { throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Name cannot contain a period, space, forward-slash, or control characters"); } } -} +} \ No newline at end of file diff --git a/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java index d4cbab4f3..2d790f340 100644 --- a/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java @@ -18,6 +18,10 @@ protected void cleanUp() { if (catalogOperations.getCatalog(CATALOG_NAME) != null) { catalogOperations.deleteCatalog(CATALOG_NAME); } + } catch (Exception e) { + // Ignore + } + try { if (catalogOperations.getCatalog(CATALOG_NEW_NAME) != null) { catalogOperations.deleteCatalog(CATALOG_NEW_NAME); } diff --git a/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java index d10ed8767..caf43d055 100644 --- a/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java @@ -3,6 +3,7 @@ import io.unitycatalog.client.ApiException; import io.unitycatalog.client.model.*; import io.unitycatalog.server.base.BaseCRUDTest; +import io.unitycatalog.server.utils.TestUtils; import org.junit.*; import io.unitycatalog.server.base.ServerConfig; @@ -42,6 +43,20 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); } @@ -105,9 +120,17 @@ public void testFunctionCRUD() throws ApiException { FunctionInfo retrievedFunctionInfo = functionOperations.getFunction(FUNCTION_FULL_NAME); assertEquals(functionInfo, retrievedFunctionInfo); + // now update the parent catalog + catalogOperations.updateCatalog(CATALOG_NAME, CATALOG_NEW_NAME, ""); + // get the function again + FunctionInfo retrievedFunctionInfoAfterCatUpdate = functionOperations.getFunction( + CATALOG_NEW_NAME + "." + SCHEMA_NAME + "." + FUNCTION_NAME); + assertEquals(retrievedFunctionInfo.getFunctionId(), + retrievedFunctionInfoAfterCatUpdate.getFunctionId()); + // Delete function - functionOperations.deleteFunction(FUNCTION_FULL_NAME, true); - assertFalse(contains(functionOperations.listFunctions(CATALOG_NAME, SCHEMA_NAME), + functionOperations.deleteFunction(CATALOG_NEW_NAME + "." + SCHEMA_NAME + "." + FUNCTION_NAME, true); + assertFalse(contains(functionOperations.listFunctions(CATALOG_NEW_NAME, SCHEMA_NAME), functionInfo, f -> f.getFunctionId().equals(functionInfo.getFunctionId()))); } } diff --git a/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java index a53526007..3b1535621 100644 --- a/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java @@ -1,10 +1,7 @@ package io.unitycatalog.server.base.schema; import io.unitycatalog.client.ApiException; -import io.unitycatalog.client.model.CreateCatalog; -import io.unitycatalog.client.model.CreateSchema; -import io.unitycatalog.client.model.SchemaInfo; -import io.unitycatalog.client.model.UpdateSchema; +import io.unitycatalog.client.model.*; import io.unitycatalog.server.base.BaseCRUDTest; import io.unitycatalog.server.base.ServerConfig; import io.unitycatalog.server.utils.TestUtils; @@ -38,7 +35,22 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); + } protected void createCommonResources() throws ApiException { @@ -84,10 +96,16 @@ public void testSchemaCRUDL() throws ApiException { Assert.assertEquals(TestUtils.SCHEMA_NEW_FULL_NAME, updatedSchemaInfo.getFullName()); assertNotNull(updatedSchemaInfo.getUpdatedAt()); + //Now update the parent catalog name + catalogOperations.updateCatalog(TestUtils.CATALOG_NAME, TestUtils.CATALOG_NEW_NAME, ""); + SchemaInfo updatedSchemaInfo2 = schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + + "." + TestUtils.SCHEMA_NEW_NAME); + assertEquals(retrievedSchemaInfo.getSchemaId(), updatedSchemaInfo2.getSchemaId()); + // Delete schema System.out.println("Testing delete schema.."); - schemaOperations.deleteSchema(TestUtils.SCHEMA_NEW_FULL_NAME); - assertFalse(TestUtils.contains(schemaOperations.listSchemas(TestUtils.CATALOG_NAME), updatedSchemaInfo, (schema) -> + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + assertFalse(TestUtils.contains(schemaOperations.listSchemas(TestUtils.CATALOG_NEW_NAME), updatedSchemaInfo, (schema) -> schema.getName().equals(TestUtils.SCHEMA_NEW_NAME))); } } diff --git a/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java index d5bbf9c1f..031eab4c0 100644 --- a/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java @@ -54,6 +54,21 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } + super.cleanUp(); } @@ -205,11 +220,18 @@ public void testTableCRUD() throws IOException, ApiException { assertNotNull(managedListTable.getCreatedAt()); assertNotNull(managedListTable.getTableId()); + // Now update the parent schema name + schemaOperations.updateSchema(TestUtils.SCHEMA_FULL_NAME, new UpdateSchema().newName(TestUtils.SCHEMA_NEW_NAME).comment(TestUtils.SCHEMA_COMMENT)); + // now fetch the table again + TableInfo managedTableAfterSchemaUpdate = tableOperations.getTable(TestUtils.CATALOG_NAME + "." + TestUtils.SCHEMA_NEW_NAME + "." + TABLE_NAME); + assertEquals(managedTable.getTableId(), managedTableAfterSchemaUpdate.getTableId()); + // Delete managed table + String newTableFullName = TestUtils.CATALOG_NAME + "." + TestUtils.SCHEMA_NEW_NAME + "." + TABLE_NAME; System.out.println("Testing delete table.."); - tableOperations.deleteTable(TABLE_FULL_NAME); - assertThrows(Exception.class, () -> tableOperations.getTable(TABLE_FULL_NAME)); + tableOperations.deleteTable(newTableFullName); + assertThrows(Exception.class, () -> tableOperations.getTable(newTableFullName)); } } \ No newline at end of file diff --git a/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java index b3a327307..255f09ed4 100644 --- a/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java @@ -6,6 +6,7 @@ import io.unitycatalog.server.persist.FileUtils; import io.unitycatalog.server.persist.HibernateUtil; import io.unitycatalog.server.persist.dao.VolumeInfoDAO; +import io.unitycatalog.server.utils.TestUtils; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.junit.*; @@ -56,13 +57,29 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); } + private SchemaInfo schemaInfo; + protected void createCommonResources() throws ApiException { // Common setup operations such as creating a catalog and schema catalogOperations.createCatalog(CATALOG_NAME, "Common catalog for volumes"); - schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); + schemaInfo = schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); } @Test @@ -126,15 +143,13 @@ public void testVolumeCRUD() throws ApiException { try (Session session = sessionFactory.openSession()) { session.beginTransaction(); VolumeInfoDAO managedVolume = VolumeInfoDAO.builder() - .catalogName(CATALOG_NAME) - .schemaName(SCHEMA_NAME) .volumeType(VolumeType.MANAGED.getValue()) .storageLocation("/tmp/managed_volume") .name(VOLUME_NAME) - .fullName(VOLUME_FULL_NAME) .createdAt(new Date()) .updatedAt(new Date()) - .volumeId(UUID.randomUUID().toString()) + .id(UUID.randomUUID()) + .schemaId(UUID.fromString(schemaInfo.getSchemaId())) .build(); session.persist(managedVolume); session.getTransaction().commit(); @@ -159,9 +174,17 @@ public void testVolumeCRUD() throws ApiException { return volume.getName().equals(VOLUME_NAME); })); + //NOW Update the schema name + schemaOperations.updateSchema(SCHEMA_FULL_NAME, + new UpdateSchema().newName(SCHEMA_NEW_NAME).comment(SCHEMA_COMMENT)); + // get volume + VolumeInfo volumePostSchemaNameChange = volumeOperations.getVolume + (CATALOG_NAME + "." + SCHEMA_NEW_NAME + "." + VOLUME_NAME); + assertEquals(volumePostSchemaNameChange.getVolumeId(), managedVolumeInfo.getVolumeId()); + // Delete volume System.out.println("Testing delete volume.."); - volumeOperations.deleteVolume(VOLUME_FULL_NAME); - assertEquals(0, getSize(volumeOperations.listVolumes(CATALOG_NAME, SCHEMA_NAME))); + volumeOperations.deleteVolume(CATALOG_NAME + "." + SCHEMA_NEW_NAME + "." + VOLUME_NAME); + assertEquals(0, getSize(volumeOperations.listVolumes(CATALOG_NAME, SCHEMA_NEW_NAME))); } } \ No newline at end of file