Skip to content

Commit

Permalink
update redshift tests to use schema names that follow airbyte convent…
Browse files Browse the repository at this point in the history
…ions (airbytehq#29823)
  • Loading branch information
cgardens authored Aug 26, 2023
1 parent f3ad508 commit 9f2ae91
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.TestingNamespaces;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import java.io.IOException;
Expand Down Expand Up @@ -209,10 +210,11 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
// for each test we create a new schema in the database. run the test in there and then remove it.
@Override
protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) throws Exception {
final String schemaName = Strings.addRandomSuffix("integration_test", "_", 5);
final String schemaName = TestingNamespaces.generate();
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
baseConfig = getStaticConfig();
database = createDatabase();
removeOldNamespaces();
getDatabase().query(ctx -> ctx.execute(createSchemaQuery));
final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;",
USER_WITHOUT_CREDS, baseConfig.get("password").asText());
Expand All @@ -221,7 +223,33 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES
((ObjectNode) configForSchema).put("schema", schemaName);
TEST_SCHEMAS.add(schemaName);
config = configForSchema;
this.testDestinationEnv = testEnv;
testDestinationEnv = testEnv;
}

private void removeOldNamespaces() {
final List<String> schemas;
try {
schemas = getDatabase().query(ctx -> ctx.fetch("SELECT schema_name FROM information_schema.schemata;"))
.stream()
.map(record -> record.get("schema_name").toString())
.toList();
} catch (final SQLException e) {
// if we can't fetch the schemas, just return.
return;
}

int schemasDeletedCount = 0;
for (final String schema : schemas) {
if (TestingNamespaces.isOlderThan2Days(schema)) {
try {
getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema)));
schemasDeletedCount++;
} catch (final SQLException e) {
LOGGER.error("Failed to delete old dataset: {}", schema, e);
}
}
}
LOGGER.info("Deleted {} old schemas.", schemasDeletedCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.TestingNamespaces;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.sql.Connection;
Expand Down Expand Up @@ -167,7 +168,7 @@ protected int getMaxRecordValueLimit() {
protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) throws Exception {
baseConfig = getStaticConfig();
final JsonNode configForSchema = Jsons.clone(baseConfig);
schemaName = Strings.addRandomSuffix("integration_test", "_", 5);
schemaName = TestingNamespaces.generate();
TEST_SCHEMAS.add(schemaName);
((ObjectNode) configForSchema).put("schema", schemaName);
config = configForSchema;
Expand Down

0 comments on commit 9f2ae91

Please sign in to comment.