forked from nikat/mtproto2json
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tl.py
441 lines (389 loc) · 18 KB
/
tl.py
1
#!/usr/bin/env python3
"""
This is a prototype module

This module partly implements TL binary serialization for Telegram MTProto
https://core.telegram.org/mtproto/serialize

Reading and parsing of scheme.tl is supported.
Vectors and flags are hardcoded.
int128 and int256 are read and written as 16 bytes and 32 bytes

service.tl is used to extend scheme.tl to implement certain service
constructors that are not present in scheme.tl

special basic types are added to support service.tl:
ulong - 64 bit little endian unsigned integer
uint - 32 bit little endian unsigned integer
sha1 - read and written as 20 bytes
rawobject - any boxed type
object - any type prepended by length as uint
encrypted - ONLY for writing, just writes bytes as they are passed to the serialize function
gzip - ONLY for reading, a string that is gzip.decompressed upon reading
"""

__author__ = "Nikita Miropolskiy"
# NOTE(review): the address below looks redacted in this copy of the file;
# restore the real value from upstream.
__email__ = "[email protected]"
__license__ = "https://creativecommons.org/licenses/by-nc-nd/4.0/legalcode"
__status__ = "Prototype"

import binascii
import functools
import gzip  # TODO make gzip async/threaded
import re
import struct

from byteutils import long_hex, pack_binary_string, unpack_binary_string, unpack_long_binary_string, \
    pack_long_binary_string, Bytedata, base64decode, base64encode


@functools.lru_cache()
def _compile_cons_number(definition: bytes) -> bytes:
    """Return the CRC32 of `definition` as 4 little-endian unsigned bytes."""
    n = binascii.crc32(definition)
    return n.to_bytes(4, 'little', signed=False)


def _pack_flags(flags: set) -> bytes:
    """Pack a collection of bit positions into a 4-byte little-endian word.

    Raises OverflowError (from int.to_bytes) if a position does not fit
    into 32 bits.
    """
    n = 0
    for flag in flags:
        n |= 1 << int(flag)
    return n.to_bytes(4, 'little', signed=False)


def unpack_flags(n: int) -> list:
    """Return the ascending list of bit positions that are set in `n`.

    Non-positive `n` yields an empty list.  A fresh list is built on every
    call: the result is mutable, so it must not be shared through a cache.
    """
    if n <= 0:
        return []
    return [i for i in range(n.bit_length()) if (n >> i) & 1]


# One line of a scheme file: empty line, comment, section marker, the
# hardcoded vector definition, or a constructor/function definition.
_schemeRE = re.compile(
    r'^(?P<empty>$)'
    r'|(?P<comment>//.*)'
    r'|(?P<typessection>---types---)'
    r'|(?P<functionssection>---functions---)'
    r'|(?P<vector>vector#1cb5c415 {t:Type} # \[ t ] = Vector t;)'
    r'|(?P<cons>(?P<name>[a-zA-Z0-9._]+)(#(?P<number>[a-f0-9]{1,8}))?'
    r'(?P<xtype> {X:Type})?'
    r'(?P<flags> flags:#)?'
    r'(?P<parameters>.*?)'
    r'(?(xtype) query:!X = X| = (?P<type>[a-zA-Z0-9._<>]+));)'
    r'$')

# One `name:type` parameter token, optionally conditional (`flags.N?`) and
# optionally a bare (`vector<...>`) or boxed (`Vector<...>`) vector.
_parameterRE = re.compile(
    r'^(?P<name>[a-zA-Z0-9_]+):'
    r'(flags.(?P<flag_number>\d+)\?)?'
    r'(?P<type>'
    r'(?P<vector>((?P<bare_vector>vector)|(?P<boxed_vector>Vector))<)?'
    r'(?P<element_type>((?P<namespace>[a-zA-Z0-9._]*)\.)?((?P<bare>[a-z][a-zA-Z0-9._]*)|(?P<boxed>[A-Z][a-zA-Z0-9._]*)))'
    r'(?(vector)>)?)$')


