* Copyright 2018-2020 Cargill Incorporated
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*&---------------------------------------------------------------------*
*& Include ZADD_TO_KAFKA_STAGING
*&---------------------------------------------------------------------*
DATA lv_count TYPE i.
DATA lv_tablename TYPE tabname.
FIELD-SYMBOLS: <any_tab> TYPE ANY TABLE.
FIELD-SYMBOLS <fs_record> TYPE any.
FIELD-SYMBOLS <lv_operation> TYPE any.
FIELD-SYMBOLS <lv_change_flag> TYPE char1.
FIELD-SYMBOLS <lv_update_ts> TYPE timestamp.
FIELD-SYMBOLS <lv_recordno> TYPE any.
FIELD-SYMBOLS <fs_record_src> TYPE any.
FIELD-SYMBOLS <ls_staging> TYPE zkafka_error.
CONSTANTS lc_content_type TYPE string VALUE 'application/vnd.kafka.json.v2+json'.
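* Note: 'application/vnd.kafka.json.v2+json' is the embedded-JSON content
* type of the Confluent Kafka REST Proxy v2 produce API. The proxy expects
* a request body of the shape (illustrative sketch):
*   {"records":[{"value":{...}},{"value":{...}}]}
* which is exactly the envelope lv_json_final is assembled into below.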
CONSTANTS lc_kafka_variant TYPE rvari_vnam VALUE 'ZBI_KAFKA_SERVER'.
DATA lv_kafka_url TYPE string.
DATA lv_kafka_url_topic TYPE string.
DATA lv_response_string TYPE string.
DATA lv_return_code TYPE i.
DATA lv_reason TYPE string.
DATA lo_json TYPE REF TO /ui2/cl_json_serializer.
DATA lv_json TYPE string.
DATA lv_json_final TYPE string.
DATA lv_json_record TYPE string.
DATA lv_json_records TYPE string.
DATA lo_data TYPE REF TO data.
DATA lv_structure_name TYPE char50.
DATA lv_topic TYPE string.
DATA lv_topic_prefix TYPE string.
DATA lo_client TYPE REF TO if_http_client.
DATA lo_staging TYPE REF TO zcl_kafka.
DATA lv_tabname TYPE tabname.
DATA lv_table_string TYPE c LENGTH 100.
DATA lv_recordno TYPE int4.
DATA lv_timestamp TYPE timestampl.
DATA lv_start_ts TYPE timestampl.
DATA lv_end_ts TYPE timestampl.
DATA lv_batch_ts TYPE timestampl.
DATA lv_batch_end_ts TYPE timestampl.
DATA lv_runtime TYPE tzntstmpl.
DATA ls_staging TYPE zkafka_error.
DATA lt_staging TYPE STANDARD TABLE OF zkafka_error.
DATA lv_guid TYPE sysuuid_x16.
DATA lv_jobcount TYPE tbtcjob-jobcount.
DATA lv_jobname TYPE tbtcjob-jobname.
DATA lv_index TYPE i.
DATA lv_batch_rec_count TYPE i."records collected for the current batch
DATA lv_batch_count TYPE i."number of batches sent so far
DATA lv_batch_size TYPE i VALUE 1000.
DATA lv_total TYPE i.
DATA lv_success TYPE i.
DATA lv_message TYPE char255.
DATA lv_retry TYPE c.
DATA lv_retry_attempts TYPE i.
DATA lv_log_ts TYPE timestampl.
DATA ls_tbl_log TYPE zkafka_tbl_log.
DATA lv_change_flag TYPE c.
DATA lv_update_count TYPE i.
DATA lv_insert_count TYPE i.
DATA lv_delete_count TYPE i.
DATA lv_logging_flag TYPE c.
DATA ls_error TYPE zkafka_error.
DATA lv_error_occurred TYPE c.
DATA lv_error_count TYPE i.
FIELD-SYMBOLS <lv_belnr> TYPE any.
DATA lv_total_string TYPE c LENGTH 10.
DATA lv_run_guid TYPE sysuuid_x16.
CLEAR: lv_error_count, lv_error_occurred, lt_staging, lv_batch_rec_count, lv_batch_count, lv_return_code,
lv_insert_count, lv_update_count, lv_delete_count.
lv_message = |Begin Configuration: { _mt_id ALIGN = LEFT } Table: { _cobj_alias ALIGN = LEFT } |.
MESSAGE lv_message TYPE 'S'.
* Cluster/pool tables arrive under the cluster name, so map them to the underlying transparent table. This hard-coded mapping could be replaced with a lookup against a custom table.
IF _cobj_alias = 'RFBLG'.
lv_tabname = 'BSEG'.
ELSEIF _cobj_alias = 'ATAB'.
lv_tabname = 'T001T'.
ELSEIF _cobj_alias = 'CDCLS'.
lv_tabname = 'CDPOS'.
ELSEIF _cobj_alias = 'KOCLU'.
lv_tabname = 'KONV'.
ELSEIF _cobj_alias = 'REGUC'.
lv_tabname = 'REGUP'.
ELSE.
lv_tabname = _cobj_alias.
ENDIF.
CONCATENATE '_IT_S_' lv_tabname INTO lv_tablename.
ASSIGN (lv_tablename) TO <any_tab>.
CONCATENATE '<WA_S_' lv_tabname '>' INTO lv_structure_name.
ASSIGN (lv_structure_name) TO <fs_record_src>.
CONCATENATE '<WA_R_' lv_tabname '>' INTO lv_structure_name.
ASSIGN (lv_structure_name) TO <fs_record>.
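* The names assigned above follow the conventions of the generated
* SLT/replication include this code runs inside (an assumption based on the
* surrounding naming, not verified here): _IT_S_<table> is the source
* internal table, <WA_S_<table>> the source work area and <WA_R_<table>>
* the receiver work area. For lv_tabname = 'BSEG' these resolve to
* '_IT_S_BSEG', '<WA_S_BSEG>' and '<WA_R_BSEG>'.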
lv_count = lines( <any_tab> ).
"IF lv_tabname = 'BSEG' OR lv_tabname = 'T001T'.
" lv_total = lines( <any_tab> ).
" lv_message = |Regards in ANY_TAB { lv_total ALIGN = LEFT } |.
" MESSAGE lv_message TYPE 'S'.
" clear lv_total.
"ENDIF.
lv_total = lines( <any_tab> ).
lv_message = |Begin Program: { _mt_id ALIGN = LEFT } Table: { _cobj_alias ALIGN = LEFT } Rows: { lv_total ALIGN = LEFT } |.
MESSAGE lv_message TYPE 'S'.
IF lv_total > 0. " Only do the below code if there are records in the loop
* Remove initial / and replace others with _
IF lv_tabname(1) = '/'.
SHIFT lv_tabname BY 1 PLACES.
ENDIF.
REPLACE ALL OCCURRENCES OF '/' IN lv_tabname WITH '_'.
ls_staging-mt_id = _mt_id.
ls_staging-tablename = lv_tabname.
GET TIME STAMP FIELD lv_timestamp.
ls_staging-createdon = lv_timestamp.
"CALL FUNCTION 'RRBA_CONVERT_TIMESTAMP_TO_STR'
"EXPORTING
" i_timestamp = lv_timestamp
"IMPORTING
" e_output = ls_staging-createdonstring.
ls_staging-createdonstring = lv_timestamp.
* Table Specific Topic
SELECT topicname INTO lv_topic FROM zkafka_tbl_topic WHERE mt_id = _mt_id AND tablename = lv_tabname.
ENDSELECT.
IF lv_topic IS INITIAL. "for a string, IS INITIAL already covers ''
" MT Specific Topic if there is no table specific
SELECT topic_prefix INTO lv_topic_prefix FROM zkafka_mt_tpcpre WHERE mt_id = _mt_id.
ENDSELECT.
CONCATENATE lv_topic_prefix '.' lv_tabname INTO lv_topic.
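* Illustrative example (made-up prefix): with topic_prefix = 'erp.slt' and
* lv_tabname = 'BSEG', the fallback topic becomes 'erp.slt.BSEG'.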
IF lv_topic IS INITIAL.
lv_message = |No Topic set up in table ZKAFKA_TBL_TOPIC for Table { _mt_id ALIGN = LEFT } { lv_tabname ALIGN = LEFT } |.
MESSAGE lv_message TYPE 'E'.
ENDIF.
* EXIT.
ENDIF.
ls_staging-topicname = lv_topic.
* This section of the code looks up whether the table is set up for extra logging
SELECT logging_flag INTO lv_logging_flag FROM zkafka_tbl_topic WHERE mt_id = _mt_id AND tablename = lv_tabname.
ENDSELECT.
IF lv_logging_flag = 'X'.
lv_guid = cl_system_uuid=>create_uuid_x16_static( ).
ls_tbl_log-log_guid = lv_guid.
ls_tbl_log-starttime = lv_timestamp.
ls_tbl_log-mt_id = _mt_id.
ls_tbl_log-topicname = lv_topic.
ls_tbl_log-tablename = lv_tabname.
ENDIF.
* Section of the code that loads any of the Error Records to lt_staging
SELECT mandt entryid mt_id tablename topicname returncode message entrydata createdon recno createdonstring
FROM zkafka_error INTO TABLE lt_staging WHERE mt_id = _mt_id AND tablename = lv_tabname .
SORT lt_staging BY createdon recno.
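* Failed rows are re-sent first, in (createdon, recno) order; the clean-up
* after each successful batch below relies on this ordering to delete every
* zkafka_error row at or before the last record confirmed sent.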
lv_error_count = lines( lt_staging ).
* End of section for handling previous error records
LOOP AT <any_tab> ASSIGNING <fs_record_src>.
* lv_message = |Begin Loop: { _mt_id ALIGN = LEFT } Table: { _cobj_alias ALIGN = LEFT } |.
* MESSAGE lv_message TYPE 'S'.
lv_index = syst-tabix.
lv_guid = cl_system_uuid=>create_uuid_x16_static( ).
ls_staging-entryid = lv_guid.
ASSIGN COMPONENT '_RECNO' OF STRUCTURE <fs_record_src> TO <lv_recordno>.
MOVE-CORRESPONDING <fs_record_src> TO <fs_record>.
IF <fs_record> IS ASSIGNED.
CLEAR lv_json.
CLEAR lv_json_final.
CLEAR lv_kafka_url.
* Check the change flag
ASSIGN COMPONENT 'IUUC_OPERAT_FLAG' OF STRUCTURE <fs_record> TO <lv_operation>.
IF sy-subrc = 0.
* Add the change flag
ASSIGN COMPONENT 'CHANGE_FLAG' OF STRUCTURE <fs_record> TO <lv_change_flag>.
IF sy-subrc EQ 0 AND <lv_change_flag> IS ASSIGNED.
<lv_change_flag> = <lv_operation>.
lv_change_flag = <lv_operation>.
ENDIF.
ELSE. " There is no IUUC_OPERAT_FLAG, so it's an initial load
ASSIGN COMPONENT 'CHANGE_FLAG' OF STRUCTURE <fs_record> TO <lv_change_flag>.
IF sy-subrc EQ 0 AND <lv_change_flag> IS ASSIGNED.
<lv_change_flag> = 'N'.
lv_change_flag = 'N'.
ENDIF.
ENDIF.
IF lv_change_flag = 'N' OR lv_change_flag = 'I'.
lv_insert_count = lv_insert_count + 1.
ELSEIF lv_change_flag = 'U'.
lv_update_count = lv_update_count + 1.
ELSEIF lv_change_flag = 'D'.
lv_delete_count = lv_delete_count + 1.
ENDIF.
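* Operation flags counted above: 'N' (initial load) and 'I' (insert) are
* tallied as inserts, 'U' as updates and 'D' as deletes; the counts feed
* the optional zkafka_tbl_log entry written at the end of the run.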
ASSIGN COMPONENT 'UPDATE_TS' OF STRUCTURE <fs_record> TO <lv_update_ts>.
IF sy-subrc = 0.
GET TIME STAMP FIELD <lv_update_ts>.
ENDIF.
* Get the record number
ASSIGN COMPONENT '_RECNO' OF STRUCTURE <fs_record> TO <lv_recordno>.
lv_recordno = <lv_recordno>.
GET REFERENCE OF <fs_record> INTO lo_data.
CREATE OBJECT lo_json.
lv_json = lo_json->/ui2/if_serialize~serialize( EXPORTING ir_data = lo_data ).
IF lv_json IS NOT INITIAL.
* CONCATENATE '{"records":[{"value":' lv_json '}]}' INTO lv_json_final."converting the json into kafka's internal format
* add the table name to the json string,
* TRANSLATE lv_topic TO UPPER CASE. Unnecessary
lv_table_string = |\{"origintable":"{ lv_tabname }",|.
REPLACE FIRST OCCURRENCE OF '{' IN lv_json WITH lv_table_string.
lv_json_final = lv_json.
* This is just for testing
* ls_staging-message = strlen( lv_json_final ) .
* CONDENSE ls_staging-message.
ls_staging-entrydata = lv_json_final.
ls_staging-recno = lv_recordno.
APPEND ls_staging TO lt_staging.
ENDIF.
ENDIF.
ENDLOOP.
*IF lv_tabname = 'BSEG' .
* lv_run_guid = cl_system_uuid=>create_uuid_x16_static( ).
* ls_staging-run_guid = lv_run_guid.
* INSERT ZKAFKA_STAGING FROM TABLE lt_staging.
* COMMIT WORK AND WAIT.
*ENDIF.
*INSERT ZKAFKA_STAGING FROM TABLE lt_staging.
*COMMIT WORK AND WAIT.
GET TIME STAMP FIELD lv_start_ts.
lv_total = lines( lt_staging ).
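* The REST Proxy base URL is maintained as STVARV parameter
* ZBI_KAFKA_SERVER (table TVARVC, type 'P'). Since the topic name is
* appended to it below, the stored value presumably already ends in the
* proxy's /topics path, e.g. (illustrative): http://host:8082/topics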
SELECT SINGLE low INTO lv_kafka_url FROM tvarvc WHERE name = lc_kafka_variant AND type = 'P' AND numb = space.
IF sy-subrc EQ 0 AND lv_kafka_url IS NOT INITIAL.
WRITE: / 'Attempting to send a total of ', lv_total, ' records to Kafka at', lv_start_ts .
lv_message = |{ lv_topic ALIGN = LEFT } - Attempting to send a total of: { lv_total ALIGN = LEFT } records to Kafka at: { lv_start_ts ALIGN = LEFT }|.
"character_string = |The length of text element 001 ({ text–001 }) is { strlen( text–001 ) }|.
MESSAGE lv_message TYPE 'S'.
CLEAR lv_total.
LOOP AT lt_staging ASSIGNING <ls_staging>.
IF lv_error_occurred <> 'X'.
ADD 1 TO lv_total.
CLEAR lv_kafka_url_topic.
* IF <ls_staging>-topicname IS NOT INITIAL.
* lv_topic = <ls_staging>-topicname.
* ELSE.
IF lv_topic IS INITIAL.
lv_topic = 'slt.no_topic'.
ENDIF.
ADD 1 TO lv_batch_rec_count.
CLEAR lv_json_record.
CONCATENATE '{"value":' <ls_staging>-entrydata '}' INTO lv_json_record."convert each entry into kafka format
IF lv_json_records IS NOT INITIAL.
CONCATENATE lv_json_records lv_json_record INTO lv_json_records SEPARATED BY ','."create entry strings
ELSE.
lv_json_records = lv_json_record."first entry
ENDIF.
*
* Either batch size is reached or table end has been reached
IF lv_batch_rec_count GE lv_batch_size OR lv_total EQ lines( lt_staging ).
ADD 1 TO lv_batch_count.
CONCATENATE '{"records":[' lv_json_records ']}' INTO lv_json_final."converting the json payload into kafka's internal format
IF lv_topic IS NOT INITIAL.
CONCATENATE lv_kafka_url '/' lv_topic '/' INTO lv_kafka_url_topic.
ELSE.
lv_kafka_url_topic = lv_kafka_url.
ENDIF.
* Call the Kafka REST API. The retry logic below is currently commented out; it would have retried every 30 seconds for an hour and then every 5 minutes after that.
* lv_retry = 'X'.
* lv_retry_attempts = 0.
* WHILE lv_retry = 'X'.
"replace all occurrences of regex '[^[:print:]]' in lv_json_final with ' '.
"replace all occurrences of regex ':<.' in lv_json_final with ':0.'.
REPLACE ALL OCCURRENCES OF REGEX '":<' IN lv_json_final WITH '":'.
REPLACE ALL OCCURRENCES OF '\/' IN lv_json_final WITH '/'. "plain substring replace; as a REGEX, '\/' matched a bare '/' and replaced nothing useful
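* Payload cleanup rationale (inferred from the code itself, not from any
* documentation): the '":<' pattern strips a stray '<' that can show up
* after a colon in serialized values and would break the JSON; the '\/'
* replacement un-escapes the forward slashes the /UI2/ serializer escapes.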
TRANSLATE lv_kafka_url_topic TO LOWER CASE.
CALL METHOD cl_http_client=>create_by_url
EXPORTING
url = lv_kafka_url_topic
IMPORTING
client = lo_client.
lo_client->request->set_method( method = if_http_request=>co_request_method_post ).
lo_client->request->set_content_type( content_type = lc_content_type ).
lo_client->request->set_cdata( lv_json_final ).
GET TIME STAMP FIELD lv_batch_ts.
lv_message = |Sending batch - { lv_batch_count ALIGN = LEFT } - with { lv_batch_rec_count ALIGN = LEFT } record(s) at { lv_batch_ts ALIGN = LEFT }|.
MESSAGE lv_message TYPE 'S'.
lo_client->send(
EXCEPTIONS
http_communication_failure = 1
http_invalid_state = 2
).
IF sy-subrc <> 0.
lv_message = |Send Failed - { sy-subrc ALIGN = LEFT }|.
MESSAGE lv_message TYPE 'S'.
ENDIF.
lo_client->receive(
EXCEPTIONS
http_communication_failure = 1
http_invalid_state = 2
http_processing_failed = 3
).
IF sy-subrc <> 0.
lv_message = |Receive Failed - { sy-subrc ALIGN = LEFT }|.
MESSAGE lv_message TYPE 'S'.
ENDIF.
CLEAR: lv_return_code, lv_reason.
lv_response_string = lo_client->response->get_cdata( ).
lo_client->response->get_status( IMPORTING code = lv_return_code reason = lv_reason ).
lo_client->close( ).
IF lv_return_code = 200."delete the entries
* lv_retry = ''. " Clear the Retry Flag
IF lv_error_count > 0.
DELETE FROM zkafka_error WHERE mt_id = _mt_id AND tablename = lv_tabname
AND ( createdonstring < <ls_staging>-createdonstring OR ( createdonstring = <ls_staging>-createdonstring AND recno <= <ls_staging>-recno ) ).
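* The condition above removes every previously failed row that sorts at or
* before the record just confirmed: a strictly older createdonstring, or an
* equal createdonstring with recno <= the current one, mirroring the
* (createdon, recno) sort of lt_staging.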
ENDIF.
ADD lv_batch_rec_count TO lv_success.
lv_message = |Total Records sent to Kafka in sub-batch: { lv_batch_rec_count ALIGN = LEFT }. Return Code: { lv_return_code ALIGN = LEFT }. Reason: { lv_reason ALIGN = LEFT }|.
MESSAGE lv_message TYPE 'S'.
ELSE."reset the entries
lv_message = |Batch - { lv_batch_count ALIGN = LEFT } - ended with return code: { lv_return_code ALIGN = LEFT } and message: { lv_reason ALIGN = LEFT }|.
MESSAGE lv_message TYPE 'S'.
ls_staging-returncode = lv_return_code.
ls_staging-message = lv_reason.
MODIFY lt_staging FROM ls_staging TRANSPORTING returncode message WHERE message IS INITIAL.
MODIFY zkafka_error FROM TABLE lt_staging.
COMMIT WORK AND WAIT.
lv_error_occurred = 'X'.
* MESSAGE lv_json_final TYPE 'S'.
* IF lv_retry_attempts >= 120. "Retried for an hour
* lv_message = |Retries failed|.
* MESSAGE lv_message TYPE 'E'.
* WAIT UP TO 570 SECONDS. " I'm not certain an E message would actually stop the job; it may keep retrying every 5 minutes forever
* ENDIF.
* WAIT UP TO 30 SECONDS.
ENDIF.
* lv_retry_attempts = lv_retry_attempts + 1.
* ENDWHILE.
GET TIME STAMP FIELD lv_batch_end_ts.
lv_runtime = cl_abap_tstmp=>subtract( tstmp1 = lv_batch_end_ts tstmp2 = lv_batch_ts )."end minus start, matching the total-runtime calculation below
lv_message = |Batch - { lv_batch_count ALIGN = LEFT } - processing time: { lv_runtime ALIGN = LEFT }|.
MESSAGE lv_message TYPE 'S'.
CLEAR lv_batch_rec_count.
CLEAR lv_json_records.
ENDIF.
ENDIF. " Checking whether an error occurred sending to Kafka
ENDLOOP.
GET TIME STAMP FIELD lv_end_ts.
" Insert record into logging table if flag is set
IF lv_logging_flag = 'X'.
ls_tbl_log-endtime = lv_end_ts.
ls_tbl_log-inserted_records = lv_insert_count.
ls_tbl_log-updated_records = lv_update_count.
ls_tbl_log-deleted_records = lv_delete_count.
ls_tbl_log-total_records = lv_success.
INSERT INTO zkafka_tbl_log VALUES ls_tbl_log.
COMMIT WORK AND WAIT.
ENDIF.
CLEAR lv_runtime.
lv_runtime = cl_abap_tstmp=>subtract( tstmp1 = lv_end_ts tstmp2 = lv_start_ts ).
lv_message = |Total Records Processed: { lv_total ALIGN = LEFT }. Successfully Posted: { lv_success ALIGN = LEFT }. Total Runtime(secs): { lv_runtime ALIGN = LEFT DECIMALS = 0 } |.
MESSAGE lv_message TYPE 'S'.
ENDIF.
ENDIF.
CLEAR <any_tab>. " This is necessary because cluster tables reprocess the same records over and over.