diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java index bfa887fa..b799bf6a 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java @@ -423,20 +423,20 @@ private void validateTableStructure(TableSchema flinkSchema) { throw new IllegalArgumentException("Couldn't get the sink table's column info."); } // validate primary keys - List primayKeys = new ArrayList<>(); + List primaryKeys = new ArrayList<>(); for (int i = 0; i < rows.size(); i++) { String keysType = rows.get(i).get("COLUMN_KEY").toString(); if (!"PRI".equals(keysType)) { continue; } - primayKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase()); + primaryKeys.add(rows.get(i).get("COLUMN_NAME").toString().toLowerCase()); } - if (!primayKeys.isEmpty()) { + if (!primaryKeys.isEmpty()) { if (!constraint.isPresent()) { throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`."); } - if (constraint.get().getColumns().size() != primayKeys.size() || - !constraint.get().getColumns().stream().allMatch(col -> primayKeys.contains(col.toLowerCase()))) { + if (constraint.get().getColumns().size() != primaryKeys.size() || + !constraint.get().getColumns().stream().allMatch(col -> primaryKeys.contains(col.toLowerCase()))) { throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table."); } sinkOptions.enableUpsertDelete();