-
Notifications
You must be signed in to change notification settings - Fork 0
/
paf_postgresql_import_postgis.plpgsql
1278 lines (1054 loc) · 60.6 KB
/
paf_postgresql_import_postgis.plpgsql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
DROP FUNCTION IF EXISTS public.import_pc_paf(varchar, varchar);
CREATE OR REPLACE FUNCTION public.import_pc_paf(in_edition varchar, in_data varchar)
RETURNS boolean AS
$BODY$
DECLARE
v_data_root varchar;
v_data_main varchar;
v_data_postzon varchar;
v_data_alias varchar;
v_processed integer;
v_main_footer varchar;
v_std_footer varchar;
v_sql varchar;
v_table_created boolean;
BEGIN
v_data_root := in_data || '/' || in_edition || '/';
v_data_main := v_data_root || 'PAF MAIN FILE/';
v_data_postzon := v_data_root || 'POSTZON 100M/';
v_data_alias := v_data_root || 'ALIAS/';
v_main_footer = ' %';
v_std_footer = '99999999%';
RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root;
CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP;
-- 1) Localaties
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging localaties', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'local.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%LOCALITY' || in_edition || '%' OR data like v_std_footer;
RAISE NOTICE '%: Done staging localaties, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_localities') THEN
RAISE NOTICE 'Table "public"."pc_paf_localities" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_localities"';
CREATE TABLE public.pc_paf_localities (
locality_key integer NOT NULL,
post_town varchar(30),
dependent_locality varchar(35),
double_dependent_locality varchar(35)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_localities CASCADE;
INSERT INTO public.pc_paf_localities
SELECT substring(data,1,6)::integer,
nullif(trim(substring(data,52,30)),''),
nullif(trim(substring(data,82,35)),''),
nullif(trim(substring(data,117,35)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_localities ADD PRIMARY KEY (locality_key);
END IF;
RAISE NOTICE '%: Done importing localities (imported % records)', clock_timestamp(), v_processed;
-- 2) Thoroughfares
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging thoroughfares', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thfare.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%THOROUGH' || in_edition || '%' OR data like v_std_footer;
RAISE NOTICE '%: Done staging thoroughfares, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfares') THEN
RAISE NOTICE 'Table "public"."pc_paf_thoroughfares" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfares"';
CREATE TABLE public.pc_paf_thoroughfares (
thoroughfare_key integer NOT NULL,
thoroughfare_name varchar(60)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public. pc_paf_thoroughfares CASCADE;
INSERT INTO public. pc_paf_thoroughfares
SELECT substring(data,1,8)::integer,
nullif(trim(substring(data,9,60)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_thoroughfares ADD PRIMARY KEY (thoroughfare_key);
END IF;
RAISE NOTICE '%: Done importing thoroughfares (imported % records)', clock_timestamp(), v_processed;
-- 3) Thoroughfares Descriptor
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging thoroughfares descriptor', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thdesc.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%THDESCRI' || in_edition || '%' OR data like v_std_footer;
RAISE NOTICE '%: Done staging thoroughfares descriptor, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfare_descriptor') THEN
RAISE NOTICE 'Table "public"."pc_paf_thoroughfare_descriptor" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfare_descriptor"';
CREATE TABLE public.pc_paf_thoroughfare_descriptor (
thoroughfare_descriptor_key integer NOT NULL,
thoroughfare_descriptor varchar(20),
approved_abbreviation varchar(6)
);
v_table_created := true;
end if;
TRUNCATE TABLE public.pc_paf_thoroughfare_descriptor CASCADE;
INSERT INTO public.pc_paf_thoroughfare_descriptor
SELECT substring(data,1,4)::integer,
nullif(trim(substring(data,5,20)),''),
nullif(trim(substring(data,25,6)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_thoroughfare_descriptor ADD PRIMARY KEY (thoroughfare_descriptor_key);
END IF;
RAISE NOTICE '%: Done importing thoroughfares descriptor (imported % records)', clock_timestamp(), v_processed;
-- 4) Building Names
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging building names', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'bname.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%BUILDING' || in_edition || '%' OR data like v_std_footer;
RAISE NOTICE '%: Done staging building names, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_building_names') THEN
RAISE NOTICE 'Table "public"."pc_paf_building_names" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_building_names"';
CREATE TABLE public.pc_paf_building_names (
building_name_key integer NOT NULL,
building_name varchar(50)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_building_names CASCADE;
INSERT INTO public.pc_paf_building_names
SELECT substring(data,1,8)::integer,
nullif(trim(substring(data,9,50)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_building_names ADD PRIMARY KEY (building_name_key);
END IF;
RAISE NOTICE '%: Done importing building names (imported % records)', clock_timestamp(), v_processed;
-- 5) Sub Building Names file (subbname.c01)
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging sub building names', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'subbname.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%SUBBUILD' || in_edition || '%' OR data like v_std_footer;
RAISE NOTICE '%: Done staging sub building names, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_sub_building_names') THEN
RAISE NOTICE 'Table "public"."pc_paf_sub_building_names" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_sub_building_names"';
CREATE TABLE public.pc_paf_sub_building_names (
sub_building_name_key integer NOT NULL,
sub_building_name varchar(50)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_sub_building_names CASCADE;
INSERT INTO public.pc_paf_sub_building_names
SELECT substring(data,1,8)::integer,
nullif(trim(substring(data,9,30)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_sub_building_names ADD PRIMARY KEY (sub_building_name_key);
END IF;
RAISE NOTICE '%: Done importing sub building names (imported % records)', clock_timestamp(), v_processed;
-- 6) Organisations
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging organisations', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'org.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%ORGANISA' || in_edition || '%' OR data like v_std_footer;
RAISE NOTICE '%: Done staging organisations, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_organisations') THEN
RAISE NOTICE 'Table "public"."pc_paf_organisations" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_organisations"';
CREATE TABLE public.pc_paf_organisations (
organisation_key integer NOT NULL,
postcode_type varchar(1) NOT NULL,
organisation_name varchar(60),
department_name varchar(60)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_organisations CASCADE;
INSERT INTO public.pc_paf_organisations
SELECT substring(data,1,8)::integer,
nullif(trim(substring(data,9,1)),''),
nullif(trim(substring(data,10,60)),''),
nullif(trim(substring(data,70,60)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_organisations ADD PRIMARY KEY (organisation_key, postcode_type);
COMMENT ON COLUMN public.pc_paf_organisations.organisation_key IS 'When postcode type is L organisation_key relates to address_key';
END IF;
RAISE NOTICE '%: Done importing organisations (imported % records)', clock_timestamp(), v_processed;
-- 7) Postzon with latlon generated using postgis
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging postzon', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_postzon || 'pzone100.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%PZONE100' || in_edition || '%' OR data like v_main_footer;
RAISE NOTICE '%: Done staging postzon, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_postzon_100m') THEN
RAISE NOTICE 'Table "public"."pc_paf_postzon_100m" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_postzon_100m"';
CREATE TABLE public.pc_paf_postzon_100m (
postzon_100m_key serial NOT NULL,
outward_code varchar(4) NOT NULL,
inward_code varchar(3) NOT NULL,
introduction_date date,
grid_reference_east integer,
grid_reference_north integer,
country_code varchar(9),
area_code_county varchar(9),
area_code_district varchar(9),
ward_code varchar(9),
nhs_region varchar(9),
nhs_code varchar(9),
user_type smallint,
grid_status smallint,
latitude double precision,
longitude double precision
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_postzon_100m CASCADE;
INSERT INTO public.pc_paf_postzon_100m(outward_code, inward_code, introduction_date, grid_reference_east, grid_reference_north,
country_code, area_code_county, area_code_district, ward_code, nhs_region, nhs_code, user_type, grid_status, latitude, longitude)
SELECT nullif(trim(substring(data,1,4)),'') AS outward_code,
nullif(trim(substring(data,5,3)),'') AS inward_code,
to_date(substring(data,8,6), 'YYYYMM') AS introduction_date,
(nullif(trim(substring(data,14,5)),'') || '0')::integer AS grid_reference_east,
(CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ E'[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')::integer AS grid_reference_north,
nullif(trim(substring(data,24,9)),'') AS country_code,
nullif(trim(substring(data,33,9)),'') AS area_code_county,
nullif(trim(substring(data,42,9)),'') AS area_code_district,
nullif(trim(substring(data,51,9)),'') AS ward_code,
nullif(trim(substring(data,60,9)),'') AS nhs_region,
nullif(trim(substring(data,69,9)),'') AS nhs_code,
nullif(trim(substring(data,78,1)),'')::smallint AS user_type,
nullif(trim(substring(data,79,1)),'')::smallint AS grid_status,
ST_y(ST_transform(ST_GeomFromText('POINT('||(nullif(trim(substring(data,14,5)),'') || '0')||' '||(CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ E'[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')||')',CASE WHEN nullif(trim(substring(data,1,4)),'') LIKE 'BT%' THEN 29903 ELSE 27700 END),4326))::numeric(8,6) AS latitude,
ST_x(ST_transform(ST_GeomFromText('POINT('||(nullif(trim(substring(data,14,5)),'') || '0')||' '||(CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ E'[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')||')',CASE WHEN nullif(trim(substring(data,1,4)),'') LIKE 'BT%' THEN 29903 ELSE 27700 END),4326))::numeric(8,6) AS longitude
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_postzon_100m ADD PRIMARY KEY (postzon_100m_key),
ADD CONSTRAINT pc_paf_postzon_100m_unique UNIQUE (outward_code, inward_code);
CREATE INDEX pc_paf_postzon_100m_longitude ON public.pc_paf_postzon_100m (longitude);
CREATE INDEX pc_paf_postzon_100m_latitude ON public.pc_paf_postzon_100m (latitude);
COMMENT ON TABLE public.pc_paf_postzon_100m IS 'Geographical data from the Royal Mail with accuracy of 100m';
COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_east IS 'BT postcodes reflect the Irish Grid public (different origin and scale); subtract 17000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion';
COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_north IS 'BT postcodes reflect the Irish Grid public (different origin and scale); add 13000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion';
COMMENT ON COLUMN public.pc_paf_postzon_100m.user_type IS '0 Small User 1 Large User';
COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_status IS E'0 Status not supplied by OS \n
1 Within the building of the matched address closest to the Postcode mean. \n
2 Co-ordinates allocated by GROS during Postcode boundary creation to the building nearest the centre of the populated part of the Postcode (Scotland only) \n
3 Approximate to within 50m of true position \n
4 Postcode unit mean (direct copy from ADDRESS-POINT (GB) and COMPAS (NI) - mean of matched addresses with the same Postcode) \n
5 Postcode imputed by ONS to 1 metre resolution \n
6 Postcode sector mean - mainly PO Boxes \n
9 No co-ordinates available';
END IF;
RAISE NOTICE '%: Done importing postzon (imported % records)', clock_timestamp(), v_processed;
-- 8) Mainfile
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging mainfile', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c02');
EXECUTE v_sql;
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c03');
EXECUTE v_sql;
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c04');
EXECUTE v_sql;
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c05');
EXECUTE v_sql;
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c06');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer;
RAISE NOTICE '%: Done staging mainfile, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile') THEN
RAISE NOTICE 'Table "public"."pc_paf_mainfile" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_mainfile"';
CREATE TABLE public.pc_paf_mainfile (
paf_record_key serial NOT NULL,
outward_code varchar(4) NOT NULL,
inward_code varchar(3) NOT NULL,
address_key integer NOT NULL,
locality_key integer,
thoroughfare_key integer,
thoroughfare_descriptor_key integer,
dependent_thoroughfare_key integer,
dependent_thoroughfare_descriptor_key integer,
building_number integer,
building_name_key integer,
sub_building_name_key integer,
number_of_households integer,
organisation_key integer,
postcode_type varchar(1),
concatenation_indicator varchar(1),
delivery_point_suffix varchar(2),
small_user_organisation_indicator varchar(1),
po_box_number varchar(6)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_mainfile RESTART IDENTITY;
INSERT INTO public.pc_paf_mainfile(outward_code,inward_code,address_key,locality_key,thoroughfare_key,
thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number,
building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator,
delivery_point_suffix,small_user_organisation_indicator,po_box_number)
SELECT nullif(trim(substring(data,1,4)),''),
nullif(trim(substring(data,5,3)),''),
nullif(substring(data,8,8)::integer,0),
nullif(substring(data,16,6)::integer,0),
nullif(substring(data,22,8)::integer,0),
nullif(substring(data,30,4)::integer,0),
nullif(substring(data,34,8)::integer,0),
nullif(substring(data,42,4)::integer,0),
nullif(substring(data,46,4)::integer,0),
nullif(substring(data,50,8)::integer,0),
nullif(substring(data,58,8)::integer,0),
nullif(substring(data,66,4)::integer,0),
nullif(substring(data,70,8)::integer,0),
nullif(trim(substring(data,78,1)),''),
nullif(trim(substring(data,79,1)),''),
nullif(trim(substring(data,80,2)),''),
nullif(trim(substring(data,82,1)),''),
nullif(trim(substring(data,83,6)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
-- Not all of the indexes below are required, however without these the update_pc_paf function will
-- take a while to commit as the deferred foreign key constraints are checked
CREATE INDEX pc_paf_mainfile_postcode ON public.pc_paf_mainfile USING btree (outward_code, inward_code);
CREATE INDEX pc_paf_mainfile_building_name_key ON public.pc_paf_mainfile USING BTREE (building_name_key);
CREATE INDEX pc_paf_mainfile_locality_key ON public.pc_paf_mainfile USING BTREE (locality_key);
CREATE INDEX pc_paf_mainfile_organisation_key ON public.pc_paf_mainfile USING BTREE (organisation_key, postcode_type);
CREATE INDEX pc_paf_mainfile_sub_building_name_key ON public.pc_paf_mainfile USING BTREE (sub_building_name_key);
CREATE INDEX pc_paf_mainfile_thoroughfare_key ON public.pc_paf_mainfile USING BTREE (thoroughfare_key);
ALTER TABLE public.pc_paf_mainfile
ADD PRIMARY KEY (paf_record_key),
ADD CONSTRAINT pc_paf_mainfile_unique UNIQUE (address_key, organisation_key, postcode_type),
ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE;
COMMENT ON COLUMN public.pc_paf_mainfile.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!';
END IF;
RAISE NOTICE '%: Done importing mainfile (imported % records)', clock_timestamp(), v_processed;
-- 9) Welsh Mainfile
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging welsh alternative mainfile', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'wfmainfl.c06');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer;
RAISE NOTICE '%: Done staging welsh alternative mainfile, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile_welsh') THEN
RAISE NOTICE 'Table "public"."pc_paf_mainfile_welsh" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_mainfile_welsh"';
CREATE TABLE public.pc_paf_mainfile_welsh (
paf_record_key serial NOT NULL,
outward_code varchar(4) NOT NULL,
inward_code varchar(3) NOT NULL,
address_key integer NOT NULL,
locality_key integer,
thoroughfare_key integer,
thoroughfare_descriptor_key integer,
dependent_thoroughfare_key integer,
dependent_thoroughfare_descriptor_key integer,
building_number integer,
building_name_key integer,
sub_building_name_key integer,
number_of_households integer,
organisation_key integer,
postcode_type varchar(1),
concatenation_indicator varchar(1),
delivery_point_suffix varchar(2),
small_user_organisation_indicator varchar(1),
po_box_number varchar(6)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_mainfile_welsh RESTART IDENTITY;
INSERT INTO public.pc_paf_mainfile_welsh(outward_code,inward_code,address_key,locality_key,thoroughfare_key,
thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number,
building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator,
delivery_point_suffix,small_user_organisation_indicator,po_box_number)
SELECT nullif(trim(substring(data,1,4)),''),
nullif(trim(substring(data,5,3)),''),
nullif(substring(data,8,8)::integer,0),
nullif(substring(data,16,6)::integer,0),
nullif(substring(data,22,8)::integer,0),
nullif(substring(data,30,4)::integer,0),
nullif(substring(data,34,8)::integer,0),
nullif(substring(data,42,4)::integer,0),
nullif(substring(data,46,4)::integer,0),
nullif(substring(data,50,8)::integer,0),
nullif(substring(data,58,8)::integer,0),
nullif(substring(data,66,4)::integer,0),
nullif(substring(data,70,8)::integer,0),
nullif(trim(substring(data,78,1)),''),
nullif(trim(substring(data,79,1)),''),
nullif(trim(substring(data,80,2)),''),
nullif(trim(substring(data,82,1)),''),
nullif(trim(substring(data,83,6)),'')
FROM data_stage;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
-- As above, these are required for faster commit on update_pc_paf
CREATE INDEX pc_paf_mainfile_welsh_postcode ON public.pc_paf_mainfile_welsh USING btree (outward_code, inward_code);
CREATE INDEX pc_paf_mainfile_welsh_building_name_key ON public.pc_paf_mainfile_welsh USING BTREE (building_name_key);
CREATE INDEX pc_paf_mainfile_welsh_locality_key ON public.pc_paf_mainfile_welsh USING BTREE (locality_key);
CREATE INDEX pc_paf_mainfile_welsh_organisation_key ON public.pc_paf_mainfile_welsh USING BTREE (organisation_key, postcode_type);
CREATE INDEX pc_paf_mainfile_welsh_sub_building_name_key ON public.pc_paf_mainfile_welsh USING BTREE (sub_building_name_key);
CREATE INDEX pc_paf_mainfile_welsh_thoroughfare_key ON public.pc_paf_mainfile_welsh USING BTREE (thoroughfare_key);
ALTER TABLE public.pc_paf_mainfile_welsh
ADD PRIMARY KEY (paf_record_key),
ADD CONSTRAINT pc_paf_mainfile_welsh_unique UNIQUE (address_key, organisation_key, postcode_type),
ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE;
COMMENT ON COLUMN public.pc_paf_mainfile_welsh.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!';
END IF;
RAISE NOTICE '%: Done importing welsh alternative mainfile (imported % records)', clock_timestamp(), v_processed;
-- 10) Alias file
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging alias file', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_alias || 'aliasfle.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%ALIASFLE' || in_edition || '%' OR data like v_std_footer;
RAISE NOTICE '%: Done staging alias file, importing', clock_timestamp();
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_counties') THEN
RAISE NOTICE 'Table "public"."pc_paf_counties" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_counties"';
CREATE TABLE public.pc_paf_counties (
county_key integer NOT NULL,
county_name varchar(30),
county_type varchar(1)
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_counties CASCADE;
INSERT INTO public.pc_paf_counties
SELECT substring(data,2,4)::integer AS county_key,
trim(substring(data,6,30)) AS county_name,
trim(substring(data,36,1)) AS county_type
FROM data_stage
WHERE substring(data,1,1)::integer = 4;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_counties ADD PRIMARY KEY (county_key),
ADD CONSTRAINT pc_paf_counties_unique UNIQUE (county_name, county_type);
COMMENT ON COLUMN public.pc_paf_counties.county_type IS 'T (Traditional County), P (Former Postal County) or A (Administrative County)';
END IF;
RAISE NOTICE '%: Done importing counties (imported % records)', clock_timestamp(), v_processed;
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_county_alias') THEN
RAISE NOTICE 'Table "public"."pc_paf_county_alias" already exists';
v_table_created := false;
ELSE
RAISE NOTICE 'Creating table "public"."pc_paf_county_alias"';
CREATE TABLE public.pc_paf_county_alias (
county_alias_key serial NOT NULL,
postcode varchar(7) NOT NULL,
former_postal_county integer,
traditional_county integer,
administrative_county integer
);
v_table_created := true;
END IF;
TRUNCATE TABLE public.pc_paf_county_alias CASCADE;
INSERT INTO public.pc_paf_county_alias (postcode, former_postal_county, traditional_county, administrative_county)
SELECT trim(substring(data,2,7)) AS postcode,
nullif(substring(data,9,4)::integer,0) AS former_postal_county,
nullif(substring(data,13,4)::integer,0) AS traditional_county,
nullif(substring(data,17,4)::integer,0) AS administrative_county
FROM data_stage
WHERE substring(data,1,1)::integer = 5;
GET DIAGNOSTICS v_processed = ROW_COUNT;
IF (v_table_created) THEN
ALTER TABLE public.pc_paf_county_alias ADD PRIMARY KEY (county_alias_key),
ADD CONSTRAINT pc_paf_county_alias_unique UNIQUE (postcode),
ADD FOREIGN KEY (former_postal_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (traditional_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE,
ADD FOREIGN KEY (administrative_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE;
END IF;
RAISE NOTICE '%: Done importing county_alias (imported % records)', clock_timestamp(), v_processed;
RAISE NOTICE '%: Completed', clock_timestamp();
RETURN true;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE;
--
DROP FUNCTION IF EXISTS public.update_pc_paf(varchar, varchar);
CREATE OR REPLACE FUNCTION public.update_pc_paf(in_edition varchar, in_data varchar)
RETURNS boolean AS
$BODY$
DECLARE
v_data_root varchar;
v_data_update varchar;
v_data_postzon varchar;
v_data_alias varchar;
v_processed integer;
v_main_footer varchar;
v_std_footer varchar;
v_sql varchar;
v_table_created boolean;
BEGIN
v_data_root := in_data || '/' || in_edition || '_CHANGES/';
v_data_update := v_data_root || 'CONSOLIDATED CHANGES/';
v_data_postzon := v_data_root || 'POSTZON 100M/';
v_data_alias := v_data_root || 'ALIAS/';
v_main_footer = ' %';
RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root;
CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP;
--changes1.c01 - Changes1 (Changes to satelite tables except Organisations) - A single changes file
RAISE NOTICE '%: Begin staging Changes1 file', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'changes1.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%CHANGES1' || in_edition || '%' OR data like v_main_footer;
RAISE NOTICE '%: Done staging Changes1 file',clock_timestamp();
SET CONSTRAINTS ALL DEFERRED;
--Record Type 1 - Localities
RAISE NOTICE '%: Preparing to update localities', clock_timestamp();
CREATE TEMPORARY TABLE tmp_localities ON COMMIT DROP AS
SELECT substring(data,2,8)::integer AS locality_key,
to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,24,1)),'') AS amendment_type,
nullif(trim(substring(data,70,30)),'') AS post_town,
nullif(trim(substring(data,100,35)),'') AS dependent_locality,
nullif(trim(substring(data,135,35)),'') AS double_dependent_locality
FROM data_stage
WHERE substring(data,1,1)::integer = 1
AND nullif(trim(substring(data,24,1)),'') IS NOT NULL
ORDER BY to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on localities',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_localities l
USING tmp_localities lt WHERE lt.locality_key = l.locality_key
AND lt.amendment_type IN ('D', 'B')
AND l.* = (lt.locality_key, lt.post_town::character varying(30), lt.dependent_locality::character varying(30), lt.double_dependent_locality::character varying(30));
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from localities',clock_timestamp(), v_processed;
INSERT INTO public.pc_paf_localities
SELECT lt.locality_key, lt.post_town, lt.dependent_locality, lt.double_dependent_locality
FROM tmp_localities lt
WHERE lt.amendment_type IN ('I', 'C');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Created % records in localities',clock_timestamp(), v_processed;
--Record Type 2 - Thoroughfares
RAISE NOTICE '%: Preparing to update thoroughfares', clock_timestamp();
CREATE TEMPORARY TABLE tmp_thoroughfares ON COMMIT DROP AS
SELECT substring(data,2,8)::integer AS thoroughfare_key,
to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,24,1)),'') AS amendment_type,
nullif(trim(substring(data,25,60)),'') AS thoroughfare_name
FROM data_stage
WHERE substring(data,1,1)::integer = 2 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on thoroughfares',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_thoroughfares t
USING tmp_thoroughfares tt WHERE tt.thoroughfare_key = t.thoroughfare_key
AND tt.amendment_type IN ('D', 'B')
AND t.* = (tt.thoroughfare_key, t.thoroughfare_name::character varying(60));
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from thoroughfares',clock_timestamp(), v_processed;
INSERT INTO public.pc_paf_thoroughfares
SELECT tt.thoroughfare_key, tt.thoroughfare_name
FROM tmp_thoroughfares tt
WHERE tt.amendment_type IN ('I', 'C');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Created % records in thoroughfares',clock_timestamp(), v_processed;
--Record Type 3 - Thoroughfare Descriptors
RAISE NOTICE '%: Preparing to update thoroughfare_descriptor', clock_timestamp();
CREATE TEMPORARY TABLE tmp_thoroughfare_descriptor ON COMMIT DROP AS
SELECT substring(data,2,8)::integer AS thoroughfare_descriptor_key ,
to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,24,1)),'') AS amendment_type,
nullif(trim(substring(data,25,20)),'') AS thoroughfare_descriptor,
nullif(trim(substring(data,45,6)),'') AS approved_abbreviation
FROM data_stage
WHERE substring(data,1,1)::integer = 3 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on thoroughfare_descriptor',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_thoroughfare_descriptor td
USING tmp_thoroughfare_descriptor tdt WHERE tdt.thoroughfare_descriptor_key = td.thoroughfare_descriptor_key
AND tdt.amendment_type IN ('D', 'B')
AND td.* = (tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor::character varying(20), tdt.approved_abbreviation::character varying(6));
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from thoroughfare_descriptor',clock_timestamp(), v_processed;
INSERT INTO public.pc_paf_thoroughfare_descriptor
SELECT tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor, tdt.approved_abbreviation
FROM tmp_thoroughfare_descriptor tdt
WHERE tdt.amendment_type IN ('I', 'C');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Created % records in thoroughfare_descriptor',clock_timestamp(), v_processed;
--Record Type 4 - Building Names
RAISE NOTICE '%: Preparing to update building_names', clock_timestamp();
CREATE TEMPORARY TABLE tmp_building_names ON COMMIT DROP AS
SELECT substring(data,2,8)::integer AS building_name_key,
to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,24,1)),'') AS amendment_type,
nullif(trim(substring(data,25,50)),'') AS building_name
FROM data_stage
WHERE substring(data,1,1)::integer = 4 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on building_names',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_building_names bn
USING tmp_building_names tbn WHERE bn.building_name_key = tbn.building_name_key
AND tbn.amendment_type IN ('D', 'B')
AND bn.* = (tbn.building_name_key, tbn.building_name::character varying(50));
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from building_names',clock_timestamp(), v_processed;
INSERT INTO public.pc_paf_building_names
SELECT tbn.building_name_key, tbn.building_name
FROM tmp_building_names tbn
WHERE tbn.amendment_type IN ('I', 'C');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Created % records in building_names',clock_timestamp(), v_processed;
--Record Type 5 - Sub Building Names
RAISE NOTICE '%: Preparing to update sub_building_names', clock_timestamp();
CREATE TEMPORARY TABLE tmp_sub_building_names ON COMMIT DROP AS
SELECT substring(data,2,8)::integer AS sub_building_name_key,
to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,24,1)),'') AS amendment_type,
nullif(trim(substring(data,25,30)),'') AS sub_building_name
FROM data_stage
WHERE substring(data,1,1)::integer = 5 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on sub_building_names',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_sub_building_names sbn
USING tmp_sub_building_names tsbn WHERE sbn.sub_building_name_key = tsbn.sub_building_name_key
AND tsbn.amendment_type IN ('D', 'B')
AND sbn.* = (tsbn.sub_building_name_key, tsbn.sub_building_name::character varying(50));
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from sub_building_names',clock_timestamp(), v_processed;
INSERT INTO public.pc_paf_sub_building_names
SELECT tsbn.sub_building_name_key, tsbn.sub_building_name
FROM tmp_sub_building_names tsbn
WHERE tsbn.amendment_type IN ('I', 'C');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Created % records in sub_building_names',clock_timestamp(), v_processed;
TRUNCATE TABLE data_stage;
-- fpchgsng.c01 - Changes2 (Changes to Mainfile and Organisations) -- fpchgsng.c01 = Single changes file, fpchngs2.c01 = Timeline changes file
RAISE NOTICE '%: Begin staging Changes2 file', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'fpchgsng.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%CHANGES2' || in_edition || '%' OR data like v_main_footer;
RAISE NOTICE '%: Done staging Changes2 file',clock_timestamp();
-- Mainfile
RAISE NOTICE '%: Preparing to update mainfile', clock_timestamp();
CREATE TEMPORARY TABLE tmp_mainfile ON COMMIT DROP AS
SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,15,4)),'') AS outward_code,
nullif(trim(substring(data,19,3)),'') AS inward_code,
nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
nullif(trim(substring(data,24,1)),'') AS postcode_type,
nullif(substring(data,25,8)::integer,0) AS address_key,
nullif(substring(data,33,8)::integer,0) AS organisation_key,
nullif(trim(substring(data,41,1)),'') AS amendment_type,
nullif(substring(data,42,1)::integer,0) AS record_type,
nullif(substring(data,43,8)::integer,0) AS locality_key,
nullif(substring(data,51,8)::integer,0) AS thoroughfare_key,
nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key,
nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key,
nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key,
nullif(substring(data,83,4)::integer,0) AS building_number,
nullif(substring(data,87,8)::integer,0) AS building_name_key,
nullif(substring(data,95,8)::integer,0) AS sub_building_name_key,
nullif(substring(data,103,4)::integer,0) AS number_of_households,
nullif(trim(substring(data,107,1)),'') AS concatenation_indicator,
nullif(trim(substring(data,108,6)),'') AS po_box_number,
nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator,
CASE nullif(substring(data,115,2)::integer,0)
WHEN 1 THEN 'New'
WHEN 2 THEN 'Correction'
WHEN 3 THEN ''
WHEN 4 THEN 'Coding Revision'
WHEN 5 THEN 'Organisation Change'
WHEN 6 THEN 'Status Change'
WHEN 7 THEN 'Large User Deleted'
WHEN 8 THEN 'Building/Sub Building Change'
WHEN 9 THEN 'Large User Change'
END AS reason_for_amendment,
nullif(trim(substring(data,117,4)),'') AS new_outward_code,
nullif(trim(substring(data,121,3)),'') AS new_inward_code
FROM data_stage
WHERE nullif(substring(data,42,1)::integer,0) = 1;
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on mainfile',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_mainfile m
USING tmp_mainfile tm WHERE tm.address_key = m.address_key AND tm.organisation_key IS NOT DISTINCT FROM m.organisation_key AND tm.postcode_type = m.postcode_type
AND tm.amendment_type IN ('D', 'B');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from mainfile',clock_timestamp(), v_processed;
INSERT INTO public.pc_paf_mainfile (outward_code, inward_code, address_key, locality_key, thoroughfare_key,
thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key,
building_number, building_name_key, sub_building_name_key, number_of_households,
organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix,
small_user_organisation_indicator, po_box_number)
SELECT COALESCE(tm.new_outward_code, tm.outward_code), COALESCE(tm.new_inward_code, tm.inward_code), tm.address_key, tm.locality_key, tm.thoroughfare_key,
tm.thoroughfare_descriptor_key, tm.dependent_thoroughfare_key, tm.dependent_thoroughfare_descriptor_key,
tm.building_number, tm.building_name_key, tm.sub_building_name_key, tm.number_of_households,
tm.organisation_key, tm.postcode_type, tm.concatenation_indicator, tm.delivery_point_suffix,
tm.small_user_organisation_indicator, tm.po_box_number
FROM (
SELECT cume_dist() OVER w, *
FROM tmp_mainfile
WINDOW w AS (PARTITION BY address_key, organisation_key, postcode_type ORDER BY timestamp)
) tm WHERE tm.cume_dist = 1 AND tm.amendment_type IN ('I', 'C');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Created % records in mainfile',clock_timestamp(), v_processed;
-- Organisations
RAISE NOTICE '%: Preparing to update organisations', clock_timestamp();
CREATE TEMPORARY TABLE tmp_organisations ON COMMIT DROP AS
SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,15,4)),'') AS outward_code,
nullif(trim(substring(data,19,3)),'') AS inward_code,
nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
nullif(trim(substring(data,24,1)),'') AS postcode_type,
nullif(substring(data,25,8)::integer,0) AS address_key,
nullif(substring(data,33,8)::integer,0) AS organisation_key,
nullif(trim(substring(data,41,1)),'') AS amendment_type,
nullif(substring(data,42,1)::integer,0) AS record_type,
nullif(trim(substring(data,43,60)),'') AS organisation_name,
nullif(trim(substring(data,103,60)),'') AS department_name,
nullif(trim(substring(data,163,6)),'') AS po_box_number
FROM data_stage
WHERE nullif(substring(data,42,1)::integer,0) IN (2,3);
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on organisations',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_organisations o
USING tmp_organisations tx WHERE COALESCE(tx.organisation_key, tx.address_key) = o.organisation_key AND tx.postcode_type = o.postcode_type
AND tx.amendment_type IN ('D', 'B');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from organisations',clock_timestamp(), v_processed;
INSERT INTO public.pc_paf_organisations
SELECT COALESCE(tx.organisation_key, tx.address_key), tx.postcode_type, tx.organisation_name, tx.department_name
FROM (
SELECT cume_dist() OVER w, *
FROM tmp_organisations
WINDOW w AS (PARTITION BY COALESCE(organisation_key, address_key), postcode_type ORDER BY timestamp)
) tx WHERE tx.cume_dist = 1 AND tx.amendment_type IN ('I', 'C');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Created % records in organisations',clock_timestamp(), v_processed;
-- wchanges.c01 -- Welsh changes (In same format as regular changes2 file (hence has timelined data, not single changes)
TRUNCATE TABLE data_stage;
RAISE NOTICE '%: Begin staging WChanges file', clock_timestamp();
v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'wchanges.c01');
EXECUTE v_sql;
DELETE FROM data_stage WHERE data like '%WCHANGES' || in_edition || '%' OR data like v_main_footer;
RAISE NOTICE '%: Done staging WChanges file',clock_timestamp();
RAISE NOTICE '%: Preparing to update welsh mainfile', clock_timestamp();
CREATE TEMPORARY TABLE tmp_mainfile_welsh ON COMMIT DROP AS
SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
nullif(trim(substring(data,15,4)),'') AS outward_code,
nullif(trim(substring(data,19,3)),'') AS inward_code,
nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
nullif(trim(substring(data,24,1)),'') AS postcode_type,
nullif(substring(data,25,8)::integer,0) AS address_key,
nullif(substring(data,33,8)::integer,0) AS organisation_key,
nullif(trim(substring(data,41,1)),'') AS amendment_type,
nullif(substring(data,42,1)::integer,0) AS record_type,
nullif(substring(data,43,8)::integer,0) AS locality_key,
nullif(substring(data,51,8)::integer,0) AS thoroughfare_key,
nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key,
nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key,
nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key,
nullif(substring(data,83,4)::integer,0) AS building_number,
nullif(substring(data,87,8)::integer,0) AS building_name_key,
nullif(substring(data,95,8)::integer,0) AS sub_building_name_key,
nullif(substring(data,103,4)::integer,0) AS number_of_households,
nullif(trim(substring(data,107,1)),'') AS concatenation_indicator,
nullif(trim(substring(data,108,6)),'') AS po_box_number,
nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator,
CASE nullif(substring(data,115,2)::integer,0)
WHEN 1 THEN 'New'
WHEN 2 THEN 'Correction'
WHEN 3 THEN ''
WHEN 4 THEN 'Coding Revision'
WHEN 5 THEN 'Organisation Change'
WHEN 6 THEN 'Status Change'
WHEN 7 THEN 'Large User Deleted'
WHEN 8 THEN 'Building/Sub Building Change'
WHEN 9 THEN 'Large User Change'
END AS reason_for_amendment,
nullif(trim(substring(data,117,4)),'') AS new_outward_code,
nullif(trim(substring(data,121,3)),'') AS new_inward_code
FROM data_stage
WHERE nullif(substring(data,42,1)::integer,0) = 1;
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Prepared % records for update on welsh mainfile',clock_timestamp(), v_processed;
DELETE FROM public.pc_paf_mainfile_welsh mw
USING tmp_mainfile_welsh twm WHERE twm.address_key = mw.address_key AND twm.organisation_key IS NOT DISTINCT FROM mw.organisation_key AND twm.postcode_type = mw.postcode_type
AND twm.amendment_type IN ('D', 'B');
GET DIAGNOSTICS v_processed = ROW_COUNT;
RAISE NOTICE '%: Removed % records from welsh mainfile',clock_timestamp(), v_processed;