# a collection of constructors
class Scheme:
    """Constructors parsed from scheme text, indexed by name, number and type.

    Attributes:
        constructors: constructor name -> Constructor.
        types: result type name -> set of Constructor objects of that type
            (functions declared with `{X:Type}` are stored under None).
        cons_numbers: 4-byte little-endian constructor number -> Constructor.
    """

    def __init__(self, in_thread, scheme_data):
        """Parse `scheme_data` (scheme text, one definition per line).

        `in_thread` is called as `in_thread(func, arg)` and its result is
        awaited when a `gzip` parameter is read; presumably it runs `func`
        in a worker thread -- confirm against the caller.

        Raises SyntaxError on a line or parameter that does not parse.
        """
        self.constructors = dict()
        self.types = dict()
        self.cons_numbers = dict()
        self._parse_file(scheme_data)
        self._in_thread = in_thread

    def __repr__(self):
        return '\n'.join(repr(cons) for cons in self.constructors.values())

    def _parse_file(self, scheme_data):
        """Feed every line of `scheme_data` to _parse_line."""
        for scheme_line in scheme_data.split('\n'):
            # Tolerate CRLF input: a stray '\r' would otherwise fail the
            # line regex and raise SyntaxError even for blank lines.
            self._parse_line(scheme_line.rstrip('\r'))

    @staticmethod
    def _parse_token(regex, s: str):
        """Match `s` against `regex`; return its non-None named groups or None."""
        match = regex.match(s)
        if not match:
            return None
        return {k: v for k, v in match.groupdict().items() if v is not None}

    def _parse_line(self, line):
        """Parse one scheme line and register the constructor it defines.

        Lines that match but define no constructor (empty, comment, section
        marker, the hardcoded vector line) are ignored.

        Raises SyntaxError if the line or one of its parameters is malformed.
        """
        cons_parsed = self._parse_token(_schemeRE, line)
        if not cons_parsed:
            raise SyntaxError('Error in scheme: `%s`' % line)
        if 'cons' not in cons_parsed:
            return

        # The parameters group starts with a space, so the first split
        # element is always empty and is dropped.
        parameter_tokens = cons_parsed['parameters'].split(' ')[1:]
        parameters = []

        if 'number' in cons_parsed:
            con_number_int = int(cons_parsed['number'], base=16)
            cons_number = con_number_int.to_bytes(4, 'little', signed=False)
        else:
            cons_number = None

        for parameter_token in parameter_tokens:
            parameter_parsed = self._parse_token(_parameterRE, parameter_token)
            if not parameter_parsed:
                raise SyntaxError('Error in parameter `%s`' % parameter_token)

            is_vector = 'vector' in parameter_parsed
            element_parameter = Parameter(
                pname='<element of vector `%s`>' % parameter_parsed['name'],
                ptype=parameter_parsed['element_type'],
                is_boxed='boxed' in parameter_parsed
            ) if is_vector else None

            parameter = Parameter(
                pname=parameter_parsed['name'],
                ptype=parameter_parsed['type'],
                flag_number=int(parameter_parsed['flag_number']) if 'flag_number' in parameter_parsed else None,
                is_vector=is_vector,
                # For a vector, boxedness refers to the vector itself
                # (`Vector` vs `vector`), not to its element type.
                is_boxed='boxed_vector' in parameter_parsed if is_vector else 'boxed' in parameter_parsed,
                element_parameter=element_parameter
            )
            parameters.append(parameter)

        if 'xtype' in cons_parsed:
            # `{X:Type} ... query:!X = X`: the wrapped query is carried as a
            # trailing boxed object of any type.
            parameters.append(Parameter(
                pname='_wrapped',
                ptype='rawobject',
                flag_number=None,
                is_vector=False,
                is_boxed=True,
                element_parameter=None
            ))

        cons = Constructor(
            scheme=self,
            ptype=None if 'xtype' in cons_parsed else cons_parsed['type'],
            name=cons_parsed['name'],
            number=cons_number,
            has_flags='flags' in cons_parsed,
            parameters=parameters
        )

        self.constructors[cons.name] = cons
        # Numberless constructors can only be used bare; registering them
        # would make them overwrite each other under the key None.
        if cons.number is not None:
            self.cons_numbers[cons.number] = cons
        if cons.type not in self.types:
            self.types[cons.type] = set()
        self.types[cons.type].add(cons)

    def typecheck(self, parameter, argument):
        """Check that `argument` is a Value acceptable for `parameter`.

        Returns a `(ok, message)` tuple; `message` is 'Ok' on success and a
        short reason otherwise.
        """
        if not isinstance(argument, Value):
            return False, 'not an object for nonbasic type'
        if parameter.is_boxed:
            if parameter.type not in self.types:
                return False, 'unknown type'
            if argument.cons not in self.types[parameter.type]:
                return False, 'type mismatch'
            if not argument.boxed:
                return False, 'expected boxed, found bare'
        else:
            if parameter.type not in self.constructors:
                return False, 'unknown constructor'
            if argument.cons != self.constructors[parameter.type]:
                return False, 'wrong constructor'
            if argument.boxed:
                return False, 'expected bare, found boxed'
        return True, 'Ok'

    async def deserialize(self, bytereader, parameter=None):
        """Read one value described by `parameter` and return a Structure.

        `bytereader` is awaited as `bytereader(n)` to obtain `n` bytes.
        A boxed parameter reads a 4-byte constructor number first (and, if
        `parameter.type` is not None, checks it belongs to that type); a bare
        parameter names the constructor directly.  `parameter=None` means
        "any boxed constructor".

        Raises ValueError for an unknown type or constructor, or when the
        constructor read does not belong to the expected type.
        """
        if parameter is None:
            # Same default as read(): any boxed constructor of the scheme.
            parameter = Parameter('', None, is_boxed=True)

        if parameter.is_boxed:
            if parameter.type is not None and parameter.type not in self.types:
                raise ValueError("Unknown type `%s`" % parameter.type)
            cons_number = await bytereader(4)
            if cons_number not in self.cons_numbers:
                raise ValueError("Unknown constructor %s" % hex(int.from_bytes(cons_number, 'little')))
            cons = self.cons_numbers[cons_number]
            if parameter.type is not None and cons not in self.types[parameter.type]:
                raise ValueError("type mismatch, constructor `%s` not in type `%s`" % (cons.name, parameter.type))
        else:
            if parameter.type not in self.constructors:
                raise ValueError("Unknown constructor in parameter `%r`" % parameter)
            cons = self.constructors[parameter.type]
        return await cons.deserialize_bare_data(bytereader)

    def serialize(self, boxed: bool, **kwargs):
        """Build a Value from `kwargs`; `kwargs['_cons']` names the constructor.

        Raises KeyError if `_cons` is missing and NotImplementedError if the
        constructor is not in the scheme.
        """
        cons_name = kwargs['_cons']
        if cons_name not in self.constructors:
            raise NotImplementedError('Constructor `%s` not present in scheme.' % cons_name)
        cons = self.constructors[cons_name]
        return cons.serialize(boxed=boxed, **kwargs)

    def bare(self, **kwargs):
        """Shortcut for serialize(boxed=False, ...)."""
        return self.serialize(boxed=False, **kwargs)

    def boxed(self, **kwargs):
        """Shortcut for serialize(boxed=True, ...)."""
        return self.serialize(boxed=True, **kwargs)

    async def read(self, bytereader, is_boxed=True, parameter_type=None):
        """Deserialize one value from `bytereader`; by default any boxed one."""
        parameter = Parameter('', parameter_type, is_boxed=is_boxed)
        return await self.deserialize(bytereader, parameter)

    async def read_from_string(self, string: bytes, *args, **kwargs):
        """Deserialize one value from `string`; extra arguments go to read()."""
        bytedata = Bytedata(string)
        return await self.read(bytedata.cororead, *args, **kwargs)


