This repository has been archived by the owner on Dec 18, 2023. It is now read-only.
forked from awslabs/amazon-redshift-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sp_check_primary_key.sql
166 lines (156 loc) · 8.25 KB
/
sp_check_primary_key.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/**********************************************************************************************
Purpose: Check the integrity of the PRIMARY KEY declared on a table
Notes: Only checks PK declared to Redshift. Will error if no PK on the table.
Parameters:
batch_time : Timestamp for this batch. Can be used to group multiple fixes
check_table : Schema qualified name of table to be queried
check_action: Action to be taken when PK errors are found
* "LOG" => PK errors will be logged to `log_table`
* "FIX" => PK duplicates will be removed
log_table : Schema qualified table where actions are to be logged
DDL structure must be as follows:
CREATE TABLE $(log_table) (
batch_time TIMESTAMP,
check_table VARCHAR,
check_time TIMESTAMP,
check_status VARCHAR,
error_count INT);
max_fix_rows: Maximum number of PK errors to correct automatically
History:
2019-10-25 - joeharris76 - Created
**********************************************************************************************/
-- DROP PROCEDURE sp_check_primary(TIMESTAMP,VARCHAR,VARCHAR,VARCHAR,INTEGER);
CREATE OR REPLACE PROCEDURE sp_check_primary_key (
batch_time IN TIMESTAMP
, check_table IN VARCHAR(256)
, check_action IN VARCHAR(10)
, log_table IN VARCHAR(256)
, max_fix_rows IN INTEGER)
AS $$
DECLARE
sql VARCHAR(MAX) := '';
pk_columns VARCHAR(512) := '';
dupe_count INTEGER := 0;
temp_count INTEGER := 0;
BEGIN
IF check_action NOT IN ('LOG', 'FIX') THEN
RAISE EXCEPTION 'The `check_action` parameter must be "LOG" or "FIX".';
END IF;
IF check_table='' OR log_table='' THEN
RAISE EXCEPTION 'Parameters `check_table` and `log_table` cannot be empty.';
END IF;
IF check_action='FIX' AND max_fix_rows<=0 THEN
RAISE EXCEPTION 'Parameter `max_fix_rows` must be >0 when `check_action` = "FIX".';
END IF;
--Retrieve the primary key column(s) for the table
sql := 'SELECT REPLACE(REPLACE(pg_get_constraintdef(con.oid),''PRIMARY KEY ('',''''),'')'','''')'||
' FROM pg_constraint AS con '||
' JOIN pg_class AS tbl ON tbl.relnamespace = con.connamespace AND tbl.oid = con.conrelid'||
' JOIN pg_namespace AS sch ON sch.oid =tbl.relnamespace'||
' WHERE tbl.relkind = ''r'' AND con.contype = ''p''';
IF CHARINDEX('.',check_table) > 0 THEN
sql := sql||' AND sch.nspname = \''||quote_ident(SPLIT_PART(check_table,'.',1))||'\''
||' AND tbl.relname = \''||quote_ident(SPLIT_PART(check_table,'.',2))||'\';';
ELSE
sql := sql||' AND tbl.relname = '''||check_table||''';';
END IF;
EXECUTE sql||' ;' INTO pk_columns;
IF pk_columns = '' THEN
RAISE INFO 'No PRIMARY KEY found for table "%".',check_table;
ELSE
--Count the number of duplicates in the PRIMARY KEY
sql := 'SELECT SUM(dupes) FROM (SELECT '||pk_columns||', COUNT(*) dupes'||
' FROM '||check_table||' GROUP BY '||pk_columns||' HAVING COUNT(*) > 1)';
EXECUTE sql||' ;' INTO dupe_count;
IF dupe_count IS NULL THEN
dupe_count := 0;
END IF;
IF dupe_count = 0 THEN
EXECUTE 'INSERT INTO '||log_table||' SELECT '''||batch_time||''','''||check_table||''', '''||SYSDATE||''',''OK - No duplicates found'',0;';
RAISE INFO 'OK - No duplicates found';
ELSIF check_action = 'LOG' THEN
EXECUTE 'INSERT INTO '||log_table||' SELECT '''||batch_time||''','''||check_table||''', '''||SYSDATE||''',''ERROR (LOG) - Duplicates found'','||dupe_count||';';
RAISE INFO 'ERROR (LOG) - % Duplicates found',dupe_count;
ELSIF check_action = 'FIX' THEN
IF max_fix_rows <= dupe_count THEN --Too many dupes, cannot fix
EXECUTE 'INSERT INTO '||log_table||' SELECT '''||batch_time||''','''||check_table||''', '''||SYSDATE||''',''ERROR (FIX) - Duplicate count exceeds `max_fix_rows` value.'','||dupe_count||';';
RAISE INFO 'ERROR (FIX) - Duplicate count % exceeds `max_fix_rows` %',dupe_count,max_fix_rows;
ELSE --Attempt to correct the PK
EXECUTE 'DROP TABLE IF EXISTS tmp_sp_fix_pk;';
EXECUTE 'CREATE TEMPORARY TABLE tmp_sp_fix_pk (LIKE '||check_table||' );';
--Insert distinct rows for PK duplicates into temp table
EXECUTE 'INSERT INTO tmp_sp_fix_pk'||
' SELECT DISTINCT * FROM '||check_table||' WHERE ('||pk_columns||') IN (SELECT '||pk_columns||
' FROM '||check_table||' GROUP BY '||pk_columns||' HAVING COUNT(*) > 1)';
--Check that PK duplciates are removed in the temp table
EXECUTE 'SELECT COUNT(*) FROM (SELECT '||pk_columns||
' FROM tmp_sp_fix_pk GROUP BY '||pk_columns||' HAVING COUNT(*) > 1)' INTO temp_count ;
IF temp_count > 0 THEN
EXECUTE 'INSERT INTO '||log_table||' SELECT '''||batch_time||''','''||check_table||''','''||SYSDATE||''',''ERROR (FIX) - Failed. Duplicate PK rows are not identical.'','||dupe_count||';';
RAISE INFO 'ERROR (FIX) - Failed. Duplicate PK rows are not identical';
ELSE
--Delete all rows for the PK duplicates from the source table
EXECUTE 'DELETE FROM '||check_table||' WHERE ('||pk_columns||') IN (SELECT '||pk_columns||' FROM tmp_sp_fix_pk);';
--Insert the deduped rows from the temp table into the source
EXECUTE 'INSERT INTO '||check_table||' SELECT * FROM tmp_sp_fix_pk;';
--Update the log for the fix
EXECUTE 'INSERT INTO '||log_table||' SELECT '''||batch_time||''','''||check_table||''','''||SYSDATE||''',''SUCCESS (FIX) - Duplicates corrected.'','||dupe_count||';';
--Commit the fixed data
COMMIT;
RAISE INFO 'SUCCESS (FIX) - % duplicates corrected.',dupe_count;
END IF;
END IF;
ELSE
RAISE EXCEPTION 'ERROR in `sp_check_primary_key` execution';
END IF;
END IF;
END
$$ LANGUAGE plpgsql;
/* Usage Example:
DROP TABLE IF EXISTS tmp_pk_log;
CREATE TABLE tmp_pk_log(
batch_time TIMESTAMP
, check_table VARCHAR
, check_time TIMESTAMP
, check_status VARCHAR
, error_count INT);
DROP TABLE IF EXISTS tmp_pk_test;
CREATE TEMP TABLE tmp_pk_test (
pk_col INTEGER PRIMARY KEY
, entries INTEGER);
INSERT INTO tmp_pk_test VALUES
(100, 7001)
, (100, 7001)
, (100, 7001)
, (100, 7001)
, (101, 20225)
, (102, 22772)
, (103, 4577);
CALL sp_check_primary_key(SYSDATE,'tmp_pk_test','LOG','tmp_pk_log',0);
-- INFO: ERROR (LOG) - 4 Duplicates found
SELECT * FROM tmp_pk_log;
-- -[ RECORD 1 ]+-------------------------------
-- batch_time | 2019-10-31 00:01:33.849821
-- check_table | tmp_pk_test
-- check_time | 2019-10-31 00:01:33.849821
-- check_status | ERROR (LOG) - Duplicates found
-- error_count | 4
CALL sp_check_primary_key(SYSDATE,'tmp_pk_test','FIX','tmp_pk_log',3);
-- INFO: ERROR (FIX) - Duplicate count 4 exceeds `max_fix_rows` 3
SELECT * FROM tmp_pk_log;
-- -[ RECORD 1 ]+------------------------------------------------------------
-- batch_time | 2019-10-31 00:01:34.479395
-- check_table | tmp_pk_test
-- check_time | 2019-10-31 00:01:34.479395
-- check_status | ERROR (FIX) - Duplicate count exceeds `max_fix_rows` value.
-- error_count | 4
CALL sp_check_primary_key(SYSDATE,'tmp_pk_test','FIX','tmp_pk_log',99);
-- INFO: SUCCESS (FIX) - 4 duplicates corrected.
SELECT * FROM tmp_pk_log;
-- -[ RECORD 1 ]+------------------------------------------------------------
-- batch_time | 2019-10-31 00:01:34.892679
-- check_table | tmp_pk_test
-- check_time | 2019-10-31 00:01:34.892679
-- check_status | SUCCESS (FIX) - Duplicates corrected.
-- error_count | 4
*/