diff --git a/etc/db/h2db.mv.db b/etc/db/h2db.mv.db index 2226ef7e9..0458fa978 100644 Binary files a/etc/db/h2db.mv.db and b/etc/db/h2db.mv.db differ diff --git a/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java b/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java deleted file mode 100644 index fa30ad7c0..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/CatalogOperations.java +++ /dev/null @@ -1,121 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CatalogInfo; -import io.unitycatalog.server.model.ListCatalogsResponse; -import io.unitycatalog.server.persist.dao.CatalogInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.query.Query; -import io.unitycatalog.server.model.CreateCatalog; -import io.unitycatalog.server.model.UpdateCatalog; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.stream.Collectors; - -public class CatalogOperations { - @Getter - private static final CatalogOperations instance = new CatalogOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(CatalogOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - private CatalogOperations() {} - - public CatalogInfo addCatalog(CreateCatalog createCatalog) { - ValidationUtils.validateSqlObjectName(createCatalog.getName()); - CatalogInfoDAO catalogInfo = new CatalogInfoDAO(); - catalogInfo.setId(java.util.UUID.randomUUID()); - catalogInfo.setName(createCatalog.getName()); - catalogInfo.setComment(createCatalog.getComment()); - catalogInfo.setCreatedAt(new Date()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - session.persist(catalogInfo); - tx.commit(); - System.out.println("Added catalog: " + catalogInfo.getName()); - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } catch(Exception e) { - LOGGER.error("Error adding catalog", e); - return null; - } - } - - public ListCatalogsResponse listCatalogs() { - ListCatalogsResponse response = new ListCatalogsResponse(); - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - response.setCatalogs(session - .createQuery("from CatalogInfoDAO ", CatalogInfoDAO.class).list() - .stream().map(CatalogInfoDAO::toCatalogInfo).collect(Collectors.toList())); - return response; - } catch(Exception e) { - LOGGER.error("Error listing catalogs", e); - return null; - } - } - - public CatalogInfo getCatalog(String name) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - return getCatalog(session, name); - } catch(Exception e) { - LOGGER.error("Error getting catalog", e); - return null; - } - } - - public CatalogInfo getCatalog(Session session, String name) { - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - if (catalogInfo == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); - } - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } - - public CatalogInfoDAO getCatalogInfoDAO(Session session, String name) { - Query query = session - .createQuery("FROM CatalogInfoDAO WHERE name = :value", CatalogInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - return query.uniqueResult(); - } - - public CatalogInfo updateCatalog(String name, UpdateCatalog updateCatalog) { - ValidationUtils.validateSqlObjectName(updateCatalog.getNewName()); - // cna make this just update once we have an identifier that is not the name - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - catalogInfo.setName(updateCatalog.getNewName()); - catalogInfo.setComment(updateCatalog.getComment()); - catalogInfo.setUpdatedAt(new Date()); - session.merge(catalogInfo); - tx.commit(); - return CatalogInfoDAO.toCatalogInfo(catalogInfo); - } catch(Exception e) { - LOGGER.error("Error updating catalog", e); - return null; - } - } - - public void deleteCatalog(String name) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - CatalogInfoDAO catalogInfo = getCatalogInfoDAO(session, name); - if (catalogInfo != null) { - session.remove(catalogInfo); - tx.commit(); - LOGGER.info("Deleted catalog: {}", catalogInfo.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); - } - } catch (Exception e) { - LOGGER.error("Error deleting catalog", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java b/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java new file mode 100644 index 000000000..dd78a44b7 --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/CatalogRepository.java @@ -0,0 +1,146 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.CatalogInfo; +import io.unitycatalog.server.model.ListCatalogsResponse; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.query.Query; +import io.unitycatalog.server.model.CreateCatalog; +import io.unitycatalog.server.model.UpdateCatalog; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.stream.Collectors; + +public class CatalogRepository { + @Getter + private static final CatalogRepository instance = new CatalogRepository(); + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + private CatalogRepository() {} + + public CatalogInfo addCatalog(CreateCatalog createCatalog) { + ValidationUtils.validateSqlObjectName(createCatalog.getName()); + CatalogInfoDAO catalogInfo = new CatalogInfoDAO(); + catalogInfo.setId(java.util.UUID.randomUUID()); + catalogInfo.setName(createCatalog.getName()); + catalogInfo.setComment(createCatalog.getComment()); + catalogInfo.setCreatedAt(new Date()); + + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + if (getCatalogDAO(session, createCatalog.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Catalog already exists: " + createCatalog.getName()); + } + session.persist(catalogInfo); + tx.commit(); + System.out.println("Added catalog: " + catalogInfo.getName()); + return CatalogInfoDAO.toCatalogInfo(catalogInfo); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public ListCatalogsResponse listCatalogs() { + ListCatalogsResponse response = new ListCatalogsResponse(); + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + response.setCatalogs(session + .createQuery("from CatalogInfoDAO ", CatalogInfoDAO.class).list() + .stream().map(CatalogInfoDAO::toCatalogInfo).collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + return response; + } + } + + public CatalogInfo getCatalog(String name) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + CatalogInfoDAO catalogInfo = null; + try { + catalogInfo = getCatalogDAO(session, name); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + if (catalogInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + return CatalogInfoDAO. + toCatalogInfo(getCatalogDAO(session, name)); + } + } + + public CatalogInfoDAO getCatalogDAO(Session session, String name) { + Query query = session + .createQuery("FROM CatalogInfoDAO WHERE name = :value", CatalogInfoDAO.class); + query.setParameter("value", name); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public CatalogInfo updateCatalog(String name, UpdateCatalog updateCatalog) { + ValidationUtils.validateSqlObjectName(updateCatalog.getNewName()); + // cna make this just update once we have an identifier that is not the name + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + CatalogInfoDAO catalogInfo = getCatalogDAO(session, name); + if (catalogInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + if (getCatalogDAO(session, updateCatalog.getNewName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Catalog already exists: " + updateCatalog.getNewName()); + } + catalogInfo.setName(updateCatalog.getNewName()); + catalogInfo.setComment(updateCatalog.getComment()); + catalogInfo.setUpdatedAt(new Date()); + session.merge(catalogInfo); + tx.commit(); + return CatalogInfoDAO.toCatalogInfo(catalogInfo); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteCatalog(String name) { + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + CatalogInfoDAO catalogInfo = getCatalogDAO(session, name); + if (catalogInfo != null) { + session.remove(catalogInfo); + tx.commit(); + LOGGER.info("Deleted catalog: {}", catalogInfo.getName()); + } else { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + name); + } + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} diff --git a/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java b/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java index da8c79410..58387a594 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java +++ b/server/src/main/java/io/unitycatalog/server/persist/FunctionRepository.java @@ -2,11 +2,9 @@ import io.unitycatalog.server.exception.BaseException; import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CreateFunction; -import io.unitycatalog.server.model.CreateFunctionRequest; -import io.unitycatalog.server.model.FunctionInfo; -import io.unitycatalog.server.model.ListFunctionsResponse; +import io.unitycatalog.server.model.*; import io.unitycatalog.server.persist.dao.FunctionInfoDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; import io.unitycatalog.server.utils.ValidationUtils; import lombok.Getter; import org.hibernate.Session; @@ -24,6 +22,7 @@ public class FunctionRepository { @Getter private static final FunctionRepository instance = new FunctionRepository(); + private static final SchemaRepository schemaRepository = SchemaRepository.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(FunctionRepository.class); private static final SessionFactory SESSION_FACTORY = HibernateUtil.getSessionFactory(); @@ -58,77 +57,142 @@ public FunctionInfo createFunction(CreateFunctionRequest createFunctionRequest) try (Session session = SESSION_FACTORY.openSession()) { Transaction tx = session.beginTransaction(); - FunctionInfoDAO dao = FunctionInfoDAO.from(functionInfo); - dao.getInputParams().forEach(p -> { - p.setId(UUID.randomUUID().toString()); - p.setFunction(dao); - }); - dao.getReturnParams().forEach(p -> { - p.setId(UUID.randomUUID().toString()); - p.setFunction(dao); - }); - session.persist(dao); - tx.commit(); - return functionInfo; - } catch(Exception e) { - LOGGER.error("Error adding function", e); - return null; + try { + String catalogName = createFunction.getCatalogName(); + String schemaName = createFunction.getSchemaName(); + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, + catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + if (getFunctionDAO(session, catalogName, schemaName, createFunction.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, "Function already exists: " + createFunction.getName()); + } + FunctionInfoDAO dao = FunctionInfoDAO.from(functionInfo); + dao.setSchemaId(schemaInfo.getId()); + dao.getInputParams().forEach(p -> { + p.setId(UUID.randomUUID().toString()); + p.setFunction(dao); + }); + dao.getReturnParams().forEach(p -> { + p.setId(UUID.randomUUID().toString()); + p.setFunction(dao); + }); + session.persist(dao); + tx.commit(); + return functionInfo; + } catch (Exception e) { + tx.rollback(); + throw e; + } } } public ListFunctionsResponse listFunctions(String catalogName, String schemaName, Optional maxResults, Optional nextPageToken) { ListFunctionsResponse response = new ListFunctionsResponse(); try (Session session = SESSION_FACTORY.openSession()) { - String queryString = "from FunctionInfoDAO f where f.catalogName = :catalogName and f.schemaName = :schemaName"; - Query query = session.createQuery(queryString, FunctionInfoDAO.class); - query.setParameter("catalogName", catalogName); - query.setParameter("schemaName", schemaName); - - maxResults.ifPresent(query::setMaxResults); - - if (nextPageToken.isPresent()) { - // Perform pagination logic here if needed - // Example: query.setFirstResult(startIndex); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName + "." + schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + String queryString = "from FunctionInfoDAO f where f.schemaId = :schemaId"; + Query query = session.createQuery(queryString, FunctionInfoDAO.class); + query.setParameter("schemaId", schemaInfo.getId()); + maxResults.ifPresent(query::setMaxResults); + if (nextPageToken.isPresent()) { + // Perform pagination logic here if needed + // Example: query.setFirstResult(startIndex); + } + List functions = query.list(); + response.setFunctions( + functions.stream().map(FunctionInfoDAO::toFunctionInfo) + .peek(f -> addNamespaceInfo(f, catalogName, schemaName)) + .collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; } - List functions = query.list(); - response.setFunctions(functions.stream().map(FunctionInfoDAO::toFunctionInfo).collect(Collectors.toList())); - return response; - } catch(Exception e) { - LOGGER.error("Error listing functions", e); - return null; } + return response; } public FunctionInfo getFunction(String name) { + FunctionInfo functionInfo = null; try (Session session = SESSION_FACTORY.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM FunctionInfoDAO WHERE fullName = :value", FunctionInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - FunctionInfoDAO functionInfoDAO = query.uniqueResult(); - return functionInfoDAO == null ? null : functionInfoDAO.toFunctionInfo(); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String[] parts = name.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid function name: " + name); + } + String catalogName = parts[0], schemaName = parts[1], functionName = parts[2]; + FunctionInfoDAO functionInfoDAO = getFunctionDAO(session, catalogName, schemaName, functionName); + if (functionInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } + functionInfo = functionInfoDAO.toFunctionInfo(); + addNamespaceInfo(functionInfo, catalogName, schemaName); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } } catch (Exception e) { LOGGER.error("Error getting function", e); return null; } + return functionInfo; + } + + public void addNamespaceInfo(FunctionInfo functionInfo, String catalogName, String schemaName) { + functionInfo.setCatalogName(catalogName); + functionInfo.setSchemaName(schemaName); + functionInfo.setFullName(catalogName + "." + schemaName + "." + functionInfo.getName()); + } + + public FunctionInfoDAO getFunctionDAO(Session session, String catalogName, String schemaName, String functionName) { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + Query query = session.createQuery( + "FROM FunctionInfoDAO WHERE name = :name and schemaId = :schemaId", FunctionInfoDAO.class); + query.setParameter("name", functionName); + query.setParameter("schemaId", schemaInfo.getId()); + query.setMaxResults(1); + return query.uniqueResult(); } public void deleteFunction(String name, Boolean force) { try (Session session = SESSION_FACTORY.openSession()) { Transaction tx = session.beginTransaction(); - Query query = session.createQuery("FROM FunctionInfoDAO WHERE fullName = :value", FunctionInfoDAO.class); - query.setParameter("value", name); - query.setMaxResults(1); - FunctionInfoDAO functionInfoDAO = query.uniqueResult(); - if (functionInfoDAO != null) { + try { + String[] parts = name.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid function name: " + name); + } + String catalogName = parts[0], schemaName = parts[1], functionName = parts[2]; + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, + catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); + } + FunctionInfoDAO functionInfoDAO = getFunctionDAO(session, catalogName, schemaName, functionName); + if (functionInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } session.remove(functionInfoDAO); tx.commit(); LOGGER.info("Deleted function: {}", functionInfoDAO.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Function not found: " + name); + } catch (Exception e) { + tx.rollback(); + throw e; } - } catch (Exception e) { - LOGGER.error("Error deleting function", e); } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java b/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java deleted file mode 100644 index 3e6da606b..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/SchemaOperations.java +++ /dev/null @@ -1,133 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.CreateSchema; -import io.unitycatalog.server.model.ListSchemasResponse; -import io.unitycatalog.server.model.SchemaInfo; -import io.unitycatalog.server.persist.dao.SchemaInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.query.Query; -import io.unitycatalog.server.model.UpdateSchema; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; - -public class SchemaOperations { - @Getter - public static final SchemaOperations instance = new SchemaOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(SchemaOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - - private SchemaOperations() {} - - public SchemaInfo createSchema(CreateSchema createSchema) { - ValidationUtils.validateSqlObjectName(createSchema.getName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = new SchemaInfoDAO(); - schemaInfo.setSchemaId(UUID.randomUUID()); - schemaInfo.setName(createSchema.getName()); - schemaInfo.setCatalogName(createSchema.getCatalogName()); - schemaInfo.setComment(createSchema.getComment()); - //schemaInfo.setProperties(createSchema.getProperties()); - schemaInfo.setFullName(createSchema.getCatalogName() + "." + createSchema.getName()); - schemaInfo.setCreatedAt(new Date()); - schemaInfo.setUpdatedAt(null); - session.persist(schemaInfo); - tx.commit(); - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } catch (Exception e) { - LOGGER.error("Error creating schema", e); - return null; - } - } - - public ListSchemasResponse listSchemas(String catalogName, Optional maxResults, Optional pageToken) { - try (Session session = sessionFactory.openSession()) { - // TODO: Implement pagination and filtering if required - // For now, returning all schemas without pagination - session.beginTransaction(); - ListSchemasResponse response = new ListSchemasResponse(); - Query query = session.createQuery("FROM SchemaInfoDAO WHERE catalogName = :value", SchemaInfoDAO.class); - query.setParameter("value", catalogName); - response.setSchemas(query.list().stream().map(SchemaInfoDAO::toSchemaInfo).collect(Collectors.toList())); - return response; - } catch (Exception e) { - LOGGER.error("Error listing schemas", e); - return null; - } - } - - public SchemaInfo getSchema(String fullName) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - return getSchema(session, fullName); - } catch (Exception e) { - LOGGER.error("Error getting schema", e); - return null; - } - } - - public SchemaInfoDAO getSchemaInfoDAO(Session session, String fullName) { - Query query = session.createQuery("FROM SchemaInfoDAO WHERE fullName = :value", SchemaInfoDAO.class); - query.setParameter("value", fullName); - query.setMaxResults(1); - return query.uniqueResult(); - } - - public SchemaInfo getSchema(Session session, String fullName) { - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - if (schemaInfo == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); - } - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } - - public SchemaInfo updateSchema(String fullName, UpdateSchema updateSchema) { - ValidationUtils.validateSqlObjectName(updateSchema.getNewName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - - // Update the schema with new values - if (updateSchema.getComment() != null) { - schemaInfo.setComment(updateSchema.getComment()); - } - if (updateSchema.getNewName() != null) { - schemaInfo.setName(updateSchema.getNewName()); - schemaInfo.setFullName(schemaInfo.getCatalogName() + "." + updateSchema.getNewName()); - } - schemaInfo.setUpdatedAt(new Date()); - session.merge(schemaInfo); - tx.commit(); - return SchemaInfoDAO.toSchemaInfo(schemaInfo); - } catch (Exception e) { - LOGGER.error("Error updating schema", e); - return null; - } - } - - public void deleteSchema(String fullName) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - SchemaInfoDAO schemaInfo = getSchemaInfoDAO(session, fullName); - if (schemaInfo != null) { - session.remove(schemaInfo); - tx.commit(); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); - } - } catch (Exception e) { - LOGGER.error("Error deleting schema", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java b/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java new file mode 100644 index 000000000..9017b8722 --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/SchemaRepository.java @@ -0,0 +1,197 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.*; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.query.Query; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +public class SchemaRepository { + @Getter + public static final SchemaRepository instance = new SchemaRepository(); + @Getter + public static final CatalogRepository catalogRepository = CatalogRepository.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + + private SchemaRepository() {} + + public SchemaInfo createSchema(CreateSchema createSchema) { + ValidationUtils.validateSqlObjectName(createSchema.getName()); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + if (getSchemaDAO(session, createSchema.getCatalogName(), createSchema.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Schema already exists: " + createSchema.getName()); + } + CatalogInfoDAO catalogDAO = catalogRepository + .getCatalogDAO(session, createSchema.getCatalogName()); + SchemaInfoDAO schemaInfo = new SchemaInfoDAO(); + schemaInfo.setId(UUID.randomUUID()); + schemaInfo.setName(createSchema.getName()); + schemaInfo.setCatalogId(catalogDAO.getId()); + schemaInfo.setComment(createSchema.getComment()); + schemaInfo.setCreatedAt(new Date()); + schemaInfo.setUpdatedAt(null); + session.persist(schemaInfo); + tx.commit(); + SchemaInfo toReturn = SchemaInfoDAO.toSchemaInfo(schemaInfo); + addNamespaceData(toReturn, createSchema.getCatalogName()); + return toReturn; + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + private void addNamespaceData(SchemaInfo schemaInfo, String catalogName) { + schemaInfo.setCatalogName(catalogName); + schemaInfo.setFullName(catalogName + "." + schemaInfo.getName()); + } + + private SchemaInfo convertFromDAO(SchemaInfoDAO schemaInfoDAO, String fullName) { + String catalogName = fullName.split("\\.")[0]; + SchemaInfo schemaInfo = SchemaInfoDAO.toSchemaInfo(schemaInfoDAO); + addNamespaceData(schemaInfo, catalogName); + return schemaInfo; + } + + public SchemaInfoDAO getSchemaDAO(Session session, UUID catalogId, String schemaName) { + Query query = session + .createQuery("FROM SchemaInfoDAO WHERE name = :name and catalogId = :catalogId", SchemaInfoDAO.class); + query.setParameter("name", schemaName); + query.setParameter("catalogId", catalogId); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public SchemaInfoDAO getSchemaDAO(Session session, String catalogName, String schemaName) { + CatalogInfoDAO catalog = catalogRepository.getCatalogDAO(session, catalogName); + if (catalog == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); + } + return getSchemaDAO(session, catalog.getId(), schemaName); + } + + public SchemaInfoDAO getSchemaDAO(Session session, String fullName) { + String[] namespace = fullName.split("\\."); + return getSchemaDAO(session, namespace[0], namespace[1]); + } + + public ListSchemasResponse listSchemas(String catalogName, Optional maxResults, + Optional pageToken) { + try (Session session = sessionFactory.openSession()) { + ListSchemasResponse response = new ListSchemasResponse(); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + // TODO: Implement pagination and filtering if required + // For now, returning all schemas without pagination + try { + CatalogInfoDAO catalog = catalogRepository.getCatalogDAO(session, catalogName); + if (catalog == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); + } + + Query query = session + .createQuery("FROM SchemaInfoDAO WHERE catalogId = :value", SchemaInfoDAO.class); + query.setParameter("value", catalog.getId()); + response.setSchemas(query.list().stream().map(SchemaInfoDAO::toSchemaInfo) + .peek(x -> addNamespaceData(x, catalogName)) + .collect(Collectors.toList())); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + return response; + } + } + + public SchemaInfo getSchema(String fullName) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + SchemaInfoDAO schemaInfo = null; + try { + schemaInfo = getSchemaDAO(session, fullName); + tx.commit(); + } catch (Exception e) { + tx.rollback(); + throw e; + } + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + fullName); + } + return convertFromDAO(schemaInfo, fullName); + } + } + + public SchemaInfo updateSchema(String fullName, UpdateSchema updateSchema) { + ValidationUtils.validateSqlObjectName(updateSchema.getNewName()); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = getSchemaDAO(session, fullName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + fullName); + } + if (updateSchema.getNewName() != null) { + if (getSchemaDAO(session, fullName.split("\\.")[0], updateSchema + .getNewName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Schema already exists: " + updateSchema.getNewName()); + } + } + // Update the schema with new values + if (updateSchema.getComment() != null) { + schemaInfo.setComment(updateSchema.getComment()); + } + if (updateSchema.getNewName() != null) { + schemaInfo.setName(updateSchema.getNewName()); + } + schemaInfo.setUpdatedAt(new Date()); + session.merge(schemaInfo); + tx.commit(); + return convertFromDAO(schemaInfo, fullName); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteSchema(String fullName) { + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = getSchemaDAO(session, fullName); + if (schemaInfo != null) { + session.remove(schemaInfo); + tx.commit(); + } else { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + fullName); + } + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} diff --git a/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java b/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java index 8e86449f0..f76ed6606 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java +++ b/server/src/main/java/io/unitycatalog/server/persist/TableRepository.java @@ -3,7 +3,9 @@ import io.unitycatalog.server.exception.BaseException; import io.unitycatalog.server.model.*; import io.unitycatalog.server.persist.converters.TableInfoConverter; +import io.unitycatalog.server.persist.dao.CatalogInfoDAO; import io.unitycatalog.server.persist.dao.PropertyDAO; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; import io.unitycatalog.server.persist.dao.TableInfoDAO; import io.unitycatalog.server.utils.ValidationUtils; import lombok.Getter; @@ -22,49 +24,66 @@ public class TableRepository { private static final TableRepository instance = new TableRepository(); private static final Logger LOGGER = LoggerFactory.getLogger(TableRepository.class); private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - private static final CatalogOperations catalogOperations = CatalogOperations.getInstance(); - private static final SchemaOperations schemaOperations = SchemaOperations.getInstance(); + private static final CatalogRepository catalogOperations = CatalogRepository.getInstance(); + private static final SchemaRepository schemaOperations = SchemaRepository.getInstance(); private TableRepository() {} public TableInfo getTableById(String tableId) { LOGGER.debug("Getting table by id: " + tableId); try (Session session = sessionFactory.openSession()) { - TableInfoDAO tableInfoDAO = session.get(TableInfoDAO.class, UUID.fromString(tableId)); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + tableId); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + TableInfoDAO tableInfoDAO = session.get(TableInfoDAO.class, UUID.fromString(tableId)); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + tableId); + } + TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); + tx.commit(); + return tableInfo; + } catch (Exception e) { + tx.rollback(); + throw e; } - - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); - tableInfo.setProperties(TableInfoConverter - .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); - return tableInfo; } } public TableInfo getTable(String fullName) { LOGGER.debug("Getting table: " + fullName); + TableInfo tableInfo = null; try (Session session = sessionFactory.openSession()) { - String[] parts = fullName.split("\\."); - if (parts.length != 3) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid table name: " + fullName); - } - String catalogName = parts[0]; - String schemaName = parts[1]; - String tableName = parts[2]; - TableInfoDAO tableInfoDAO = findTable(session, catalogName, schemaName, tableName); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String[] parts = fullName.split("\\."); + if (parts.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid table name: " + fullName); + } + String catalogName = parts[0]; + String schemaName = parts[1]; + String tableName = parts[2]; + TableInfoDAO tableInfoDAO = findTable(session, catalogName, schemaName, tableName); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + } + tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + tableInfo.setProperties(TableInfoConverter + .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); + tableInfo.setCatalogName(catalogName); + tableInfo.setSchemaName(schemaName); + tx.commit(); + } catch (Exception e) { + if (tx != null && tx.getStatus().canRollback()) { + tx.rollback(); + } + throw e; } - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); - tableInfo.setProperties(TableInfoConverter - .convertPropertiesToMap(findProperties(session, tableInfoDAO.getId()))); - tableInfo.setCatalogName(catalogName); - tableInfo.setSchemaName(schemaName); - return tableInfo; } + return tableInfo; } private List findProperties(Session session, UUID tableId) { @@ -161,18 +180,11 @@ private String getTableFullName(TableInfo tableInfo) { } public String getSchemaId(Session session, String catalogName, String schemaName) { - - CatalogInfo catalogInfoDAO = catalogOperations.getCatalog(session, catalogName); - if (catalogInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Catalog not found: " + catalogName); - } - SchemaInfo schemaInfo = schemaOperations.getSchema(session, catalogName - + "." + schemaName); + SchemaInfoDAO schemaInfo = schemaOperations.getSchemaDAO(session, catalogName, schemaName); if (schemaInfo == null) { throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + schemaName); } - return schemaInfo.getSchemaId(); - + return schemaInfo.getId().toString(); } public static Date convertMillisToDate(String millisString) { @@ -206,37 +218,45 @@ public static String getNextPageToken(List tables) { * @return */ public ListTablesResponse listTables(String catalogName, - String schemaName, - Integer maxResults, - String nextPageToken, - Boolean omitProperties, - Boolean omitColumns) { + String schemaName, + Integer maxResults, + String nextPageToken, + Boolean omitProperties, + Boolean omitColumns) { List result = new ArrayList<>(); String returnNextPageToken = null; String hql = "FROM TableInfoDAO t WHERE t.schemaId = :schemaId and " + "(t.updatedAt < :pageToken OR :pageToken is null) order by t.updatedAt desc"; try (Session session = sessionFactory.openSession()) { - String schemaId = getSchemaId(session, catalogName, schemaName); - Query query = session.createQuery(hql, TableInfoDAO.class); - query.setParameter("schemaId", UUID.fromString(schemaId)); - query.setParameter("pageToken", convertMillisToDate(nextPageToken)); - query.setMaxResults(maxResults); - List tableInfoDAOList = query.list(); - - returnNextPageToken = getNextPageToken(tableInfoDAOList); - - for (TableInfoDAO tableInfoDAO : tableInfoDAOList) { - TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); - if (!omitColumns) { - tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + String schemaId = getSchemaId(session, catalogName, schemaName); + Query query = session.createQuery(hql, TableInfoDAO.class); + query.setParameter("schemaId", UUID.fromString(schemaId)); + query.setParameter("pageToken", convertMillisToDate(nextPageToken)); + query.setMaxResults(maxResults); + List tableInfoDAOList = query.list(); + returnNextPageToken = getNextPageToken(tableInfoDAOList); + for (TableInfoDAO tableInfoDAO : tableInfoDAOList) { + TableInfo tableInfo = TableInfoConverter.convertToDTO(tableInfoDAO); + if (!omitColumns) { + tableInfo.setColumns(TableInfoConverter.convertColumnsToDTO(tableInfoDAO.getColumns())); + } + if (!omitProperties) { + tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap( + findProperties(session, tableInfoDAO.getId()))); + } + tableInfo.setCatalogName(catalogName); + tableInfo.setSchemaName(schemaName); + result.add(tableInfo); } - if (!omitProperties) { - tableInfo.setProperties(TableInfoConverter.convertPropertiesToMap( - findProperties(session, tableInfoDAO.getId()))); + tx.commit(); + } catch (Exception e) { + if (tx != null && tx.getStatus().canRollback()) { + tx.rollback(); } - tableInfo.setCatalogName(catalogName); - tableInfo.setSchemaName(schemaName); - result.add(tableInfo); + throw e; } } return new ListTablesResponse().tables(result).nextPageToken(returnNextPageToken); @@ -252,22 +272,20 @@ public void deleteTable(String fullName) { String catalogName = parts[0]; String schemaName = parts[1]; String tableName = parts[2]; - - String schemaId = getSchemaId(session, catalogName, schemaName); - - TableInfoDAO tableInfoDAO = findBySchemaIdAndName(session, schemaId, tableName); - if (tableInfoDAO == null) { - throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); - } Transaction tx = session.beginTransaction(); try { + String schemaId = getSchemaId(session, catalogName, schemaName); + TableInfoDAO tableInfoDAO = findBySchemaIdAndName(session, schemaId, tableName); + if (tableInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Table not found: " + fullName); + } if (TableType.MANAGED.getValue().equals(tableInfoDAO.getType())) { - FileUtils.deleteDirectory(tableInfoDAO.getUrl()); + try { + FileUtils.deleteDirectory(tableInfoDAO.getUrl()); + } catch (Throwable e) { + LOGGER.error("Error deleting table directory: " + tableInfoDAO.getUrl()); + } } - } catch (Throwable e) { - LOGGER.error("Error deleting table directory: " + tableInfoDAO.getUrl()); - } - try { findProperties(session, tableInfoDAO.getId()).forEach(session::remove); session.remove(tableInfoDAO); tx.commit(); @@ -280,4 +298,4 @@ public void deleteTable(String fullName) { } } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java b/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java deleted file mode 100644 index 5be826884..000000000 --- a/server/src/main/java/io/unitycatalog/server/persist/VolumeOperations.java +++ /dev/null @@ -1,163 +0,0 @@ -package io.unitycatalog.server.persist; - -import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.exception.ErrorCode; -import io.unitycatalog.server.model.*; -import io.unitycatalog.server.persist.converters.VolumeInfoConverter; -import io.unitycatalog.server.persist.dao.VolumeInfoDAO; -import io.unitycatalog.server.utils.ValidationUtils; -import lombok.Getter; -import org.hibernate.Session; -import org.hibernate.SessionFactory; -import org.hibernate.Transaction; -import org.hibernate.query.Query; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Optional; -import java.util.UUID; -import java.util.stream.Collectors; - -public class VolumeOperations { - - @Getter - public static final VolumeOperations instance = new VolumeOperations(); - private static final Logger LOGGER = LoggerFactory.getLogger(VolumeOperations.class); - private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); - - private VolumeOperations() {} - - public VolumeInfo createVolume(CreateVolumeRequestContent createVolumeRequest) { - ValidationUtils.validateSqlObjectName(createVolumeRequest.getName()); - String volumeFullName = createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName() + "." + createVolumeRequest.getName(); - VolumeInfo volumeInfo = new VolumeInfo(); - volumeInfo.setVolumeId(UUID.randomUUID().toString()); - volumeInfo.setCatalogName(createVolumeRequest.getCatalogName()); - volumeInfo.setSchemaName(createVolumeRequest.getSchemaName()); - volumeInfo.setName(createVolumeRequest.getName()); - volumeInfo.setComment(createVolumeRequest.getComment()); - volumeInfo.setFullName(volumeFullName); - volumeInfo.setCreatedAt(System.currentTimeMillis()); - volumeInfo.setVolumeType(createVolumeRequest.getVolumeType()); - if (VolumeType.MANAGED.equals(createVolumeRequest.getVolumeType())) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Managed volume creation is not supported"); - } - if (createVolumeRequest.getStorageLocation() == null) { - throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Storage location is required for external volume"); - } - volumeInfo.setStorageLocation(createVolumeRequest.getStorageLocation()); - VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - session.persist(volumeInfoDAO); - tx.commit(); - LOGGER.info("Added volume: {}", volumeInfo.getName()); - return VolumeInfoConverter.fromDAO(volumeInfoDAO); - } catch (Exception e) { - LOGGER.error("Error adding volume", e); - return null; - } - } - - public VolumeInfo getVolume(String fullName) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM VolumeInfoDAO WHERE fullName = :value", VolumeInfoDAO.class); - query.setParameter("value", fullName); - query.setMaxResults(1); - return VolumeInfoConverter.fromDAO(query.uniqueResult()); - } catch (Exception e) { - LOGGER.error("Error getting volume", e); - return null; - } - } - - public VolumeInfo getVolumeById(String volumeId) { - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - Query query = session.createQuery("FROM VolumeInfoDAO WHERE volumeId = :value", VolumeInfoDAO.class); - query.setParameter("value", volumeId); - query.setMaxResults(1); - return VolumeInfoConverter.fromDAO(query.uniqueResult()); - } catch (Exception e) { - throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + volumeId); - } - } - - public ListVolumesResponseContent listVolumes(String catalogName, String schemaName, Optional maxResults, Optional pageToken, Optional includeBrowse) { - ListVolumesResponseContent responseContent = new ListVolumesResponseContent(); - try (Session session = sessionFactory.openSession()) { - session.beginTransaction(); - - String queryString = "from VolumeInfoDAO v where v.catalogName = :catalogName and v.schemaName = :schemaName"; - Query query = session.createQuery(queryString, VolumeInfoDAO.class); - query.setParameter("catalogName", catalogName); - query.setParameter("schemaName", schemaName); - - maxResults.ifPresent(query::setMaxResults); - - if (pageToken.isPresent()) { - // Perform pagination logic here if needed - // Example: query.setFirstResult(startIndex); - } - - responseContent.setVolumes(query.list().stream() - .map(VolumeInfoConverter::fromDAO).collect(Collectors.toList())); - return responseContent; - } catch (Exception e) { - LOGGER.error("Error listing volumes", e); - return null; - } - } - - public VolumeInfo updateVolume(String name, UpdateVolumeRequestContent updateVolumeRequest) { - ValidationUtils.validateSqlObjectName(updateVolumeRequest.getNewName()); - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - VolumeInfoDAO volumeInfo = VolumeInfoConverter.toDAO(getVolume(name)); - if (volumeInfo != null) { - if (updateVolumeRequest.getNewName() != null) { - volumeInfo.setName(updateVolumeRequest.getNewName()); - String fullName = volumeInfo.getCatalogName() + "." + volumeInfo.getSchemaName() + "." + updateVolumeRequest.getNewName(); - volumeInfo.setFullName(fullName); - } - if (updateVolumeRequest.getComment() != null) { - volumeInfo.setComment(updateVolumeRequest.getComment()); - } - volumeInfo.setUpdatedAt(new Date()); - session.merge(volumeInfo); - tx.commit(); - LOGGER.info("Updated volume: {}", volumeInfo.getName()); - } - return VolumeInfoConverter.fromDAO(volumeInfo); - } catch (Exception e) { - LOGGER.error("Error updating volume", e); - return null; - } - } - - public void deleteVolume(String name) { - try (Session session = sessionFactory.openSession()) { - Transaction tx = session.beginTransaction(); - VolumeInfo volumeInfo = getVolume(name); - VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); - if (volumeInfoDAO != null) { - if (VolumeType.MANAGED.getValue().equals(volumeInfoDAO.getVolumeType())) { - try { - FileUtils.deleteDirectory(volumeInfo.getStorageLocation()); - } catch (Exception e) { - LOGGER.error("Error deleting volume directory", e); - } - } - session.remove(volumeInfoDAO); - tx.commit(); - LOGGER.info("Deleted volume: {}", volumeInfo.getName()); - } else { - throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); - } - } catch (Exception e) { - LOGGER.error("Error deleting volume", e); - } - } -} diff --git a/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java b/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java new file mode 100644 index 000000000..4edc2a6db --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/persist/VolumeRepository.java @@ -0,0 +1,229 @@ +package io.unitycatalog.server.persist; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.exception.ErrorCode; +import io.unitycatalog.server.model.*; +import io.unitycatalog.server.persist.converters.VolumeInfoConverter; +import io.unitycatalog.server.persist.dao.SchemaInfoDAO; +import io.unitycatalog.server.persist.dao.VolumeInfoDAO; +import io.unitycatalog.server.utils.ValidationUtils; +import lombok.Getter; +import org.hibernate.Session; +import org.hibernate.SessionFactory; +import org.hibernate.Transaction; +import org.hibernate.query.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +public class VolumeRepository { + + @Getter + public static final VolumeRepository instance = new VolumeRepository(); + public static final SchemaRepository schemaRepository = SchemaRepository.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(VolumeRepository.class); + private static final SessionFactory sessionFactory = HibernateUtil.getSessionFactory(); + + private VolumeRepository() {} + + public VolumeInfo createVolume(CreateVolumeRequestContent createVolumeRequest) { + ValidationUtils.validateSqlObjectName(createVolumeRequest.getName()); + String volumeFullName = createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName() + "." + createVolumeRequest.getName(); + VolumeInfo volumeInfo = new VolumeInfo(); + volumeInfo.setVolumeId(UUID.randomUUID().toString()); + volumeInfo.setCatalogName(createVolumeRequest.getCatalogName()); + volumeInfo.setSchemaName(createVolumeRequest.getSchemaName()); + volumeInfo.setName(createVolumeRequest.getName()); + volumeInfo.setComment(createVolumeRequest.getComment()); + volumeInfo.setFullName(volumeFullName); + volumeInfo.setCreatedAt(System.currentTimeMillis()); + volumeInfo.setVolumeType(createVolumeRequest.getVolumeType()); + if (VolumeType.MANAGED.equals(createVolumeRequest.getVolumeType())) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Managed volume creation is not supported"); + } + if (createVolumeRequest.getStorageLocation() == null) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Storage location is required for external volume"); + } + volumeInfo.setStorageLocation(createVolumeRequest.getStorageLocation()); + VolumeInfoDAO volumeInfoDAO = VolumeInfoConverter.toDAO(volumeInfo); + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfoDAO = schemaRepository.getSchemaDAO(session, + createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName()); + if (schemaInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, + "Schema not found: " + createVolumeRequest.getCatalogName() + "." + createVolumeRequest.getSchemaName()); + } + if (getVolumeDAO(session, createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName(), createVolumeRequest.getName()) != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, "Volume already exists: " + volumeFullName); + } + volumeInfoDAO.setSchemaId(schemaInfoDAO.getId()); + session.persist(volumeInfoDAO); + tx.commit(); + LOGGER.info("Added volume: {}", volumeInfo.getName()); + return convertFromDAO(volumeInfoDAO, createVolumeRequest.getCatalogName(), createVolumeRequest.getSchemaName()); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public VolumeInfo getVolume(String fullName) { + try (Session session = sessionFactory.openSession()) { + String[] namespace = fullName.split("\\."); + if (namespace.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid volume name: " + fullName); + } + String catalogName = namespace[0]; + String schemaName = namespace[1]; + String volumeName = namespace[2]; + return convertFromDAO(getVolumeDAO(session, catalogName, schemaName, volumeName), + catalogName, schemaName); + } catch (Exception e) { + LOGGER.error("Error getting volume", e); + return null; + } + } + + public VolumeInfoDAO getVolumeDAO(Session session, String catalogName, String schemaName, String volumeName) { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + catalogName + "." + schemaName); + } + Query query = session.createQuery( + "FROM VolumeInfoDAO WHERE name = :name and schemaId = :schemaId", VolumeInfoDAO.class); + query.setParameter("name", volumeName); + query.setParameter("schemaId", schemaInfo.getId()); + query.setMaxResults(1); + return query.uniqueResult(); + } + + public VolumeInfo getVolumeById(String volumeId) { + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + Query query = session.createQuery("FROM VolumeInfoDAO WHERE id = :value", VolumeInfoDAO.class); + query.setParameter("value", UUID.fromString(volumeId)); + query.setMaxResults(1); + VolumeInfoDAO volumeInfoDAO = query.uniqueResult(); + tx.commit(); + return VolumeInfoConverter.fromDAO(volumeInfoDAO); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public ListVolumesResponseContent listVolumes(String catalogName, String schemaName, Optional maxResults, Optional pageToken, Optional includeBrowse) { + ListVolumesResponseContent responseContent = new ListVolumesResponseContent(); + try (Session session = sessionFactory.openSession()) { + session.setDefaultReadOnly(true); + Transaction tx = session.beginTransaction(); + try { + SchemaInfoDAO schemaInfo = schemaRepository.getSchemaDAO(session, catalogName, schemaName); + if (schemaInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Schema not found: " + catalogName + "." + schemaName); + } + String queryString = "from VolumeInfoDAO v where v.schemaId = :schemaId"; + Query query = session.createQuery(queryString, VolumeInfoDAO.class); + query.setParameter("schemaId", schemaInfo.getId()); + maxResults.ifPresent(query::setMaxResults); + if (pageToken.isPresent()) { + // Perform pagination logic here if needed + // Example: query.setFirstResult(startIndex); + } + responseContent.setVolumes(query.list().stream() + .map(x -> convertFromDAO(x, catalogName, schemaName)) + .collect(Collectors.toList())); + tx.commit(); + return responseContent; + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + private VolumeInfo convertFromDAO(VolumeInfoDAO volumeInfoDAO, String catalogName, String schemaName) { + VolumeInfo volumeInfo = VolumeInfoConverter.fromDAO(volumeInfoDAO); + volumeInfo.setCatalogName(catalogName); + volumeInfo.setSchemaName(schemaName); + volumeInfo.setFullName(catalogName + "." + schemaName + "." + volumeInfo.getName()); + return volumeInfo; + } + + public VolumeInfo updateVolume(String name, UpdateVolumeRequestContent updateVolumeRequest) { + ValidationUtils.validateSqlObjectName(updateVolumeRequest.getNewName()); + String[] namespace =name.split("\\."); + String catalog = namespace[0], schema = namespace[1], volume = namespace[2]; + try (Session session = sessionFactory.openSession()) { + Transaction tx = session.beginTransaction(); + try { + VolumeInfoDAO volumeInfo = getVolumeDAO(session, catalog, schema, volume); + if (volumeInfo == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); + } + if (updateVolumeRequest.getNewName() != null) { + VolumeInfoDAO existingVolume = getVolumeDAO(session, catalog, schema, + updateVolumeRequest.getNewName()); + if (existingVolume != null) { + throw new BaseException(ErrorCode.ALREADY_EXISTS, + "Volume already exists: " + updateVolumeRequest.getNewName()); + } + } + if (updateVolumeRequest.getNewName() != null) { + volumeInfo.setName(updateVolumeRequest.getNewName()); + } + if (updateVolumeRequest.getComment() != null) { + volumeInfo.setComment(updateVolumeRequest.getComment()); + } + volumeInfo.setUpdatedAt(new Date()); + session.merge(volumeInfo); + tx.commit(); + LOGGER.info("Updated volume: {}", volumeInfo.getName()); + return convertFromDAO(volumeInfo, catalog, schema); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } + + public void deleteVolume(String name) { + try (Session session = sessionFactory.openSession()) { + String[] namespace = name.split("\\."); + if (namespace.length != 3) { + throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Invalid volume name: " + name); + } + String catalog = namespace[0], schema = namespace[1], volume = namespace[2]; + Transaction tx = session.beginTransaction(); + try { + VolumeInfoDAO volumeInfoDAO = getVolumeDAO(session, catalog, schema, volume); + if (volumeInfoDAO == null) { + throw new BaseException(ErrorCode.NOT_FOUND, "Volume not found: " + name); + } + if (VolumeType.MANAGED.getValue().equals(volumeInfoDAO.getVolumeType())) { + try { + FileUtils.deleteDirectory(volumeInfoDAO.getStorageLocation()); + } catch (Exception e) { + LOGGER.error("Error deleting volume directory", e); + } + } + session.remove(volumeInfoDAO); + tx.commit(); + LOGGER.info("Deleted volume: {}", volumeInfoDAO.getName()); + } catch (Exception e) { + tx.rollback(); + throw e; + } + } + } +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java b/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java index d681f292b..b5a6532e2 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java +++ b/server/src/main/java/io/unitycatalog/server/persist/converters/VolumeInfoConverter.java @@ -6,6 +6,7 @@ import io.unitycatalog.server.persist.dao.VolumeInfoDAO; import java.util.Date; +import java.util.UUID; public class VolumeInfoConverter { @@ -14,15 +15,12 @@ public static VolumeInfoDAO toDAO(VolumeInfo volumeInfo) { return null; } return VolumeInfoDAO.builder() - .volumeId(volumeInfo.getVolumeId()) + .id(UUID.fromString(volumeInfo.getVolumeId())) .name(volumeInfo.getName()) - .catalogName(volumeInfo.getCatalogName()) - .schemaName(volumeInfo.getSchemaName()) .comment(volumeInfo.getComment()) .storageLocation(volumeInfo.getStorageLocation()) .createdAt(volumeInfo.getCreatedAt() != null? new Date(volumeInfo.getCreatedAt()) : new Date()) .updatedAt(volumeInfo.getUpdatedAt() != null ? new Date(volumeInfo.getUpdatedAt()) : new Date()) - .fullName(volumeInfo.getFullName()) .volumeType(volumeInfo.getVolumeType().getValue()) .build(); } @@ -32,16 +30,13 @@ public static VolumeInfo fromDAO(VolumeInfoDAO dao) { return null; } return new VolumeInfo() - .volumeId(dao.getVolumeId()) + .volumeId(dao.getId().toString()) .name(dao.getName()) - .catalogName(dao.getCatalogName()) - .schemaName(dao.getSchemaName()) .comment(dao.getComment()) .storageLocation(FileUtils.convertRelativePathToURI(dao.getStorageLocation())) .createdAt(dao.getCreatedAt().getTime()) .updatedAt(dao.getUpdatedAt().getTime()) - .fullName(dao.getFullName()) .volumeType(VolumeType.valueOf(dao.getVolumeType())); } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java index 2a7ea9abb..192a36ca5 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java +++ b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionInfoDAO.java @@ -7,6 +7,7 @@ import io.unitycatalog.server.model.ColumnTypeName; import java.util.List; +import java.util.UUID; // Hibernate annotations @Entity @@ -20,17 +21,14 @@ @Builder public class FunctionInfoDAO { @Id - @Column(name = "function_id") - private String functionId; + @Column(name = "id", columnDefinition = "BINARY(16)") + private UUID id; @Column(name = "name", nullable = false) private String name; - @Column(name = "catalog_name", nullable = false) - private String catalogName; - - @Column(name = "schema_name", nullable = false) - private String schemaName; + @Column(name = "schema_id", columnDefinition = "BINARY(16)") + private UUID schemaId; @Column(name = "comment") private String comment; @@ -44,9 +42,6 @@ public class FunctionInfoDAO { @Column(name = "data_type") private ColumnTypeName dataType; - @Column(name = "full_name") - private String fullName; - @Column(name = "full_data_type") private String fullDataType; @@ -88,15 +83,12 @@ public class FunctionInfoDAO { public static FunctionInfoDAO from(FunctionInfo functionInfo) { FunctionInfoDAO functionInfoDAO = FunctionInfoDAO.builder() - .functionId(functionInfo.getFunctionId()) + .id(functionInfo.getFunctionId()!= null? UUID.fromString(functionInfo.getFunctionId()) : null) .name(functionInfo.getName()) - .catalogName(functionInfo.getCatalogName()) - .schemaName(functionInfo.getSchemaName()) .comment(functionInfo.getComment()) .createdAt(functionInfo.getCreatedAt()) .updatedAt(functionInfo.getUpdatedAt()) .dataType(functionInfo.getDataType()) - .fullName(functionInfo.getFullName()) .fullDataType(functionInfo.getFullDataType()) .externalLanguage(functionInfo.getExternalLanguage()) .isDeterministic(functionInfo.getIsDeterministic()) @@ -122,15 +114,12 @@ public static FunctionInfoDAO from(FunctionInfo functionInfo) { public FunctionInfo toFunctionInfo() { FunctionInfo functionInfo = new FunctionInfo() - .functionId(functionId) + .functionId(id.toString()) .name(name) - .catalogName(catalogName) - .schemaName(schemaName) .comment(comment) .createdAt(createdAt) .updatedAt(updatedAt) .dataType(dataType) - .fullName(fullName) .fullDataType(fullDataType) .externalLanguage(externalLanguage) .isDeterministic(isDeterministic) @@ -147,4 +136,4 @@ public FunctionInfo toFunctionInfo() { functionInfo.returnParams(FunctionParameterInfoDAO.toFunctionParameterInfos(returnParams)); return functionInfo; } -} +} \ No newline at end of file diff --git a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java index 8846e24fc..e4894a475 100644 --- a/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java +++ b/server/src/main/java/io/unitycatalog/server/persist/dao/FunctionParameterInfoDAO.java @@ -28,7 +28,7 @@ public enum InputOrReturnEnum { private String id; @ManyToOne - @JoinColumn(name = "function_id") + @JoinColumn(name = "function_id", referencedColumnName = "id") private FunctionInfoDAO function; // Whether the parameter is an input or return parameter @@ -119,4 +119,4 @@ public static FunctionParameterInfos toFunctionParameterInfos(List includeBrowse) { - VolumeInfo volumeInfo = volumeOperations.getVolume(fullName); - if (volumeInfo != null) { - return HttpResponse.ofJson(volumeInfo); - } - return ValidationUtils.entityNotFoundResponse(ValidationUtils.VOLUME, fullName); + return HttpResponse.ofJson(volumeOperations.getVolume(fullName)); } @Patch("/{full_name}") public HttpResponse updateVolume(@Param("full_name") String fullName, UpdateVolumeRequestContent updateVolumeRequest) { - VolumeInfo updatedVolume = volumeOperations.updateVolume(fullName, updateVolumeRequest); - if (updatedVolume != null) { - return HttpResponse.ofJson(updatedVolume); - } - return ValidationUtils.entityNotFoundResponse(ValidationUtils.VOLUME, fullName); + return HttpResponse.ofJson(volumeOperations.updateVolume(fullName, updateVolumeRequest)); } @Delete("/{full_name}") diff --git a/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java b/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java index e6a964241..a5e4786f8 100644 --- a/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java +++ b/server/src/main/java/io/unitycatalog/server/utils/ValidationUtils.java @@ -1,62 +1,22 @@ package io.unitycatalog.server.utils; import io.unitycatalog.server.exception.BaseException; -import io.unitycatalog.server.persist.FunctionRepository; -import io.unitycatalog.server.persist.SchemaOperations; +import io.unitycatalog.server.persist.*; import com.linecorp.armeria.common.HttpResponse; -import io.unitycatalog.server.persist.CatalogOperations; -import io.unitycatalog.server.persist.TableRepository; -import io.unitycatalog.server.persist.VolumeOperations; +import io.unitycatalog.server.persist.VolumeRepository; import io.unitycatalog.server.exception.ErrorCode; -import java.util.Optional; import java.util.regex.Pattern; public class ValidationUtils { public static final String CATALOG = "Catalog"; public static final String SCHEMA = "Schema"; - public static final String VOLUME = "Volume"; public static final String FUNCTION = "Function"; - private static final CatalogOperations CATALOG_OPERATIONS = CatalogOperations.getInstance(); - private static final SchemaOperations SCHEMA_OPERATIONS = SchemaOperations.getInstance(); - private static final VolumeOperations VOLUME_OPERATIONS = VolumeOperations.getInstance(); - private static final FunctionRepository FUNCTION_REPOSITORY = FunctionRepository.getInstance(); - private static final TableRepository TABLE_REPOSITORY = TableRepository.getInstance(); // Regex to reject names containing a period, space, forward-slash, C0 + DEL control characters private static final Pattern INVALID_FORMAT = Pattern.compile("[\\.\\ \\/\\x00-\\x1F\\x7F]"); private static final Integer MAX_NAME_LENGTH = 255; - public static HttpResponse entityNotFoundResponse(String entity, String... message) { - throw new BaseException(ErrorCode.NOT_FOUND, entity + " not found: " + String.join(".", message)); - } - - public static HttpResponse entityAlreadyExistsResponse(String entity, String... message) { - throw new BaseException(ErrorCode.ALREADY_EXISTS, entity + " already exists: " + String.join(".", message)); - } - - public static boolean catalogExists(String catalogName) { - return CATALOG_OPERATIONS.getCatalog(catalogName) != null; - } - - public static boolean schemaExists(String catalogName, String schemaName) { - if (!catalogExists(catalogName)) - return false; - return SCHEMA_OPERATIONS.getSchema(catalogName + "." + schemaName) != null; - } - - public static boolean volumeExists(String catalogName, String schemaName, String volumeName) { - if (!schemaExists(catalogName, schemaName)) - return false; - return VOLUME_OPERATIONS.getVolume(catalogName + "." + schemaName + "." + volumeName) != null; - } - - public static boolean functionExists(String catalogName, String schemaName, String functionName) { - if (!schemaExists(catalogName, schemaName)) - return false; - return FUNCTION_REPOSITORY.getFunction(catalogName + "." + schemaName + "." + functionName) != null; - } - public static void validateSqlObjectName(String name) { if (name == null || name.isEmpty()) { throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Name cannot be empty"); @@ -68,4 +28,4 @@ public static void validateSqlObjectName(String name) { throw new BaseException(ErrorCode.INVALID_ARGUMENT, "Name cannot contain a period, space, forward-slash, or control characters"); } } -} +} \ No newline at end of file diff --git a/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java index d4cbab4f3..2d790f340 100644 --- a/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/BaseCRUDTest.java @@ -18,6 +18,10 @@ protected void cleanUp() { if (catalogOperations.getCatalog(CATALOG_NAME) != null) { catalogOperations.deleteCatalog(CATALOG_NAME); } + } catch (Exception e) { + // Ignore + } + try { if (catalogOperations.getCatalog(CATALOG_NEW_NAME) != null) { catalogOperations.deleteCatalog(CATALOG_NEW_NAME); } diff --git a/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java index d10ed8767..caf43d055 100644 --- a/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/function/BaseFunctionCRUDTest.java @@ -3,6 +3,7 @@ import io.unitycatalog.client.ApiException; import io.unitycatalog.client.model.*; import io.unitycatalog.server.base.BaseCRUDTest; +import io.unitycatalog.server.utils.TestUtils; import org.junit.*; import io.unitycatalog.server.base.ServerConfig; @@ -42,6 +43,20 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); } @@ -105,9 +120,17 @@ public void testFunctionCRUD() throws ApiException { FunctionInfo retrievedFunctionInfo = functionOperations.getFunction(FUNCTION_FULL_NAME); assertEquals(functionInfo, retrievedFunctionInfo); + // now update the parent catalog + catalogOperations.updateCatalog(CATALOG_NAME, CATALOG_NEW_NAME, ""); + // get the function again + FunctionInfo retrievedFunctionInfoAfterCatUpdate = functionOperations.getFunction( + CATALOG_NEW_NAME + "." + SCHEMA_NAME + "." + FUNCTION_NAME); + assertEquals(retrievedFunctionInfo.getFunctionId(), + retrievedFunctionInfoAfterCatUpdate.getFunctionId()); + // Delete function - functionOperations.deleteFunction(FUNCTION_FULL_NAME, true); - assertFalse(contains(functionOperations.listFunctions(CATALOG_NAME, SCHEMA_NAME), + functionOperations.deleteFunction(CATALOG_NEW_NAME + "." + SCHEMA_NAME + "." + FUNCTION_NAME, true); + assertFalse(contains(functionOperations.listFunctions(CATALOG_NEW_NAME, SCHEMA_NAME), functionInfo, f -> f.getFunctionId().equals(functionInfo.getFunctionId()))); } } diff --git a/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java index a53526007..3b1535621 100644 --- a/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/schema/BaseSchemaCRUDTest.java @@ -1,10 +1,7 @@ package io.unitycatalog.server.base.schema; import io.unitycatalog.client.ApiException; -import io.unitycatalog.client.model.CreateCatalog; -import io.unitycatalog.client.model.CreateSchema; -import io.unitycatalog.client.model.SchemaInfo; -import io.unitycatalog.client.model.UpdateSchema; +import io.unitycatalog.client.model.*; import io.unitycatalog.server.base.BaseCRUDTest; import io.unitycatalog.server.base.ServerConfig; import io.unitycatalog.server.utils.TestUtils; @@ -38,7 +35,22 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); + } protected void createCommonResources() throws ApiException { @@ -84,10 +96,16 @@ public void testSchemaCRUDL() throws ApiException { Assert.assertEquals(TestUtils.SCHEMA_NEW_FULL_NAME, updatedSchemaInfo.getFullName()); assertNotNull(updatedSchemaInfo.getUpdatedAt()); + //Now update the parent catalog name + catalogOperations.updateCatalog(TestUtils.CATALOG_NAME, TestUtils.CATALOG_NEW_NAME, ""); + SchemaInfo updatedSchemaInfo2 = schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + + "." + TestUtils.SCHEMA_NEW_NAME); + assertEquals(retrievedSchemaInfo.getSchemaId(), updatedSchemaInfo2.getSchemaId()); + // Delete schema System.out.println("Testing delete schema.."); - schemaOperations.deleteSchema(TestUtils.SCHEMA_NEW_FULL_NAME); - assertFalse(TestUtils.contains(schemaOperations.listSchemas(TestUtils.CATALOG_NAME), updatedSchemaInfo, (schema) -> + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + assertFalse(TestUtils.contains(schemaOperations.listSchemas(TestUtils.CATALOG_NEW_NAME), updatedSchemaInfo, (schema) -> schema.getName().equals(TestUtils.SCHEMA_NEW_NAME))); } } diff --git a/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java index d5bbf9c1f..031eab4c0 100644 --- a/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/table/BaseTableCRUDTest.java @@ -54,6 +54,21 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } + super.cleanUp(); } @@ -205,11 +220,18 @@ public void testTableCRUD() throws IOException, ApiException { assertNotNull(managedListTable.getCreatedAt()); assertNotNull(managedListTable.getTableId()); + // Now update the parent schema name + schemaOperations.updateSchema(TestUtils.SCHEMA_FULL_NAME, new UpdateSchema().newName(TestUtils.SCHEMA_NEW_NAME).comment(TestUtils.SCHEMA_COMMENT)); + // now fetch the table again + TableInfo managedTableAfterSchemaUpdate = tableOperations.getTable(TestUtils.CATALOG_NAME + "." + TestUtils.SCHEMA_NEW_NAME + "." + TABLE_NAME); + assertEquals(managedTable.getTableId(), managedTableAfterSchemaUpdate.getTableId()); + // Delete managed table + String newTableFullName = TestUtils.CATALOG_NAME + "." + TestUtils.SCHEMA_NEW_NAME + "." + TABLE_NAME; System.out.println("Testing delete table.."); - tableOperations.deleteTable(TABLE_FULL_NAME); - assertThrows(Exception.class, () -> tableOperations.getTable(TABLE_FULL_NAME)); + tableOperations.deleteTable(newTableFullName); + assertThrows(Exception.class, () -> tableOperations.getTable(newTableFullName)); } } \ No newline at end of file diff --git a/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java b/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java index b3a327307..255f09ed4 100644 --- a/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java +++ b/server/src/test/java/io/unitycatalog/server/base/volume/BaseVolumeCRUDTest.java @@ -6,6 +6,7 @@ import io.unitycatalog.server.persist.FileUtils; import io.unitycatalog.server.persist.HibernateUtil; import io.unitycatalog.server.persist.dao.VolumeInfoDAO; +import io.unitycatalog.server.utils.TestUtils; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.junit.*; @@ -56,13 +57,29 @@ protected void cleanUp() { } catch (Exception e) { // Ignore } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NEW_NAME); + } + } catch (Exception e) { + // Ignore + } + try { + if (schemaOperations.getSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME) != null) { + schemaOperations.deleteSchema(TestUtils.CATALOG_NEW_NAME + "." + TestUtils.SCHEMA_NAME); + } + } catch (Exception e) { + // Ignore + } super.cleanUp(); } + private SchemaInfo schemaInfo; + protected void createCommonResources() throws ApiException { // Common setup operations such as creating a catalog and schema catalogOperations.createCatalog(CATALOG_NAME, "Common catalog for volumes"); - schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); + schemaInfo = schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); } @Test @@ -126,15 +143,13 @@ public void testVolumeCRUD() throws ApiException { try (Session session = sessionFactory.openSession()) { session.beginTransaction(); VolumeInfoDAO managedVolume = VolumeInfoDAO.builder() - .catalogName(CATALOG_NAME) - .schemaName(SCHEMA_NAME) .volumeType(VolumeType.MANAGED.getValue()) .storageLocation("/tmp/managed_volume") .name(VOLUME_NAME) - .fullName(VOLUME_FULL_NAME) .createdAt(new Date()) .updatedAt(new Date()) - .volumeId(UUID.randomUUID().toString()) + .id(UUID.randomUUID()) + .schemaId(UUID.fromString(schemaInfo.getSchemaId())) .build(); session.persist(managedVolume); session.getTransaction().commit(); @@ -159,9 +174,17 @@ public void testVolumeCRUD() throws ApiException { return volume.getName().equals(VOLUME_NAME); })); + //NOW Update the schema name + schemaOperations.updateSchema(SCHEMA_FULL_NAME, + new UpdateSchema().newName(SCHEMA_NEW_NAME).comment(SCHEMA_COMMENT)); + // get volume + VolumeInfo volumePostSchemaNameChange = volumeOperations.getVolume + (CATALOG_NAME + "." + SCHEMA_NEW_NAME + "." + VOLUME_NAME); + assertEquals(volumePostSchemaNameChange.getVolumeId(), managedVolumeInfo.getVolumeId()); + // Delete volume System.out.println("Testing delete volume.."); - volumeOperations.deleteVolume(VOLUME_FULL_NAME); - assertEquals(0, getSize(volumeOperations.listVolumes(CATALOG_NAME, SCHEMA_NAME))); + volumeOperations.deleteVolume(CATALOG_NAME + "." + SCHEMA_NEW_NAME + "." + VOLUME_NAME); + assertEquals(0, getSize(volumeOperations.listVolumes(CATALOG_NAME, SCHEMA_NEW_NAME))); } } \ No newline at end of file