# a serialized TL Value that will be sent
class Value:
    """A constructor together with its already-serialized arguments."""

    def __init__(self, cons, boxed: bool=False):
        """Create an empty value for `cons`.

        Raises RuntimeError when `boxed` is requested for a constructor that
        has no number.
        """
        self.cons = cons
        self.boxed = boxed
        if self.boxed and self.cons.number is None:
            raise RuntimeError("Tried to create a boxed value for a numberless constructor `%r`" % cons)
        self._flags = set()
        self._data = []

    def set_flag(self, flag_number: int):
        """Mark conditional field bit `flag_number` as present.

        Raises TypeError if the constructor has no flags word and ValueError
        if the bit was already set.
        """
        if not self.cons.has_flags:
            raise TypeError('Conditional data added to plain constructor `%r`' % self.cons)
        if flag_number in self._flags:
            raise ValueError('Data with flag `%d` is already present in constructor `%s`' % (flag_number, self.cons))
        self._flags.add(flag_number)

    def append(self, data: bytes):
        """Append one serialized chunk (bytes or a nested Value)."""
        self._data.append(data)

    def __repr__(self):
        return '%s(%r)\n%s' % ('boxed' if self.boxed else 'bare', self.cons, long_hex(self.get_flat_bytes()))

    def get_flat_bytes(self):
        """Return the wire bytes: [number] [flags word] data..., nested Values flattened."""
        prefix = b''
        if self.boxed:
            prefix += self.cons.number
        if self.cons.has_flags:
            prefix += _pack_flags(self._flags)
        return prefix + b''.join(
            chunk.get_flat_bytes() if isinstance(chunk, Value) else chunk
            for chunk in self._data
        )


