forked from logpai/Drain3
-
Notifications
You must be signed in to change notification settings - Fork 0
/
template_miner_config.py
78 lines (65 loc) · 3.77 KB
/
template_miner_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# SPDX-License-Identifier: MIT
import ast
import configparser
import json
import logging
from drain3.masking import MaskingInstruction
# Module-level logger named after this module; used by TemplateMinerConfig.load
# to warn when the configuration file could not be read.
logger = logging.getLogger(__name__)
class TemplateMinerConfig:
    """Settings container for a Drain3 template miner.

    Every attribute receives a default value in ``__init__``. ``load`` then
    overrides only those attributes whose option is actually present in the
    given INI file; everything else keeps its current value.
    """

    def __init__(self):
        """Initialise all settings to their built-in defaults."""
        # [PROFILING] section: options "enabled" and "report_sec".
        self.profiling_enabled = False
        self.profiling_report_sec = 60

        # [SNAPSHOT] section: options "snapshot_interval_minutes" and
        # "compress_state".
        self.snapshot_interval_minutes = 5
        self.snapshot_compress_state = True

        # [DRAIN] section: options "extra_delimiters", "sim_th", "depth",
        # "max_children", "max_clusters" and "parametrize_numeric_tokens".
        self.drain_extra_delimiters = []
        self.drain_sim_th = 0.4
        self.drain_depth = 4
        self.drain_max_children = 100
        self.drain_max_clusters = None
        self.parametrize_numeric_tokens = True

        # [MASKING] section: options "masking", "mask_prefix", "mask_suffix"
        # and "parameter_extraction_cache_capacity".
        self.masking_instructions = []
        self.mask_prefix = "<"
        self.mask_suffix = ">"
        self.parameter_extraction_cache_capacity = 3000

    def load(self, config_filename: str):
        """Override the current settings with values from an INI file.

        Options are looked up in the ``PROFILING``, ``SNAPSHOT``, ``DRAIN``
        and ``MASKING`` sections. An option (or a whole section) that is
        missing leaves the corresponding attribute unchanged. If the file
        cannot be read, a warning is logged and no attribute changes.

        Args:
            config_filename: Path of the INI file to read.

        Raises:
            ValueError: If a boolean/numeric option cannot be converted, if
                ``extra_delimiters`` is not a valid Python literal, or if
                ``masking`` is not valid JSON (``json.JSONDecodeError`` is a
                ``ValueError`` subclass).
            SyntaxError: If ``extra_delimiters`` is syntactically malformed.
            KeyError: If an entry of ``masking`` lacks ``regex_pattern`` or
                ``mask_with``.
        """
        parser = configparser.ConfigParser()
        # ConfigParser.read() returns the list of files it managed to parse.
        if not parser.read(config_filename):
            logger.warning(f"config file not found: {config_filename}")

        section_profiling = 'PROFILING'
        section_snapshot = 'SNAPSHOT'
        section_drain = 'DRAIN'
        section_masking = 'MASKING'

        self.profiling_enabled = parser.getboolean(
            section_profiling, 'enabled',
            fallback=self.profiling_enabled)
        self.profiling_report_sec = parser.getint(
            section_profiling, 'report_sec',
            fallback=self.profiling_report_sec)

        self.snapshot_interval_minutes = parser.getint(
            section_snapshot, 'snapshot_interval_minutes',
            fallback=self.snapshot_interval_minutes)
        self.snapshot_compress_state = parser.getboolean(
            section_snapshot, 'compress_state',
            fallback=self.snapshot_compress_state)

        # Parse the literal only when the option exists, instead of
        # round-tripping the current value through str()/literal_eval().
        drain_extra_delimiters_str = parser.get(
            section_drain, 'extra_delimiters', fallback=None)
        if drain_extra_delimiters_str is not None:
            self.drain_extra_delimiters = ast.literal_eval(drain_extra_delimiters_str)

        self.drain_sim_th = parser.getfloat(
            section_drain, 'sim_th',
            fallback=self.drain_sim_th)
        self.drain_depth = parser.getint(
            section_drain, 'depth',
            fallback=self.drain_depth)
        self.drain_max_children = parser.getint(
            section_drain, 'max_children',
            fallback=self.drain_max_children)
        self.drain_max_clusters = parser.getint(
            section_drain, 'max_clusters',
            fallback=self.drain_max_clusters)
        self.parametrize_numeric_tokens = parser.getboolean(
            section_drain, 'parametrize_numeric_tokens',
            fallback=self.parametrize_numeric_tokens)

        self.mask_prefix = parser.get(
            section_masking, 'mask_prefix', fallback=self.mask_prefix)
        self.mask_suffix = parser.get(
            section_masking, 'mask_suffix', fallback=self.mask_suffix)
        # getint (not get): the capacity must stay an int when it comes from
        # the file, matching the type of the built-in default.
        self.parameter_extraction_cache_capacity = parser.getint(
            section_masking, 'parameter_extraction_cache_capacity',
            fallback=self.parameter_extraction_cache_capacity)

        # The "masking" option is a JSON list of objects with the keys
        # "regex_pattern" and "mask_with". When it is absent the current
        # instructions are kept: str() of MaskingInstruction objects is not
        # valid JSON, so it cannot serve as a parseable fallback.
        masking_instructions_str = parser.get(
            section_masking, 'masking', fallback=None)
        if masking_instructions_str is not None:
            self.masking_instructions = [
                MaskingInstruction(mi['regex_pattern'], mi['mask_with'])
                for mi in json.loads(masking_instructions_str)
            ]