-
Notifications
You must be signed in to change notification settings - Fork 3
/
merger.py
160 lines (136 loc) · 4.53 KB
/
merger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""Merges the redundant program data in an XMLTV file."""
import argparse
import pathlib
import typing
from xml.dom.minidom import Document, Element, parse
import tqdm
class ChildNodes:
    """A duplicate-free view over the child nodes of a DOM element.

    Nodes are compared through a normalized form of their XML serialization
    (see ``represent``), so two nodes that only differ in letter case, spaces
    or newlines are considered the same node.
    """

    def __init__(self, element: Element):
        """
        Parameters
        ----------
        element: Element
            The parent element that receives the nodes passed to ``add``.
        """
        self.element = element
        # Seed the set with the children the parent already owns, so that a
        # node identical to an existing child is recognised as a duplicate.
        self.representations = set()
        for child in element.childNodes:
            representation = self.represent(child)
            if representation:
                self.representations.add(representation)

    def normalize(self, name: str):
        """
        Normalize a string for comparison: lowercase it and drop every space
        and newline character.

        Parameters
        ----------
        name: str

        Returns
        -------
        str
        """
        return name.lower().replace(" ", "").replace("\n", "")

    def represent(self, element: Element):
        """
        Represent a node as a string: its XML serialization, normalized with
        ``normalize``.

        Parameters
        ----------
        element: Element

        Returns
        -------
        str
        """
        return self.normalize(element.toxml())

    def add(self, element: Element):
        """
        Append a node to the parent element unless an equivalent one is
        already known.

        Nodes whose representation is empty (e.g. text nodes made only of
        spaces and newlines) are ignored.

        Parameters
        ----------
        element: Element

        Returns
        -------
        Element or None
            The appended node, or None when nothing was appended.
        """
        representation = self.represent(element)
        if not representation or representation in self.representations:
            return None
        self.representations.add(representation)
        # appendChild() detaches the node from its previous parent, if any.
        return self.element.appendChild(element)
def merge_programs(programs: typing.List[Element]) -> Element:
    """
    Merges the redundant program data in a list of program elements.

    The result is a deep copy of the first program; the child nodes of the
    remaining programs are moved into it through ``ChildNodes.add``, which
    decides which nodes count as duplicates.

    Parameters
    ----------
    programs: list
        The program elements to merge. Must contain at least one element
        (an empty list raises IndexError).

    Returns
    -------
    Element
    """
    new_element = programs[0].cloneNode(deep=True)
    children = ChildNodes(new_element)
    for program in programs[1:]:
        # Adding a child moves it out of `program`, which shrinks
        # `program.childNodes` in place. Iterate over a snapshot so that the
        # node following a moved one is not skipped.
        for child in list(program.childNodes):
            children.add(child)
    return new_element
def main(dom: Document, progress: bool = False):
    """
    The core function for the script.

    Every group of "programme" elements sharing the same "start" and
    "channel" attributes is collapsed, in place, into a single merged
    element that takes the position of the first element of the group.

    Parameters
    ----------
    dom: Document
        The document to modify in place.
    progress: bool, default = False
        Whether to display a progress bar.
    """
    # Group the programmes in a single pass (document order is preserved)
    # instead of re-scanning the whole document for every programme.
    groups = {}
    for programme in dom.getElementsByTagName("programme"):
        key = (programme.getAttribute("start"), programme.getAttribute("channel"))
        groups.setdefault(key, []).append(programme)

    for same in tqdm.tqdm(groups.values(), total=len(groups), disable=not progress):
        if len(same) < 2:
            continue
        first = same[0]
        merged = merge_programs(same)
        for other in same[1:]:
            # Detach each duplicate from its own parent, which is not
            # necessarily the parent of the first programme.
            other.parentNode.removeChild(other)
        first.parentNode.replaceChild(merged, first)
def entry():
    """
    The main entrypoint for the script.

    Parses the command line, merges the programmes of the input XMLTV file
    and writes the UTF-8 encoded result either to the given output path or,
    when the output is omitted or "-", to the standard output.
    """
    parser = argparse.ArgumentParser(prog="merger", description="Merge program data")
    parser.add_argument(
        "--input", "-i", help="The input XMLTV file", type=pathlib.Path, required=True
    )
    parser.add_argument("output", default="-", help="The output path", nargs="?")
    args = parser.parse_args()

    # Open in binary mode so that the XML parser decodes the file using the
    # encoding declared in the document rather than the locale's encoding.
    with args.input.open("rb") as file:
        dom = parse(file)

    stdout = not args.output or args.output == "-"
    # The progress bar is only enabled when the result goes to a file.
    main(dom, progress=not stdout)

    result = dom.toxml(encoding="utf-8")
    if stdout:
        print(result.decode("utf-8"))
    else:
        pathlib.Path(args.output).write_bytes(result)
# Run the command-line entrypoint only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    entry()