# a deserialized TL Value that was received
class Structure:
    """Deserialized fields of one constructor, readable as attributes."""

    def __init__(self, constructor_name: str):
        self._constructor_name = constructor_name
        self._fields = dict()

    def __eq__(self, other):
        """Compare the constructor name with a string.

        Any other operand is left to Python's default handling (identity),
        so `s == s` is True and `s != s` is False.
        """
        if isinstance(other, str):
            return self._constructor_name == other
        return NotImplemented

    def __repr__(self):
        return repr(self.get_dict())

    def __getattr__(self, name):
        """Return field `name`; raise AttributeError if there is no such field."""
        # Only reached when normal lookup fails.  `_fields` is read through
        # __dict__ so that an instance without state (copy/pickle create one
        # before restoring attributes) raises AttributeError instead of
        # recursing into __getattr__('_fields') forever.
        fields = self.__dict__.get('_fields')
        if fields is None:
            raise AttributeError(name)
        if name not in fields:
            raise AttributeError("Attribute `%s` not found in `%r`" % (name, self))
        return fields[name]

    def get_dict(self):
        """Return this structure as nested plain dicts/lists (see _get_dict)."""
        return Structure._get_dict(self)

    @staticmethod
    def _get_dict(anything):
        """Recursively convert `anything` into plain Python data.

        Structures become dicts with a `_cons` key, lists and tuples become
        lists, bytes are decoded as UTF-8 (replaced by a placeholder string
        when that fails); everything else is returned unchanged.
        """
        if isinstance(anything, Structure):
            ret = dict(_cons=anything._constructor_name)
            ret.update({key: Structure._get_dict(value) for key, value in anything._fields.items()})
            return ret
        elif isinstance(anything, (list, tuple)):
            return [Structure._get_dict(value) for value in anything]
        elif isinstance(anything, bytes):
            try:
                return anything.decode('utf-8')
            except UnicodeDecodeError:
                return "could not decode bytes object :O"
        else:
            return anything


