diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java index 0d2b7c5967d1..ad4f7dafd755 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java @@ -21,6 +21,7 @@ import io.airbyte.integrations.destination.record_buffer.FileBuffer; import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.TestingNamespaces; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import java.io.IOException; @@ -209,10 +210,11 @@ private List retrieveRecordsFromTable(final String tableName, final St // for each test we create a new schema in the database. run the test in there and then remove it. @Override protected void setup(final TestDestinationEnv testEnv, final HashSet TEST_SCHEMAS) throws Exception { - final String schemaName = Strings.addRandomSuffix("integration_test", "_", 5); + final String schemaName = TestingNamespaces.generate(); final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); baseConfig = getStaticConfig(); database = createDatabase(); + removeOldNamespaces(); getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;", USER_WITHOUT_CREDS, baseConfig.get("password").asText()); @@ -221,7 +223,33 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet TES ((ObjectNode) configForSchema).put("schema", schemaName); TEST_SCHEMAS.add(schemaName); config = configForSchema; - this.testDestinationEnv = testEnv; + testDestinationEnv = testEnv; + } + + private void removeOldNamespaces() { + final List schemas; + try { + schemas = getDatabase().query(ctx -> ctx.fetch("SELECT schema_name FROM information_schema.schemata;")) + .stream() + .map(record -> record.get("schema_name").toString()) + .toList(); + } catch (final SQLException e) { + // if we can't fetch the schemas, just return. + return; + } + + int schemasDeletedCount = 0; + for (final String schema : schemas) { + if (TestingNamespaces.isOlderThan2Days(schema)) { + try { + getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); + schemasDeletedCount++; + } catch (final SQLException e) { + LOGGER.error("Failed to delete old dataset: {}", schema, e); + } + } + } + LOGGER.info("Deleted {} old schemas.", schemasDeletedCount); } @Override diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java index 5ef1e1d947f1..bd0d9f6639c0 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java @@ -24,6 +24,7 @@ import io.airbyte.integrations.base.ssh.SshTunnel; import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import io.airbyte.integrations.standardtest.destination.TestingNamespaces; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import java.io.IOException; import java.sql.Connection; @@ -167,7 +168,7 @@ protected int getMaxRecordValueLimit() { protected void setup(final TestDestinationEnv testEnv, final HashSet TEST_SCHEMAS) throws Exception { baseConfig = getStaticConfig(); final JsonNode configForSchema = Jsons.clone(baseConfig); - schemaName = Strings.addRandomSuffix("integration_test", "_", 5); + schemaName = TestingNamespaces.generate(); TEST_SCHEMAS.add(schemaName); ((ObjectNode) configForSchema).put("schema", schemaName); config = configForSchema;