-
Notifications
You must be signed in to change notification settings - Fork 1
/
util_pesquisaelastic_facil.py
1234 lines (1146 loc) · 62.7 KB
/
util_pesquisaelastic_facil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
# Classe:
# - PesquisaElasticFacil : Componente python que simplifica a construção de queries no ElasticSearch
# e aproxima o uso dos operadores de proximidade comuns no BRS em queries
# internas do ElasticSearch. Permite aproveitar o conhecimento de usuários BRS
# ao receber critérios de proximidade usados no BRS (PROX, ADJ, COM) e convertê-los
# para os critérios do elastic, bem como simplificar a forma de escrita dos critérios
# de pesquisa e traduzi-los para conjuntos mais robustos de pesquisa no ElasticSearch.
# Esse código, dicas de uso e outras informações:
# -> https://github.com/luizanisio/PesquisaElasticFacil/
# Luiz Anísio
# Ver 0.1.0 - 03/10/2021 - disponibilizado no GitHub
# Ver 0.1.1 - 03/10/2021 - highlight no MLT
# Ver 0.1.2 - 05/10/2021 - ajustes MLT, lower() e limpeza dos termos de pesquisa
# - no elastic número com ./- vira _ para juntar os tokens
# - no python, a pesquisa usa regex "(\\d+)[\\.\\-\\/](?=\\d)" para $1_
# - corrige numeros puros para números com _? 1234 ==> 1_?234
# - corrige curingas repetidos
# - testes de transformação de curingas
# - testes de transformação de operadores finais
# - pesquisas inteligentes:
# - ADJn: lista de termos NÃO (lista de termos)
# - PROXn: lista de termos NÃO (lista de termos)
# Ver 0.1.3 - 06/10/2021 - correção termo único na pesquisa, correção de aspas simples e inclusão de mais testes
# Ver 0.1.4 - 06/10/2021 - correção slop
# Ver 0.1.5 - 06/10/2021 - termos entre aspas usa o campo_texto_raw para todos do slop
# Ver 0.1.6 - 07/10/2021 - campo raw é um sufixo no campo principal (o mapeamento no elastic normalmente é assim)
# - Grupos de pesquisas e campos disponíveis GruposPesquisaElasticFacil
# - Pesquisa CONTÉM automática com textos grandes copiados (>100 caracters com artigos e símbolos)
# Ver 0.2.0 - 14/10/2021 - Grupos de pesquisas e campos disponíveis GruposPesquisaElasticFacil
# - Otimizações e correções
# - mensagens e alertas para apresentação ao usuário
# Ver 0.2.1 - 25/10/2021 - .raw ou raw nos sufixos
# Ver 0.3.0 - 26/10/2021 - testes unitários e pequenas correções na tokenização
#
# TODO:
# - ampliar casos de teste
import re
from unicodedata import normalize
import json
from copy import deepcopy
CRITERIO_CAMPO_HIGHLIGHT = {"require_field_match": False,"max_analyzed_offset": 1000000}
ERRO_PARENTESES_FALTA_FECHAR = 'Parênteses incompletos nos critérios de pesquisa - falta fechamento de parênteses.'
ERRO_PARENTESES_FECHOU_MAIS = 'Parênteses incompletos nos critérios de pesquisa - há fechamento excedente de parênteses.'
ERRO_OPERADOR_CAMPO_PESQUISA = 'campos_pesquisa: Não são aceitos operadores de campo em pesquisas simples.'
ERRO_OPERADOR_CAMPO_PARENTESES = 'campos_pesquisa: Não são aceitos operadores de campo dentro de parênteses.'
ERRO_PARENTESES_CAMPO = 'Não são aceitos operadores de campo dentro de parênteses.'
ERRO_OPERADOR_OU_CAMPO_RAIZ = 'Não são aceitos operadores OU entre critérios de grupo e critérios simples'
PRINT_DEBUG = False
PRINT_WARNING = True
###########################################################
# Controla o uso de operadores válidos nas pesquisas,
# diferenciando os operadores dos termos de pesquisa
#----------------------------------------------------------
class Operadores():
RE_LIMPAR_CRITERIOS_INICIAIS = re.compile(r'(?<=\D)[,\.\/](?=\D)|(?<=\d)[,\.\/](?=\D)|(?<=\D)[,\.\/](?=\d)|[+\-:\[\]\{\}`´@!#%¨&=]',re.IGNORECASE)
RE_TOKEN_CRITERIOS = re.compile(r'(adjc?\d*|proxc?\d*|com)$',re.IGNORECASE)
RE_TOKEN_CRITERIOS_AGRUPAMENTO = re.compile(r'(adjc?\d*|proxc?\d*|ou)$',re.IGNORECASE)
RE_TOKEN_ADJ = re.compile(r'adjc?\d*$',re.IGNORECASE)
RE_TOKEN_PROX = re.compile(r'proxc?\d*$',re.IGNORECASE)
RE_TOKEN_COM = re.compile(r'com$',re.IGNORECASE)
RE_TOKEN_N = re.compile(r'\d+')
RE_TERMO_NUMERICO = re.compile(r'[\d\?\*][\d\:\?\*\.\,\-\_\/]*[\d\?\*]$')
RE_TERMO_MILHAS = re.compile(r'[\d\?\*][\d\?\*\.\,]*[\d\?\*]$')
RE_TERMO_SO_CURINGA = re.compile(r'[\?\*\_]+$')
RE_TERMO_COM_CURINGA = re.compile(r'[\?\*\_]')
RE_TOKEN_QUEBRA_N = re.compile(r'[\d\.\-_\/\,\?\*\:]+$') # 123.233/2332-23,23 ou curingas - verifica se é um token numérico
RE_TOKEN_QUEBRA_N_FORMAT = re.compile(r'[\.\-_\/\,\:]+') # 123.233/2332-23,23 ou curingas - corrige símbolos por _
RE_TOKEN_OU = re.compile(r'ou$',re.IGNORECASE)
RE_TOKEN_E = re.compile(r'e$',re.IGNORECASE)
RE_TOKEN_INTERROGA = re.compile(r'([\?]+)')
RE_TOKEN_ASTERISCO = re.compile(r'([\*]+)')
RE_LIMPAR_TERMO_NAO_NUMERICO = re.compile(f'[^A-Za-z\d\?\*\"_]') # o token já estará sem acentos
RE_LIMPAR_TERMO_ASPAS = re.compile(f'( \")|(\" )') # o token já estará sem acentos
RE_LIMPAR_TERMO_MLT = re.compile(f'[^A-Za-z\d]') # tokens limpos de pesquisa
#RE_OPERADOR_CAMPOS_GRUPOS = re.compile(r'(\.\w+\.\()|(\s+n[ãa]o\s*\.\w+\.\()|(\s+e\s*\.\w+\.\()|(\s+ou\s*\.\w+\.\()', re.IGNORECASE)
RE_OPERADOR_CAMPOS_GRUPOS = re.compile(r'(\.\w+\.\()', re.IGNORECASE)
RE_OPERADOR_RANGE = re.compile(r'>=|<=|>|<|lte|gte|lt|gt', re.IGNORECASE)
OPERADOR_PADRAO = 'E'
OPERADOR_ADJ1 = 'ADJ1'
# retorna true se o token recebido é um critério conhecido
@classmethod
def e_operador(self,token):
if type(token) is not str:
return False
if token.lower() in ('e','ou','não','nao','com','and','or','not'):
return True
if self.RE_TOKEN_CRITERIOS.match(token):
#print('Critério: ', token, f'({self.n_do_criterio(token)})')
return True
return False
# retorna true se o token recebido é um ADJ
@classmethod
def e_operador_adj(self,token):
return self.RE_TOKEN_ADJ.match(token)
# retorna true se o token recebido é um ADJ
@classmethod
def e_operador_nao(self,token):
return token == 'NAO'
# retorna true se o token recebido é um ADJ
@classmethod
def e_operador_ou(self,token):
return self.RE_TOKEN_OU.match(token)
# retorna true se o token recebido é um ADJ
@classmethod
def e_operador_e(self,token):
return self.RE_TOKEN_E.match(token)
# retorna true se o token recebido é um PROX
@classmethod
def e_operador_prox(self,token):
return self.RE_TOKEN_PROX.match(token)
# retorna true se o token recebido é um COM
@classmethod
def e_operador_com(self,token):
return self.RE_TOKEN_COM.match(token)
# retorna true se o token é um critério slop
@classmethod
def e_operador_slop(self,token):
return self.e_operador_adj(token) or self.e_operador_prox(token)
# retorna true se o token é um critério que pode vir antes/depois de parênteses
@classmethod
def e_operador_que_pode_antes_depois_parenteses(self,token):
return self.e_operador_ou(token) or self.e_operador_nao(token) or self.e_operador_e(token)
# retorna n do critério
@classmethod
def n_do_operador(self,token):
n = self.RE_TOKEN_N.findall(token)
if not any(n):
return 1
return int(n[0])
# retorno o tipo e o n
@classmethod
def get_operador_n(self, token):
n = self.n_do_operador(token)
criterio = self.RE_TOKEN_N.sub('', token)
return criterio, n
# retorno do critério se ele for um critério de agrupamento
@classmethod
def get_operador_agrupamento(self, token):
if type(token) is not str:
return ''
criterio = self.RE_TOKEN_N.sub('', token)
if self.RE_TOKEN_CRITERIOS_AGRUPAMENTO.match(criterio):
return criterio
return ''
# formatar termos e operadores de pesquisa
@classmethod
def formatar_token(self, token):
if self.e_operador(token):
return self.formatar_operador(token)
return self.formatar_termo(token)
# formata os tokens e quebra tokens com caracteres estranhos como termo1:termo2
@classmethod
def formatar_tokens(self, criterios_lista):
#print('Formatar tokens: ', criterios_lista)
res = []
for token in criterios_lista:
if type(token) is list:
res.append(self.formatar_tokens(token))
else:
_tk = self.formatar_token(token)
if (not _tk) or self.RE_TERMO_SO_CURINGA.match(_tk):
continue
tokens = _tk.split(' ')
res += [_ for _ in tokens if not self.RE_TERMO_SO_CURINGA.match(_) ]
#print('- tokens: ', criterios_lista)
return res
# dica em https://pt.stackoverflow.com/questions/8526/tratar-n%C3%BAmeros-python-adicionando-ponto
@classmethod
def formatar_numeros_milhar(self, s):
if s.find('.')>=0: return s
virgula = f'{s},'.split(',')
if virgula[1]: virgula[1] = ','+virgula[1]
#print('Virgula: ', virgula)
s = virgula[0]
return s + virgula[1] if len(s) <= 3 else self.formatar_numeros_milhar(s[:-3]) + '.' + s[-3:] + virgula[1]
# formata os termos para apresentação ou para pesquisa
@classmethod
def formatar_termo(self, termo):
# tratamento padrão
termo = Operadores.remover_acentos(termo).replace("'",'"').replace('$','*')
if not self.RE_TERMO_NUMERICO.match(termo):
#print('Limpando termo não numérico: ', termo)
termo = Operadores.RE_LIMPAR_TERMO_NAO_NUMERICO.sub(' ', termo).strip()
termo = Operadores.RE_LIMPAR_TERMO_ASPAS.sub('"', termo).strip()
#print('Termo limpo: ', termo)
# para apresentação mostra o termo real somente sem acento
return termo
@classmethod
# formata os tokens de critérios para padronização
def formatar_operador(self, operador):
if self.e_operador_adj(operador):
cr = f'ADJ{self.n_do_operador(operador)}'
self.contem_operadores_brs = True
elif self.e_operador_prox(operador):
cr = f'PROX{self.n_do_operador(operador)}'
self.contem_operadores_brs = True
elif self.e_operador_com(operador):
n = self.n_do_operador(operador)
# todo avaliar melhor solução para se
# aproximar do operador COM - mesmo parágrafo
#cr = f'COM{n}' if n>1 else 'COM'
cr = 'PROX30'
self.contem_operadores_brs = True
else:
self.contem_operadores = True
cr = operador.upper()
cr = 'E' if cr == 'AND' else cr
cr = 'OU' if cr == 'OR' else cr
cr = 'NAO' if cr in ('NOT','NÃO') else cr
return cr
# retorna o operador e o n do operador que sera analisado no grupo
# entende-se que chegando aqui os grupos já foram separados por operadores especiais
# no caso de operadores especiais misturados com grupos, retorna um erro
@classmethod
def operador_n_do_grupo(self, grupo):
assert type(grupo) is list, 'O grupo deve ser do tipo lista para ter um operador'
operador, n = self.OPERADOR_PADRAO, 0
tem_grupo = False
tem_slop = False
tem_simples = False
for token in grupo:
if type(token) is list:
tem_grupo = True
continue
#print('Token: ', token, 'Operador: ', self.e_operador(token), 'Slop:', self.e_operador_slop(token) )
if self.e_operador(token):
novo, novo_n = self.get_operador_n(token)
if self.e_operador_slop(novo):
tem_slop = True
elif not self.e_operador_nao(novo):
tem_simples = True
if not self.e_operador_nao(novo):
n = max(n,novo_n) # busca o maior n
operador = novo
# tem operador prox ou adj misturado com simples ou grupo, retorna erro
if tem_slop and (tem_simples or tem_grupo):
_msg = f'Operadores: foi encontrado um grupo com operadores simples e de proximidade juntos: {grupo}'
raise Exception(_msg)
return operador, n
# substitui critérios de ? por .{0,n} para permitir ser opcional
@classmethod
def termo_regex_interroga(self, termo):
if termo.find('*')>=0:
# corrige * seguido mas não coloca o .* ainda
# para não atrapalhar o controle de números \d.\d
termo = self.RE_TOKEN_ASTERISCO.sub('*', termo)
termo = self.formatar_termo_numerico_pesquisa(termo)
termo = termo.replace('_?','_!') # curinga de números para não substituir
termos = self.RE_TOKEN_INTERROGA.split(termo)
termo = ''
for t in termos:
if t.find('?')>=0:
termo = termo + '.{' + f'0,{len(t)}' + '}'
elif t:
termo += t
termo = termo.replace('_!','_?') # curinga de números retornando
return termo.replace('*','.*')
@classmethod
def formatar_termo_numerico_pesquisa(self, termo):
if not self.RE_TERMO_NUMERICO.match(termo):
return termo
# números puros com . ou , divide os milhares e insere _?
if self.RE_TERMO_MILHAS.match(termo):
#print('milhas: ', termo)
termo = self.formatar_numeros_milhar(termo)
#print('milhas saída: ', termo)
# números com ./-,_ substitui por _?
if self.RE_TOKEN_QUEBRA_N.match(termo):
#print('quebra n: ', termo)
termo = self.RE_TOKEN_QUEBRA_N_FORMAT.sub('_?', termo)
#print('quebra n saída: ', termo)
return termo
# sobrou separadores numéricos, remove para quebrar o token
#print('termos format: ', termo)
termo = self.RE_LIMPAR_TERMO_NAO_NUMERICO.sub(' ',termo).strip()
#print('saída termos format: ', termo)
return termo
@classmethod
def remover_acentos(self, txt):
return normalize('NFKD', txt).encode('ASCII', 'ignore').decode('ASCII')
# retorna o campo texto ou o campo texto com o sufico raw se existirem critérios entre aspas
# unico = True exige que mesmo existindo uma lista de campos, somento o primeiro seja retornado
# para resolver problemas de operadores que trabalham com um campo apenas
@classmethod
def campo_texto_grupo(self, criterios, campo_texto, sufixo_campo_raw, unico = False):
aspas = filter(lambda k:type(k) is str and k.find('"')>=0,criterios)
if not any(aspas):
res = campo_texto
elif type(campo_texto) is str:
res = f'{campo_texto}{sufixo_campo_raw}'
else:
# coloca o sufixo em todos os campos
res = [f'{_}{sufixo_campo_raw}' for _ in campo_texto]
if (not unico) or (type(res) is str):
return res
return list(res)[0] if any(res) else 'texto'
@classmethod
def campo_texto_termo(self, termo, campo_texto, sufixo_campo_raw, unico = False):
# o termo pode vir como string ou como uma sublista - ao colocar em uma lista, tem o mesmo comportamento de grupo
return self.campo_texto_grupo(criterios=[termo], campo_texto=campo_texto, sufixo_campo_raw=sufixo_campo_raw, unico=unico)
@classmethod
def contem_operador_agrupado(self, criterios):
return self.RE_OPERADOR_CAMPOS_GRUPOS.search(criterios)
###########################################################
# Recebe um critério de pesquisa livre estilo BRS
# e aproxima ele no que for possível para rodar uma
# query do elasticsearch
#----------------------------------------------------------
class PesquisaElasticFacil():
# critérios prox e adj serão agrupados pelo maior valor para usar o slop
# critérios prox e adj antes e depois de parênteses serão substituídos por E
# termos seguidos sem eperadores serão considerados com o operador E entre eles
# not sem parênteses será aplicado apenas ao próximo termo
# contem: transforma em more like this - aceita NÃO (lista de termos)
# PROXn: transforma em slop(n não ordenado) - aceita NÃO (lista de termos)
# ADJn: transforma em slop(n ordenado) - aceita NÃO (lista de termos)
# o sufixo_campo_raw identifica o sufixo de campo para termos entre aspas
# e_subgrupo_pesquisa apenas identifica que está rodando a pesquisa de dentro de um grupo de campo para melhorar as mensagens de erro
RE_CONTEM = re.compile('^cont[eé]m:', re.IGNORECASE)
RE_INTELIGENTE = re.compile('^(adj\d*|prox\d*|cont[ée]m):', re.IGNORECASE)
RE_CONTEM_INTELIGENTE = re.compile(r'\bd?[aiou]\b|\bde\b|\b[a-z]\b')
RE_CONTEM_INTELIGENTE_SIMBOLOS = re.compile(r'\D[,\/]\D|[{}\[\]:]|^[,\/]|[,\/]$')
RE_NAO_CONTEM_INTELIGENTE = re.compile(r'\b(adj\d*|prox?\d*|com)\b',re.IGNORECASE)
RE_NAO = re.compile(r'\s+n[aã]o\s*\([^\)]+\)')
RE_NAO_LIMPAR = re.compile(r'(\s+n[aã]o\s*\()|(\()|(\))')
def __init__(self, criterios_originais, campo_texto = 'texto', sufixo_campo_raw = None, e_subgrupo_pesquisa = False):
self.pesquisa_inteligente = self.RE_INTELIGENTE.match(criterios_originais)
self.criterios_originais = str(criterios_originais).strip()
self.contem_operadores_brs = False
self.contem_operadores = False
self.campo_texto = str(campo_texto)
self.sufixo_campo_raw = '' if not sufixo_campo_raw else str(sufixo_campo_raw)
self.sufixo_campo_raw = f'.{self.sufixo_campo_raw}' if self.sufixo_campo_raw and self.sufixo_campo_raw[0] !='.' else self.sufixo_campo_raw
self.criterios_listas = []
self.avisos = [] # registra sugestões de avisos para o usuário
# valida se a pesquisa contém operadores de campos pois não é aceito nessa classe
self.e_subgrupo_pesquisa = e_subgrupo_pesquisa
if Operadores.RE_OPERADOR_CAMPOS_GRUPOS.search(criterios_originais):
if self.e_subgrupo_pesquisa:
raise ValueError(ERRO_OPERADOR_CAMPO_PARENTESES)
else:
raise ValueError(ERRO_OPERADOR_CAMPO_PESQUISA)
# começar os critérios com : é um escape para essa análise automática
# mais de 50 de tamanho, não tem adj ou prox ou campo agrupado
# e com mais de um caracteres estranhos às pesquisas, considera "contém:"
if not self.pesquisa_inteligente and self.criterios_originais[:1] != ':' :
_teste = Operadores.remover_acentos(self.criterios_originais).lower()
if Operadores.RE_TOKEN_ADJ.search(_teste) or \
Operadores.RE_TOKEN_PROX.search(_teste) or \
Operadores.RE_OPERADOR_CAMPOS_GRUPOS.search(_teste) or \
self.RE_NAO_CONTEM_INTELIGENTE.search(_teste):
pass
else:
qtd_quebrados = len(self.RE_CONTEM_INTELIGENTE.findall(_teste))
simbolos = self.RE_CONTEM_INTELIGENTE_SIMBOLOS.search(_teste)
if simbolos or (qtd_quebrados > 1):
if PRINT_WARNING: print(f'Critério CONTÉM: inserido >> {len(self.criterios_originais)} caracteres e {qtd_quebrados} termos não pesquisáveis')
self.pesquisa_inteligente = True
self.criterios_originais = f'CONTÉM: {self.criterios_originais}'
self.avisos.append('Critério "CONTÉM:" inserido automaticamente ao identificar o conteúdo como texto. Use ":" antes da pesquisa para desativá-lo.')
if self.criterios_originais[:1] == ':':
self.criterios_originais = self.criterios_originais[1:]
# realiza a construção das pesquisas
if self.pesquisa_inteligente:
self.executar_pesquisa_inteligente()
else:
# limpa os símbolos não tokenizáveis
_criterios = Operadores.RE_LIMPAR_CRITERIOS_INICIAIS.sub(' ',self.criterios_originais)
# transforma os critérios agrupados em lista e sublistas de critérios
_criterios = self.converter_parenteses_para_listas(_criterios)
_criterios = self.corrigir_sublistas_desnecessarias(_criterios)
# unindo os termos entre aspas agrupando entre parênteses e ADJ1
_criterios = self.juntar_aspas(_criterios)
# formata os termos e operadores e quebrar tokens
_criterios = Operadores.formatar_tokens(_criterios)
# agrupando os critérios em parênteses próprios
_criterios = self.corrigir_criterios_e_reagrupar(_criterios)
_criterios = self.corrigir_sublistas_desnecessarias(_criterios)
# corrige regras de operadores
_criterios = self.corrigir_lista_de_operadores(_criterios)
_criterios = self.corrigir_sublistas_desnecessarias(_criterios)
# pode ser mostrado na interface do usuário como a classe interpretou os critérios
self.criterios_listas = _criterios
self.criterios_reformatado = self.reformatar_criterios(self.criterios_listas)
# critérios elastic pesquisa normal
self.criterios_elastic = self.as_query()
# cria a query com o highlight
self.criterios_elastic_highlight = deepcopy(self.criterios_elastic)
self.criterios_elastic_highlight['highlight'] = {"type" : "plain", "fields": { f"{campo_texto}": CRITERIO_CAMPO_HIGHLIGHT }}
self.criterios_elastic_highlight['_source'] = [""]
# recebe a primeira forma RAW escrita pelo usuário e converte em sublistas cada grupo de parênteses
# cria lsitas de listas dentro dos parênteses
# exemplo: ((teste1 teste2) e (teste3 teste4) teste5)
# vira : [['teste1','teste2'], ['teste3', 'teste4'], 'teste5']
def converter_parenteses_para_listas(self,criterios):
comp_msg = f' {ERRO_PARENTESES_CAMPO}' if self.e_subgrupo_pesquisa else ''
""" baseado em https://stackoverflow.com/questions/23185540/turn-a-string-with-nested-parenthesis-into-a-nested-list-python """
left = r'[(]'
right=r'[)]'
sep=r'\s'
pat = r'({}|{}|{})'.format(left, right, sep)
_criterios = str(criterios).replace("'",'"').replace('"',' " ')
tokens = re.split(pat, _criterios)
stack = [[]]
for x in tokens:
if not x or re.match(sep, x): continue
if re.match(left, x):
stack[-1].append([])
stack.append(stack[-1][-1])
elif re.match(right, x):
stack.pop()
if not stack:
raise ValueError(str(f'{ERRO_PARENTESES_FECHOU_MAIS}{comp_msg}'))
else:
stack[-1].append(x)
if len(stack) > 1:
print('criterios: ', criterios )
print('stack: ', stack)
raise ValueError(str(f'{ERRO_PARENTESES_FALTA_FECHAR}{comp_msg}'))
return stack.pop()
# agrupa OU pois é precedente dos critérios
# exemplos:
# termo1 E termo2 OU termo3 OU termo4 = termo1 E (termo2 OU termo3 OU termo4)
# termo1 E termo2 OU (termo3 adj2 termo4) = termo1 E (termo2 OU (termo3 adj2 termo4))
# termo1 OU termo2 termo3 = (termo1 OU termo2) termo3
# termo1 OU termo2 (termo3 termo4) = (termo1 OU termo2) (termo3 termo4)
# termo1 OU termo2 termo3 OU termo4 = (termo1 OU termo2) (termo3 OU termo4)
# termo1 OU termo2 (termo3 OU termo4 termo5) = (termo1 OU termo2) ((termo3 OU termo4) termo5)
# termo1 OU termo2 OU (termo3 OU termo4 termo5) = (termo1 OU termo2 OU ((termo3 OU termo4) termo5))
# Não permite
def corrigir_criterios_e_reagrupar(self,criterios_lista, recursivo = True):
# inicia a resposta e o grupo temporário
res = []
grupo = []
grupo_operador = ''
if PRINT_DEBUG: print(f'Agrupar (recursivo {recursivo}): {criterios_lista}')
for i, token in enumerate(criterios_lista):
operador_proximo = ''
token_anterior = ''
token_proximo = ''
# identifica o próximo token para análise
if i < len(criterios_lista) -1:
token_proximo = criterios_lista[i+1]
operador_proximo = Operadores.get_operador_agrupamento( token_proximo )
# identifica o token anterior
if i > 0:
token_anterior = criterios_lista[i-1]
#print(f'{operador_anterior} | {token} | {operador_proximo} >>> Grupo {grupo_operador} > {grupo}')
if recursivo:
# se o token atual for uma lista, agrupa antes de analisar
token = self.corrigir_criterios_e_reagrupar(token) if type(token) is list else token
# dois operadores seguidos, mantém o segundo
if type(token) is str and type(token_proximo) is str and \
Operadores.e_operador(token) and Operadores.e_operador(token_proximo):
continue
# termo com termo ou lista com termo, finaliza o agrupamento e continua avaliando
if (not Operadores.e_operador(token_anterior)) and \
(not Operadores.e_operador(token)) and \
any(grupo):
res.append(grupo)
grupo_operador = ''
grupo = []
if PRINT_DEBUG: print(f' - quebra termo termo: {token_anterior} >> {token}')
# próximo é um operador mas não é de agrupamento, finaliza o agrupamento com o token
if (not operador_proximo) and \
Operadores.e_operador(token_proximo) and \
any(grupo):
grupo.append(token)
res.append(grupo)
grupo_operador = ''
grupo = []
if PRINT_DEBUG: print(f' - quebra termo termo: {token_anterior} >> {token}')
# o próximo operador é um operador agrupado, insere no grupo
elif operador_proximo:
# se o grupo estiver em uso e for de outro operador, finaliza o grupo
if grupo_operador and grupo_operador != operador_proximo:
grupo.append(token)
res.append(grupo)
grupo=[ ]
if PRINT_DEBUG: print(' - novo grupo outro operador: ',token, grupo_operador, '|', operador_proximo)
# se for prox ou adj antes e depois, o token fica compartilhado nos dois grupos
if Operadores.e_operador_slop(operador_proximo) and \
Operadores.e_operador_slop(grupo_operador):
grupo.append(token)
grupo_operador = operador_proximo
else:
grupo_operador = ''
else:
# agrupa
grupo.append(token)
grupo_operador = operador_proximo
if PRINT_DEBUG: print(' - novo grupo: ',token, grupo_operador)
# se estiver agrupando, continua agrupando
elif any(grupo):
grupo.append(token)
# inclui como está
else:
res.append(token)
# insere o último agrupamento se existir
if any(grupo):
res.append(grupo)
# ao final, alguns grupos podem ter ficados soltos com OU no meio sem agrupamento
# então reagrupa sem recursividade
if recursivo:
res = self.corrigir_criterios_e_reagrupar(res, recursivo=False)
return res
# corrige os tipos de operadores que podem existir em cada situação
# remove sublistas encadeadas sem necessidade (((teste))) = (teste)
def corrigir_lista_de_operadores(self,criterios_lista):
res = []
if PRINT_DEBUG: print(f'Operadores: {criterios_lista}')
for i, token in enumerate(criterios_lista):
token_anterior = ''
token_proximo = ''
# identifica o próximo token para análise
if i < len(criterios_lista) -1:
token_proximo = criterios_lista[i+1]
# identifica o token anterior
if i > 0:
token_anterior = criterios_lista[i-1]
# dois operadores seguidos, mantém o segundo
if Operadores.e_operador(token) and Operadores.e_operador(token_proximo):
continue
# operador no início ou fim do grupo, ignora
if (i==0 and Operadores.e_operador(token) and (not Operadores.e_operador_nao(token)) ) or \
(token_proximo == '' and Operadores.e_operador(token)) :
continue
# operador antes ou depois de parênteses, valida o operador
if (type(token_anterior) is list or type(token_proximo) is list) and \
Operadores.e_operador(token) and \
not Operadores.e_operador_que_pode_antes_depois_parenteses(token):
token = Operadores.OPERADOR_PADRAO
# inclui o E na falta de operadores entre termos/listas
if token_anterior != '' and \
(not Operadores.e_operador(token)) and \
(not Operadores.e_operador(token_anterior)):
res.append(Operadores.OPERADOR_PADRAO)
if type(token) is list:
token = self.corrigir_lista_de_operadores(token)
res.append(token)
return res
# caso seja uma lista de uma única sublista, remove um nível
def corrigir_sublistas_desnecessarias(self,criterios_lista, raiz = True):
#if PRINT_DEBUG: print(f'Sublistas: {criterios_lista}')
res = []
for token in criterios_lista:
if type(token) is list:
# ignora lista vazia
if not any(token):
continue
# remove subníveis desnecessários
while type(token) is list and len(token) == 1:
token = token[0]
# não tem nada (texto ou lista), ignora o grupo
if len(token) == 0:
continue
if type(token) is list:
token = self.corrigir_sublistas_desnecessarias(token, False)
res.append(token)
# remove subníveis desnecessários da raiz
if len(res) == 1:
# se for uma lista, remove, se for raiz e for um token, mantém
if type(res[0]) is list or not raiz:
res = res[0]
if PRINT_DEBUG: print(f' -- sublistas ---> : {res}')
return res
def quebra_aspas_adj1(self, texto):
_texto = texto.replace('"','').replace("'",'')
if not _texto:
return []
res = []
# pega apenas os tokens que são válidos
tokens = [_ for _ in _texto.strip().split(' ') if Operadores.formatar_termo(_)]
for _ in tokens:
res += [f'"{_}"', Operadores.OPERADOR_ADJ1]
res = res[:-1]
# se tiver apenas um item, retorna ele como string sem grupo
if len(res) == 1:
return res[0]
return res
# junta critérios entre aspas se existirem - espera receber uma lista de strings
def juntar_aspas(self,criterios_lista):
if PRINT_DEBUG: print(f'Juntar aspas: {criterios_lista}')
res = []
aspas = False
aspas_txt = '' #vai acrescentando os termos entre aspas
ultimo_i = len(criterios_lista)-1
for i, tk in enumerate(criterios_lista):
# é o último token?
ultimo_token = i == ultimo_i
# caso venha de uma lista de strings e chega em um sublista
sublista = type(tk) is list
# fim de aspas
if (tk == '"' or ultimo_token or sublista) and aspas and aspas_txt:
if ultimo_token:
aspas_txt += f' {tk}'
tk = ''
_novos_tokens = self.quebra_aspas_adj1(aspas_txt.strip())
res.append(_novos_tokens)
aspas_txt = ''
aspas = False
tk = '' if tk == '"' else tk
# se encontrar uma lista, processa ela recursivamente
if type(tk) is list:
res.append(self.juntar_aspas(tk))
# início das aspas
elif tk == '"' and not aspas:
aspas = True
aspas_txt = ''
# meio das aspas
elif aspas:
aspas_txt += f' {tk}'
# token sem aspas
elif tk:
res.append( tk )
# última aspas
if aspas_txt:
_novos_tokens = self.quebra_aspas_adj1(aspas_txt.strip())
res.append(_novos_tokens)
if PRINT_DEBUG: print(f' -- aspas ----> : {res}')
return res
def reformatar_criterios(self, criterios_lista):
# converter a lista em uma lista plana
def _planifica(lista):
res = []
for lst in lista:
if type(lst) is str:
res.append(lst)
elif any(lista):
lst = _planifica(lst)
res += ['('] + lst + [')']
return res
lista = _planifica(criterios_lista)
# retorna uma string com os critérios
return ' '.join(lista).replace('( ','(').replace(' )',')')
# cria os critérios do elastic com o more like this
# todos os grupos de not são agrupados em um único must not
# operadores e aspas são removidos
# min_term_freq = 1 -> menor frequência do termo para ele ser usado na pesquisa
# min_doc_freq=1 -> menor frequência do termo em documentos para ele ser usado na pesquisa
# max_query_terms=None -> maior número de termos usados na pesquisa (none é automático 30)
# minimum_should_match=None -> quantidade de termos que precisam ser encontrados (none é automático entre 30% e 80% dependendo da qtd de termos)
def as_query_more_like_this(self, criterios, criterios_nao, campos_texto, min_term_freq=1,min_doc_freq=1, max_query_terms=None, minimum_should_match=None ):
_campos = [campos_texto] if type(campos_texto) is str else list(campos_texto)
_max_query_terms = max_query_terms if max_query_terms else 30
if minimum_should_match:
_minimum_should_match = minimum_should_match
else:
if len(set(criterios)) <5:
_minimum_should_match = "100%"
elif len(criterios) <30:
_minimum_should_match = "75%"
else:
_minimum_should_match = "50%"
return { "query": {"more_like_this" : {
"fields" : _campos,
"like" : str(criterios),
"unlike" : criterios_nao,
"min_term_freq" : min_term_freq,
"min_doc_freq" : min_doc_freq,
"max_query_terms" : _max_query_terms,
"minimum_should_match" : _minimum_should_match}}}
def __str__(self) -> str:
return f'PesquisaElasticFacil: {self.criterios_reformatado}'
def as_string(self):
return str(self.criterios_reformatado)
#############################################################
#############################################################
# formatadores de query elastic
#------------------------------------------------------------
def as_query(self):
#print('as_query: ', self.criterios_brs_listas)
res = self.as_query_condicoes(self.criterios_listas)
# dependendo dos retornos, constrói queries específicas
return { "query": res}
def as_bool_must(self, must, must_not, should=[], span_near=[]):
res = {}
# no caso de ser apenas um critério não precisa ser bool/must
if len(must) == 1 and len(must_not) == 0 and len(should) == 0 and len(span_near) == 0:
return must[0]
# no caso de span_near, o tratamento é feito em um único grupo
if any(span_near):
res['span_near'] = span_near
return res
if any(must):
res['must'] = must
if any(must_not):
res['must_not'] = must_not
if any(should):
res['should'] = should
return {"bool": res }
# retorna uma lista de condicoes para serem incluídos no
# must ou must_not dependendo do operador externo do grupo
def as_query_condicoes(self, grupo):
must = []
must_not = []
should = []
span_near = []
operador_nao = False
# busca o primeiro operador para análise do grupo
operador_grupo, n_grupo = Operadores.operador_n_do_grupo(grupo)
_campo_texto_slop = Operadores.campo_texto_grupo(grupo, self.campo_texto, self.sufixo_campo_raw, unico = True)
if PRINT_DEBUG: print('Operador do grupo: ', operador_grupo, grupo)
for token in grupo:
# se for o operador não/not - apenas guarda a referência
if type(token) is str and Operadores.e_operador_nao(token):
operador_nao = True
continue
# converte o grupo ou subgrupo em um critério interno do must/must_not
if type(token) is list:
# se or um grupo, processa ele recursivamente
grupo_convertido = self.as_query_condicoes(token)
if operador_nao:
must_not.append( grupo_convertido)
elif Operadores.e_operador_ou(operador_grupo):
should.append( grupo_convertido )
else:
must.append( grupo_convertido )
elif Operadores.e_operador(token):
# operadores foram identificados antes do for
continue
else:
# verifica o tipo do grupo e monta o operador do termo
if operador_nao:
# não com termo é um must_not simples
_campo_texto = Operadores.campo_texto_termo(token, campo_texto=self.campo_texto, sufixo_campo_raw=self.sufixo_campo_raw, unico=True)
grupo_convertido = self.as_query_operador(token, Operadores.OPERADOR_PADRAO, _campo_texto)
must_not.append( grupo_convertido )
else:
# critérios slop são todos entre aspas ou todos sem aspas pois precisam
# ser aplicados no mesmo campo
if Operadores.e_operador_slop(operador_grupo):
_campo_texto = _campo_texto_slop
else:
_campo_texto = Operadores.campo_texto_termo(token, campo_texto=self.campo_texto, sufixo_campo_raw=self.sufixo_campo_raw, unico=True)
grupo_convertido = self.as_query_operador(token, operador_grupo, _campo_texto)
if Operadores.e_operador_slop(operador_grupo):
span_near.append( grupo_convertido )
elif Operadores.e_operador_ou(operador_grupo):
should.append( grupo_convertido )
elif Operadores.e_operador_e(operador_grupo):
must.append( grupo_convertido )
# o não/not só afeta o próximo token ou grupo
operador_nao = False
# configura o span_near
if any(span_near):
span_near = {'clauses' : span_near,
'slop' : max(0, n_grupo -1 ),
'in_order' : bool(Operadores.e_operador_adj(operador_grupo))}
if PRINT_DEBUG:
if any(must): print('Must: ', must )
if any(must_not): print('Must_not: ', must_not )
if any(should): print('Should: ', should )
if any(span_near): print('Span_near: ', span_near )
return self.as_bool_must(must = must, must_not = must_not, should=should, span_near=span_near)
@classmethod
def as_query_operador(self, token, operador_grupo, campo_texto = None):
token = token.lower()
# wildcard - se o termo for entre aspas usa o campo raw, mas isso quem resolve é quem chama o método
#_aspas = token.find('"')>=0 or token.find("'")>=0
_wildcard = token.find('*')>=0
_regex = token.find('?')>=0 or Operadores.RE_TERMO_NUMERICO.match(token)
_token = Operadores.remover_acentos(token)
_token = _token.replace("'",'').replace('"','') # remove aspas
if _wildcard and not _regex:
_wildcard = { "wildcard": {f"{campo_texto}" : {"case_insensitive": True, "value": f"{_token}" } } }
if Operadores.e_operador_slop(operador_grupo):
return { "span_multi" : { "match": _wildcard } }
return _wildcard
elif _regex :
_token = Operadores.termo_regex_interroga(_token)
_regex = { "regexp": {f"{campo_texto}" : {"case_insensitive": True, "value": f"{_token}" } } }
if Operadores.e_operador_slop(operador_grupo):
return { "span_multi" : { "match": _regex } }
return _regex
# termo simples
if Operadores.e_operador_slop(operador_grupo):
return { "span_term": { f"{campo_texto}": f"{_token}" } }
return { "term": { f"{campo_texto}": f"{_token}" } }
# contem: transforma em more like this aceita nao ()
# parecido: transforma em slop(20 não ordenado)
# igual: transforma em slop(1 ordenado)
# identico: transforma em slop(0 ordenado)
def executar_pesquisa_inteligente(self):
_tipo = self.RE_INTELIGENTE.findall(self.criterios_originais)[0]
_criterios = self.RE_INTELIGENTE.sub('', self.criterios_originais)
_criterios_nao = self.RE_NAO.findall(_criterios)
_criterios = self.RE_NAO.sub(' ', _criterios)
_criterios = Operadores.remover_acentos(_criterios.lower())
_criterios = Operadores.RE_LIMPAR_TERMO_MLT.sub(' ', _criterios).replace('$','*')
_criterios_nao = [self.RE_NAO_LIMPAR.sub(' ',Operadores.remover_acentos(_.lower())) for _ in _criterios_nao]
_tipo = _tipo.upper()
_criterios_nao_formatados = [f' NÃO ({_}) ' for _ in _criterios_nao]
_criterios_nao_formatados = ''.join(_criterios_nao_formatados)
self.criterios_reformatado = f'{_tipo}: {_criterios} {_criterios_nao_formatados}'
if PRINT_DEBUG:
print('Tipo: ', _tipo)
print('Critérios: ', _criterios)
print('Critérios não: ', _criterios_nao)
print('Critérios finais: ', _criterios)
if _tipo in ('CONTÉM', 'CONTEM'):
self.criterios_elastic = self.as_query_more_like_this(criterios=_criterios,
criterios_nao=_criterios_nao,
campos_texto=self.campo_texto)
else: # contém
operador, n = Operadores.get_operador_n(_tipo)
if Operadores.e_operador_adj(operador):
ordem = True
else:
ordem = False
distancia = n-1
_criterios = [_ for _ in _criterios.split(' ') if _]
_criterios_nao = ' '.join(_criterios_nao)
_criterios_nao = [_ for _ in _criterios_nao.split(' ') if _]
self.criterios_elastic = self.as_query_slop(criterios=_criterios,
criterios_nao=_criterios_nao,
campos_texto=self.campo_texto,
sufixo_campo_raw=self.sufixo_campo_raw,
distancia = distancia,
ordem = ordem)
def as_query_slop(self, criterios, criterios_nao, campos_texto, sufixo_campo_raw, distancia, ordem):
_campo = Operadores.campo_texto_grupo(criterios, campo_texto=campos_texto, sufixo_campo_raw=sufixo_campo_raw, unico=True)
_campo_nao = Operadores.campo_texto_grupo(criterios_nao, campo_texto=campos_texto, sufixo_campo_raw=sufixo_campo_raw, unico=True)
span_near = [self.as_query_operador(_, Operadores.OPERADOR_ADJ1 , _campo) for _ in criterios]
span_near_nao = [self.as_query_operador(_, Operadores.OPERADOR_ADJ1 , _campo_nao) for _ in criterios_nao]
qspan_near = {'clauses' : span_near, 'slop' : max(0, distancia), 'in_order' : ordem}
qspan_near_nao = {'clauses' : span_near_nao, 'slop' : max(0, distancia), 'in_order' : ordem}
if not any(span_near_nao):
return { "query": {"span_near" :qspan_near }}
return { "query": { "bool": {
"must": [{"span_near" :qspan_near } ] ,
"must_not" : [{"span_near" :qspan_near_nao }]} }
}
class GruposPesquisaElasticFacil():
def __init__(self, criterios_agrupados = '', campo_texto_padrao='texto', sufixo_campo_raw='.raw', campos_disponiveis = {}) -> None:
if PRINT_DEBUG: print(f'GruposPesquisaElasticFacil: iniciado campo:"{campo_texto_padrao}"', 'critérios:', len(criterios_agrupados)>0)
self.__must__ = []
self.__must_not__ = []
self.__should__ = []
self.__as_string__ = ''
self.campo_texto_padrao = campo_texto_padrao
self.sufixo_campo_raw = sufixo_campo_raw
self.avisos = [] # registra sugestões de avisos para o usuário
# configura os campos disponíveis para critérios em grupo
# bem como o sufixo raw de cada um se existir
# por padrão o sufixo raw é vazio se não for configurado
# exemplo: {'texto':'.raw', 'titulo':'', 'nome':''}
# se campos_disponiveis estiver vazio, permite incluir qualquer campo
self.campos_disponiveis = campos_disponiveis if type(campos_disponiveis) is dict else dict(campos_disponiveis)
if criterios_agrupados:
self.add_criterios_agrupados(criterios_agrupados)
def __valida_parenteses__(self,criterios):
a = [1 for _ in criterios if _=='(']
f = [1 for _ in criterios if _==')']
if a>f:
raise ValueError(ERRO_PARENTESES_FALTA_FECHAR)
elif f>a:
raise ValueError(ERRO_PARENTESES_FECHOU_MAIS)
# um campo é válido se for igual ao padrão
# ou se está na lista de disponíveis ou se não há lista de disponíveis
def __valida_campo_grupo__(self, campo):
# print('VALIDANDO CAMPO ', campo, 'CAMPOS', list(self.campos_disponiveis.keys()))
res = campo==self.campo_texto_padrao or \
campo in self.campos_disponiveis.keys() or \
not any(self.campos_disponiveis.keys())
if res:
return True
msg = f'campos_pesquisa: o campo {campo} não está disponível para pesquisa, corrija a lista de campos disponíveis ou corrija o nome do campo.'
raise KeyError(msg)
# retorna o sufixo do campo informado ou ''
def __retorna_sufixo_campo_raw__(self, campo):
self.__valida_campo_grupo__(campo)
# se for o campo padrão e o sufixo for vazio,
# verifica se o sufixo está na lista para esse campo
if campo == self.campo_texto_padrao and self.sufixo_campo_raw:
return self.sufixo_campo_raw
# busca o sufixo do campo ou vazio
sufixo = self.campos_disponiveis.get(campo,'')
return str(sufixo)
# retorna o campo informado com sufixo raw ou só o campo
def __retorna_campo_raw__(self, campo):
sufixo = self.__retorna_sufixo_campo_raw__(campo)
sufixo = f'.{sufixo}' if sufixo and sufixo[0] !='.' else sufixo
return f'{campo}{sufixo}'
# busca o próximo fechamento levando em consideração que pode abrir algum parênteses no meio
def __get_proximo_fechamento__(self, texto):
q_abre, pos = 0,-1
for c in texto:
pos +=1
if c == ')' and q_abre ==0:
#acabou pois achou o fechamento sem abertura pendente
return pos
elif c == ')' and q_abre >0:
q_abre += -1
elif c == '(':
q_abre += 1
# não conseguiu achar o fechamento
return -1
# varre os grupos entre parênteses que contenham indicador de campo
# Exemplo: .campo_texto.(critérios) .campo_nome.(criterios)
# adiciona os critérios no objeto
def add_criterios_agrupados(self, criterios):
_criterios = str(criterios)
self.__valida_parenteses__(_criterios)
ini = 0
lista_grupos = [] # (operador, campo, critérios de grupo)
for grupo in Operadores.RE_OPERADOR_CAMPOS_GRUPOS.finditer(criterios):
# até o operador de campo
criterio_raiz = _criterios[ini:grupo.start()].strip()
_criterio_raiz_split = criterio_raiz.split(' ')
# abertura de parênteses seguido de campo de pesquisa
if any(_criterio_raiz_split) and _criterio_raiz_split[-1][-1]=='(':
raise ValueError(ERRO_OPERADOR_CAMPO_PARENTESES)
# operador ou iniciando o critério raiz
if any(_criterio_raiz_split) and Operadores.e_operador_ou(_criterio_raiz_split[0]):
raise ValueError(ERRO_OPERADOR_OU_CAMPO_RAIZ)
# operador antes de campo de pesquisa, identifica o operador e separa os critérios anteriores
# para adicionar ao conjunto de pesquisa
operador = ''
if any(_criterio_raiz_split) and Operadores.e_operador(_criterio_raiz_split[-1]):
operador = _criterio_raiz_split[-1].upper()
criterio_raiz = ' '.join(_criterio_raiz_split[:-1])
# critério de pesquisa por campo
campo = _criterios[grupo.start():grupo.end()]
# prepara o campo para ver se tem operador junto
campo = campo.replace('.(',' ').replace('.',' ').replace(' ',' ').strip()
ini = grupo.end()
pos_fim = self.__get_proximo_fechamento__(_criterios[ini:])
pos_fim = ini if pos_fim<0 else ini+pos_fim+1
criterios_campo = _criterios[ini:pos_fim-1]
ini = pos_fim
# adiciona o critério raiz anterior à pesquisa por campo
if criterio_raiz:
lista_grupos.append(('','', criterio_raiz))