# a parameter in TL Constructor or TL Function
class Parameter:
    """Description of one parameter: name, type, boxedness, flag and vector info."""

    def __init__(self, pname: str, ptype: str, is_boxed: bool,
                 flag_number: int=None, is_vector: bool=False, element_parameter=None):
        self.name = pname
        self.type = ptype
        self.flag_number = flag_number  # None for unconditional parameters
        self.is_vector = is_vector
        self.is_boxed = is_boxed
        self.element_parameter = element_parameter  # Parameter of the elements, vectors only

    def __repr__(self):
        if self.flag_number is not None:
            return '%s:flags.%d?%s' % (self.name, self.flag_number, self.type)
        else:
            return '%s:%s' % (self.name, self.type)


# a TL Constructor or TL Function
class Constructor:
    """One scheme definition; serializes arguments and deserializes its body."""

    def __init__(self, scheme, ptype: str, name: str, number: bytes, has_flags: bool, parameters):
        self.scheme = scheme
        self.name = name
        self.number = number  # 4 little-endian bytes, or None if the scheme gave none
        self.type = ptype
        self.has_flags = has_flags
        self._parameters = parameters

    def __repr__(self):
        return '%s %s= %s;' % (self.name, ''.join('%r ' % p for p in self._parameters), self.type)

    def _serialize_argument(self, data, parameter, argument):
        """Serialize `argument` for `parameter` and append it to Value `data`.

        str arguments are UTF-8 encoded and dict arguments are serialized
        through the scheme before the type dispatch.  Basic types are handled
        here; vectors recurse per element; anything else must be a Value that
        passes Scheme.typecheck.  Sets the parameter's flag bit, if it has one.

        Raises ValueError for wrong-length int128/sha1/int256 data and
        TypeError for a non-bytes string argument or a failed typecheck.
        """
        if isinstance(argument, str):
            argument = argument.encode('utf-8')
        if isinstance(argument, dict):
            argument = self.scheme.serialize(boxed=parameter.is_boxed, **argument)

        if parameter.type == 'int':
            data.append(int(argument).to_bytes(4, 'little', signed=True))
        elif parameter.type == 'uint':
            data.append(int(argument).to_bytes(4, 'little', signed=False))
        elif parameter.type == 'long':
            data.append(int(argument).to_bytes(8, 'little', signed=True))
        elif parameter.type == 'ulong':
            data.append(int(argument).to_bytes(8, 'little', signed=False))
        elif parameter.type == 'int128':
            # it's more convenient to handle long ints as bytes
            if len(argument) != 16:
                raise ValueError("Expected 16 bytes, got %d bytes" % len(argument))
            data.append(argument)
        elif parameter.type == 'sha1':
            # it's more convenient to handle long ints as bytes
            if len(argument) != 20:
                raise ValueError("Expected 20 bytes, got %d bytes" % len(argument))
            data.append(argument)
        elif parameter.type == 'int256':
            # it's more convenient to handle long ints as bytes
            if len(argument) != 32:
                raise ValueError("Expected 32 bytes, got %d bytes" % len(argument))
            data.append(argument)
        elif parameter.type == 'double':
            data.append(struct.pack(b'<d', float(argument)))
        elif parameter.type == 'string':
            # str input was already encoded at the top of this method.
            if not isinstance(argument, bytes):
                raise TypeError('Wrong argument `%r` for parameter `%r` in `%s`, expected bytes or string' %
                                (argument, parameter, self.name))
            data.append(pack_binary_string(argument))
        elif parameter.type == 'bytes':
            # `bytes` arguments are passed through base64decode before packing.
            argument = base64decode(argument)
            data.append(pack_binary_string(argument))
        elif parameter.type == 'object':
            data.append(pack_long_binary_string(argument.get_flat_bytes()))
        elif parameter.type == 'rawobject':
            # A raw object is always written boxed, whatever the caller built.
            argument.boxed = True
            data.append(argument)
        elif parameter.type == 'encrypted':
            data.append(argument)
        elif parameter.is_vector:
            if parameter.is_boxed:
                data.append(_compile_cons_number(b'vector t:Type # [ t ] = Vector t'))
            data.append(len(argument).to_bytes(4, 'little', signed=False))
            for element_argument in argument:
                self._serialize_argument(data, parameter.element_parameter, element_argument)
        else:
            typecheck, type_error = self.scheme.typecheck(parameter, argument)
            if not typecheck:
                raise TypeError('Wrong argument `%r` for parameter `%r` in `%s`, %s' %
                                (argument, parameter, self.name, type_error))
            data.append(argument)

        if parameter.flag_number is not None:
            data.set_flag(parameter.flag_number)

    def serialize(self, boxed: bool, **arguments):
        """Build a Value from keyword `arguments` keyed by parameter name.

        Missing conditional (flagged) parameters are skipped; keys that match
        no parameter (such as `_cons`) are ignored.

        Raises TypeError if an unconditional parameter is missing.
        """
        data = Value(self, boxed=boxed)
        for parameter in self._parameters:
            if parameter.name in arguments:
                self._serialize_argument(data, parameter, arguments[parameter.name])
            elif parameter.flag_number is None:
                raise TypeError('required `%s` not found in `%s`' % (parameter, self.name))
        return data

    async def _deserialize_argument(self, bytereader, parameter):
        """Read and return one argument for `parameter` from `bytereader`.

        Integers come back as int, int128/sha1/int256 as raw bytes, double as
        float, vectors as lists; `bytes` values are passed through
        base64encode; nested objects are deserialized through the scheme.

        Raises ValueError when a boxed vector does not start with the vector
        constructor number.
        """
        if parameter.type == 'int':
            return int.from_bytes(await bytereader(4), 'little', signed=True)
        elif parameter.type == 'uint':
            return int.from_bytes(await bytereader(4), 'little', signed=False)
        elif parameter.type == 'long':
            return int.from_bytes(await bytereader(8), 'little', signed=True)
        elif parameter.type == 'ulong':
            return int.from_bytes(await bytereader(8), 'little', signed=False)
        elif parameter.type == 'int128':
            return await bytereader(16)
        elif parameter.type == 'sha1':
            return await bytereader(20)
        elif parameter.type == 'int256':
            return await bytereader(32)
        elif parameter.type == 'double':
            # struct.unpack always returns a tuple; hand back the float itself.
            return struct.unpack(b'<d', await bytereader(8))[0]
        elif parameter.type == 'string':
            return await unpack_binary_string(bytereader)
        elif parameter.type == 'bytes':
            return base64encode(await unpack_binary_string(bytereader))
        elif parameter.type == 'gzip':
            unpacked = self.scheme._in_thread(gzip.decompress, await unpack_binary_string(bytereader))
            return await self.scheme.read_from_string(await unpacked)
        elif parameter.type == 'rawobject':
            return await self.scheme.read(bytereader)
        elif parameter.type == 'object':
            return await self.scheme.read_from_string(await unpack_long_binary_string(bytereader))
        elif parameter.is_vector:
            if parameter.is_boxed:
                vcons = await bytereader(4)
                if vcons != _compile_cons_number(b'vector t:Type # [ t ] = Vector t'):
                    raise ValueError("Not vector `%s` in `%r` in `%r`" % (long_hex(vcons), parameter, self))
            vlen = int.from_bytes(await bytereader(4), 'little', signed=False)
            # Elements must be read strictly in order from the same reader.
            return [(await self._deserialize_argument(bytereader, parameter.element_parameter))
                    for _ in range(vlen)]
        else:
            return await self.scheme.deserialize(bytereader, parameter)

    async def deserialize_bare_data(self, bytedata):
        """Read this constructor's body (no constructor number) into a Structure.

        If the constructor has a flags word it is read first, and only the
        conditional parameters whose bit is set are read afterwards.
        """
        if self.has_flags:
            flags = unpack_flags(int.from_bytes(await bytedata(4), 'little', signed=False))
            parameters = [p for p in self._parameters if p.flag_number is None or p.flag_number in flags]
        else:
            parameters = self._parameters

        result = Structure(self.name)
        for parameter in parameters:
            argument = await self._deserialize_argument(bytedata, parameter)
            result._fields[parameter.name] = argument
        return result