diff --git a/proxycache/proxycache.py b/proxycache/proxycache.py index 6f41386..e8e0796 100644 --- a/proxycache/proxycache.py +++ b/proxycache/proxycache.py @@ -1,39 +1,47 @@ -import json import os +import json import re +import logging import requests -from steamgrid import SteamGridDB -from http.server import BaseHTTPRequestHandler, HTTPServer -from urllib.parse import urlparse, parse_qs, unquote -from ratelimit import limits, sleep_and_retry, RateLimitException -from requests.adapters import HTTPAdapter -from requests.packages.urllib3.util.retry import Retry -from steamgrid.enums import PlatformType +from steamgriddba import SteamGridDB +from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse from datetime import datetime, timedelta -import logging +from ratelimit import limits, sleep_and_retry, RateLimitException +from collections import defaultdict +import requests_cache +from urllib.parse import urlparse, parse_qs, unquote -# Initialize logging +# Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# Initialize an empty dictionary to serve as the cache -api_cache = {} +# Initialize cache with requests-cache +requests_cache.install_cache('steamgriddb_cache', expire_after=3600) # 1 hour cache expiration -# API Key for SteamGridDB +# SteamGridDB API key API_KEY = os.getenv('STEAMGRIDDB_API_KEY') -sgdb = SteamGridDB(API_KEY) # Create an instance of SteamGridDB +sgdb = SteamGridDB(API_KEY) -# Define rate limit (e.g., 100 requests per minute) +# Rate-limiting parameters RATE_LIMIT = 100 RATE_LIMIT_PERIOD = 60 # in seconds -# Create a session with connection pooling +# Initialize cache and IP rate-limiting tracking +api_cache = {} +ip_request_counts = defaultdict(int) +blocked_ips = set() + +# Create a session with connection pooling and retries session = requests.Session() -retry = Retry(connect=3, backoff_factor=0.5) -adapter = HTTPAdapter(max_retries=retry) +retry = requests.adapters.Retry(connect=3, backoff_factor=0.5) +adapter = requests.adapters.HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) +# FastAPI app setup +app = FastAPI() + @sleep_and_retry @limits(calls=RATE_LIMIT, period=RATE_LIMIT_PERIOD) def limited_request(url, headers): @@ -49,160 +57,80 @@ def limited_request(url, headers): raise def sanitize_game_name(game_name): - # Remove special characters like ™ and ® - sanitized_name = re.sub(r'[^\w\s]', '', game_name) - return sanitized_name - -class ProxyCacheHandler(BaseHTTPRequestHandler): - def do_GET(self): - parsed_path = urlparse(self.path) - path_parts = parsed_path.path.split('/') - logger.info(f"Parsed path: {parsed_path.path}") - logger.info(f"Path parts: {path_parts}") - - if len(path_parts) < 4: - self.send_response(400) - self.end_headers() - self.wfile.write(b'Invalid request') - return - - if path_parts[2] == 'search': - game_name = unquote(path_parts[3]) # Decode the URL-encoded game name - self.handle_search(game_name) - else: - if len(path_parts) < 5: - self.send_response(400) - self.end_headers() - self.wfile.write(b'Invalid request') - return - - art_type = path_parts[2] - game_id = path_parts[4] - dimensions = parse_qs(parsed_path.query).get('dimensions', [None])[0] - - logger.info(f"Art type: {art_type}") - logger.info(f"Game ID: {game_id}") - logger.info(f"Dimensions: {dimensions}") - - self.handle_artwork(game_id, art_type, dimensions) - - def handle_search(self, game_name): - logger.info(f"Searching for game ID for: {game_name}") - - # List of terms to decline - decline_terms = ["NonSteamLaunchers", "Repair EA App", "Nexon Launcher", "RemotePlayWhatever"] - - if game_name in decline_terms: - logger.info(f"Declining search for: {game_name}") - self.send_response(400) - self.end_headers() - self.wfile.write(b'Search term is not allowed') - return - + """Sanitize game name by removing special characters""" + return re.sub(r'[^\w\s]', '', game_name) + +def is_cache_valid(cache_entry): + """Check if the cache entry is still valid""" + return (datetime.now() - cache_entry['timestamp']).seconds < 3600 + +@app.get("/grid/{game_name}") +async def get_game_grid(game_name: str): + """Get the grid image URL for a specific game""" + sanitized_name = sanitize_game_name(game_name) + + # Check cache for the game grid + if sanitized_name in api_cache and is_cache_valid(api_cache[sanitized_name]): + logger.info(f"Serving from cache: {sanitized_name}") + response = api_cache[sanitized_name]['data'] + else: + # Fetch grid from SteamGridDB try: - sanitized_name = sanitize_game_name(game_name) - logger.info(f"Sanitized game name: {sanitized_name}") - - # Check if the search term is in the cache - if sanitized_name in api_cache and self.is_cache_valid(api_cache[sanitized_name]): - logger.info(f"Serving from cache: {sanitized_name}") - response = api_cache[sanitized_name]['data'] + games = sgdb.search_game(sanitized_name) + if games: + grid_url = games[0].image_steam_grid + response = {"game": sanitized_name, "grid_url": grid_url} + # Cache the result + api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} else: - games = sgdb.search_game(sanitized_name) - if games: - game_id = games[0].id - response = {'data': [{'id': game_id}]} - # Store the search term and response in the cache - api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} - else: - # Fallback to Steam platform if no results from SteamGridDB - fallback_results = self.search_fallback_platforms(sanitized_name) - if fallback_results: - response = {'data': fallback_results} - # Store the search term and response in the cache - api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} - else: - response = {'data': [], 'message': 'No artwork found for the given search term.'} - # Store the search term and response in the cache - api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} - - self.send_response(200) - self.end_headers() - self.wfile.write(json.dumps(response).encode()) + # Fallback if no results found + response = {"message": "Game not found."} + api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} except Exception as e: - logger.error(f"Error searching for game ID: {e}") - self.send_response(500) - self.end_headers() - self.wfile.write(b'Error searching for game ID') - - def search_fallback_platforms(self, game_name): - fallback_results = [] - steam_results = self.search_steamgridb(game_name) - if steam_results: - fallback_results.extend(steam_results) - return fallback_results - - def search_steamgridb(self, game_name): + logger.error(f"Error fetching game grid: {e}") + raise HTTPException(status_code=500, detail="Error fetching game grid") + + return response + +@app.get("/search/{game_name}") +async def search_game(game_name: str): + """Search for a game and get its ID""" + sanitized_name = sanitize_game_name(game_name) + + # Decline certain terms + decline_terms = ["NonSteamLaunchers", "Repair EA App", "Nexon Launcher", "RemotePlayWhatever"] + if sanitized_name in decline_terms: + raise HTTPException(status_code=400, detail="Search term is not allowed") + + # Check cache for search result + if sanitized_name in api_cache and is_cache_valid(api_cache[sanitized_name]): + logger.info(f"Serving search result from cache: {sanitized_name}") + response = api_cache[sanitized_name]['data'] + else: try: - games = sgdb.search_game(game_name) + games = sgdb.search_game(sanitized_name) if games: - return [{'id': game.id, 'name': game.name} for game in games] + game_id = games[0].id + response = {"data": [{"id": game_id}]} + api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} + else: + response = {"data": [], "message": "No results found."} + api_cache[sanitized_name] = {'data': response, 'timestamp': datetime.now()} except Exception as e: - logger.error(f"Error searching SteamGridDB: {e}") - return [] - - def handle_artwork(self, game_id, art_type, dimensions): - if not game_id: - self.send_response(400) - self.end_headers() - self.wfile.write(b'Game ID is required') - return - - logger.info(f"Downloading {art_type} artwork for game ID: {game_id}") - cache_key = (game_id, art_type, dimensions) - if cache_key in api_cache and self.is_cache_valid(api_cache[cache_key]): - logger.info(f"Serving from cache: {cache_key}") - data = api_cache[cache_key]['data'] - else: - try: - url = f"https://www.steamgriddb.com/api/v2/{art_type}/game/{game_id}" - if dimensions: - url += f"?dimensions={dimensions}" - - # Check for specific game IDs and request alternate artwork styles - if game_id in ['5260961', '5297303']: - url += "&style=alternate" - - headers = {'Authorization': f'Bearer {API_KEY}'} - logger.info(f"Sending request to: {url}") - response = limited_request(url, headers) - data = response.json() - api_cache[cache_key] = {'data': data, 'timestamp': datetime.now()} - logger.info(f"Storing in cache: {cache_key}") - except Exception as e: - logger.error(f"Error making API call: {e}") - self.send_response(500) - self.end_headers() - self.wfile.write(b'Error making API call') - return - - if 'data' not in data: - self.send_response(500) - self.end_headers() - self.wfile.write(b'Invalid response from API') - return - - self.send_response(200) - self.end_headers() - self.wfile.write(json.dumps(data).encode()) - - def is_cache_valid(self, cache_entry): - cache_expiry = timedelta(hours=168) # Set cache expiry time - return datetime.now() - cache_entry['timestamp'] < cache_expiry - -def run(server_class=HTTPServer, handler_class=ProxyCacheHandler): - port = int(os.environ.get('PORT', 8000)) # Use the environment variable PORT or default to 8000 - server_address = ('', port) + logger.error(f"Error searching for game: {e}") + raise HTTPException(status_code=500, detail="Error searching for game") + + return response + +# Optional route to check cache status +@app.get("/cache_status") +async def cache_status(): + """Check cache expiration status""" + if requests_cache.is_cache_expired('steamgriddb_cache'): + return {"status": "Cache expired. Refreshing..."} + else: + return {"status": "Using cached response."} + httpd = server_class(server_address, handler_class) logger.info(f'Starting proxy cache server on port {port}...') httpd.serve_forever()