Skip to content

Commit

Permalink
feat: enhanced webhook security measures:
Browse files Browse the repository at this point in the history
- Added an automatically generated path segment to the webhook endpoint for Telegram API requests.
- Introduced support for the X-Telegram-Bot-Api-Secret-Token header.
  • Loading branch information
orenlab committed Dec 28, 2024
1 parent 083fbb6 commit 282a19d
Show file tree
Hide file tree
Showing 6 changed files with 680 additions and 331 deletions.
2 changes: 1 addition & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def main():
"""
try:
manager = pytmbot_instance.PyTMBot()
manager.start_bot_instance()
manager.launch_bot()
except Exception as e:
logs.bot_logger.critical(f"Failed to start the bot instance: {e}")
sys.exit(1)
Expand Down
55 changes: 55 additions & 0 deletions pytmbot/models/telegram_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/venv/bin/python3
"""
(c) Copyright 2024, Denis Rozhnovskiy <[email protected]>
pyTMBot - A simple Telegram bot to handle Docker containers and images,
also providing basic information about the status of local servers.
"""

import ipaddress

from pytmbot.logs import bot_logger


class TelegramIPValidator:
"""Validates if an IP address belongs to Telegram's network ranges."""

def __init__(self):
self.ipv4_ranges = [
ipaddress.ip_network("91.108.56.0/22"),
ipaddress.ip_network("91.108.4.0/22"),
ipaddress.ip_network("91.108.8.0/22"),
ipaddress.ip_network("91.108.16.0/22"),
ipaddress.ip_network("91.108.12.0/22"),
ipaddress.ip_network("149.154.160.0/20"),
ipaddress.ip_network("91.105.192.0/23"),
ipaddress.ip_network("91.108.20.0/22"),
ipaddress.ip_network("185.76.151.0/24"),
]

self.ipv6_ranges = [
ipaddress.ip_network("2001:b28:f23d::/48"),
ipaddress.ip_network("2001:b28:f23f::/48"),
ipaddress.ip_network("2001:67c:4e8::/48"),
ipaddress.ip_network("2001:b28:f23c::/48"),
ipaddress.ip_network("2a0a:f280::/32"),
]

self.validated_ips: set[str] = set()

def is_telegram_ip(self, ip_str: str) -> bool:
if ip_str in self.validated_ips:
return True

try:
ip = ipaddress.ip_address(ip_str)
ranges = self.ipv4_ranges if isinstance(ip, ipaddress.IPv4Address) else self.ipv6_ranges

for network in ranges:
if ip in network:
self.validated_ips.add(ip_str)
return True
return False

except ValueError:
bot_logger.error(f"Invalid IP address format: {ip_str}")
return False
174 changes: 174 additions & 0 deletions pytmbot/models/updates_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#!/venv/bin/python3
"""
(c) Copyright 2024, Denis Rozhnovskiy <[email protected]>
pyTMBot - A simple Telegram bot to handle Docker containers and images,
also providing basic information about the status of local servers.
"""

from typing import Optional, Dict, Any, List

from pydantic import BaseModel, Field, ConfigDict


class InlineKeyboardButton(BaseModel):
text: str
callback_data: Optional[str] = None
url: Optional[str] = None
model_config = ConfigDict(extra="allow")


class InlineKeyboardMarkup(BaseModel):
inline_keyboard: List[List[InlineKeyboardButton]]
model_config = ConfigDict(extra="allow")


class User(BaseModel):
id: int
is_bot: bool
first_name: str
last_name: Optional[str] = None
username: Optional[str] = None
model_config = ConfigDict(extra="allow")


class Chat(BaseModel):
id: int
type: str
title: Optional[str] = None
username: Optional[str] = None
first_name: Optional[str] = None
last_name: Optional[str] = None
model_config = ConfigDict(extra="allow")


class MessageEntity(BaseModel):
type: str
offset: int
length: int
url: Optional[str] = None
user: Optional[User] = None
model_config = ConfigDict(extra="allow")


class Message(BaseModel):
message_id: int
date: int
chat: Chat
from_user: Optional[User] = Field(None, alias='from')
text: Optional[str] = None
entities: Optional[List[MessageEntity]] = None
reply_markup: Optional[InlineKeyboardMarkup] = None
model_config = ConfigDict(extra="allow")


class CallbackQuery(BaseModel):
id: str
from_user: User = Field(..., alias='from')
message: Optional[Message] = None
inline_message_id: Optional[str] = None
chat_instance: str
data: Optional[str] = None
game_short_name: Optional[str] = None
model_config = ConfigDict(extra="allow")


class InlineQuery(BaseModel):
id: str
from_user: User = Field(..., alias='from')
query: str
offset: str
chat_type: Optional[str] = None
model_config = ConfigDict(extra="allow")


class ChosenInlineResult(BaseModel):
result_id: str
from_user: User = Field(..., alias='from')
query: str
inline_message_id: Optional[str] = None
model_config = ConfigDict(extra="allow")


class ShippingQuery(BaseModel):
id: str
from_user: User = Field(..., alias='from')
invoice_payload: str
shipping_address: Dict[str, Any]
model_config = ConfigDict(extra="allow")


class PreCheckoutQuery(BaseModel):
id: str
from_user: User = Field(..., alias='from')
currency: str
total_amount: int
invoice_payload: str
model_config = ConfigDict(extra="allow")


class PollAnswer(BaseModel):
poll_id: str
user: User
option_ids: List[int]
model_config = ConfigDict(extra="allow")


class Poll(BaseModel):
id: str
question: str
options: List[Dict[str, Any]]
is_closed: bool
is_anonymous: bool
type: str
allows_multiple_answers: bool
model_config = ConfigDict(extra="allow")


class ChatMember(BaseModel):
user: User
status: str
model_config = ConfigDict(extra="allow")


class ChatMemberUpdated(BaseModel):
chat: Chat
from_user: User = Field(..., alias='from')
date: int
old_chat_member: ChatMember
new_chat_member: ChatMember
invite_link: Optional[Dict[str, Any]] = None
model_config = ConfigDict(extra="allow")


class ChatJoinRequest(BaseModel):
chat: Chat
from_user: User = Field(..., alias='from')
user_chat_id: int
date: int
bio: Optional[str] = None
invite_link: Optional[Dict[str, Any]] = None
model_config = ConfigDict(extra="allow")


class UpdateModel(BaseModel):
update_id: int = Field(..., gt=0)
message: Optional[Message] = None
edited_message: Optional[Message] = None
channel_post: Optional[Message] = None
edited_channel_post: Optional[Message] = None
inline_query: Optional[InlineQuery] = None
chosen_inline_result: Optional[ChosenInlineResult] = None
callback_query: Optional[CallbackQuery] = None
shipping_query: Optional[ShippingQuery] = None
pre_checkout_query: Optional[PreCheckoutQuery] = None
poll: Optional[Poll] = None
poll_answer: Optional[PollAnswer] = None
my_chat_member: Optional[ChatMemberUpdated] = None
chat_member: Optional[ChatMemberUpdated] = None
chat_join_request: Optional[ChatJoinRequest] = None

model_config = ConfigDict(
extra="allow",
populate_by_name=True,
arbitrary_types_allowed=True
)
Loading

0 comments on commit 282a19d

Please sign in to comment.