Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor connection pool #5081

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion config/config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ https_only: false
#disable_proxy: false

##
## Size of the HTTP pool used to connect to youtube. Each
## Max size of the HTTP pool used to connect to youtube. Each
## domain ('youtube.com', 'ytimg.com', ...) has its own pool.
##
## Accepted values: a positive integer
Expand All @@ -154,6 +154,35 @@ https_only: false
#pool_size: 100


##
## Max idle size of the HTTP pool used to connect to youtube. Each
## domain ('youtube.com', 'ytimg.com', ...) has its own pool.
##
## This means that when releasing a connection back into the pool, it will
## be closed if there are already more than idle_pool_size connections within
## the pool
##
## Do note that idle connections are kept around forever without any way of
## timing them out.
##
## When unset this value has the same value as pool_size
##
## Accepted values: a positive integer
## Default: <none> (internally this means that it has the same value as pool_size)
##
#idle_pool_size:

##
## Amount of seconds to wait for a client to be free from the pool
## before raising an error
##
##
## Accepted values: a positive integer
## Default: 5
##
#pool_checkout_timeout: 5


##
## Additional cookies to be sent when requesting the youtube API.
##
Expand Down
15 changes: 13 additions & 2 deletions src/invidious.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require "protodec/utils"

require "./invidious/database/*"
require "./invidious/database/migrations/*"
require "./invidious/connection/*"
require "./invidious/http_server/*"
require "./invidious/helpers/*"
require "./invidious/yt_backend/*"
Expand Down Expand Up @@ -91,11 +92,21 @@ SOFTWARE = {
"branch" => "#{CURRENT_BRANCH}",
}

YT_POOL = YoutubeConnectionPool.new(YT_URL, capacity: CONFIG.pool_size)
YT_POOL = Invidious::ConnectionPool::Pool.new(
YT_URL,
max_capacity: CONFIG.pool_size,
idle_capacity: CONFIG.idle_pool_size,
timeout: CONFIG.pool_checkout_timeout
)

# Image request pool

GGPHT_POOL = YoutubeConnectionPool.new(URI.parse("https://yt3.ggpht.com"), capacity: CONFIG.pool_size)
GGPHT_POOL = Invidious::ConnectionPool::Pool.new(
URI.parse("https://yt3.ggpht.com"),
max_capacity: CONFIG.pool_size,
idle_capacity: CONFIG.idle_pool_size,
timeout: CONFIG.pool_checkout_timeout
)

# CLI
Kemal.config.extra_options do |parser|
Expand Down
2 changes: 1 addition & 1 deletion src/invidious/channels/channels.cr
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def fetch_channel(ucid, pull_all_videos : Bool)
}

LOGGER.trace("fetch_channel: #{ucid} : Downloading RSS feed")
rss = YT_POOL.client &.get("/feeds/videos.xml?channel_id=#{ucid}").body
rss = YT_POOL.get("/feeds/videos.xml?channel_id=#{ucid}").body
LOGGER.trace("fetch_channel: #{ucid} : Parsing RSS feed")
rss = XML.parse(rss)

Expand Down
9 changes: 8 additions & 1 deletion src/invidious/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,15 @@ class Config
property port : Int32 = 3000
# Host to bind (overridden by command line argument)
property host_binding : String = "0.0.0.0"
# Pool size for HTTP requests to youtube.com and ytimg.com (each domain has a separate pool of `pool_size`)
# Max pool size for HTTP requests to youtube.com and ytimg.com (each domain has a separate pool)
property pool_size : Int32 = 100

# Idle pool size for HTTP requests to youtube.com and ytimg.com (each domain has a separate pool)
property idle_pool_size : Int32? = nil

# Amount of seconds to wait for a client to be free from the pool before rasing an error
property pool_checkout_timeout : Float64 = 5

# HTTP Proxy configuration
property http_proxy : HTTPProxyConfig? = nil

Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,3 @@
# Mapping of subdomain => YoutubeConnectionPool
# This is needed as we may need to access arbitrary subdomains of ytimg
private YTIMG_POOLS = {} of String => YoutubeConnectionPool

struct YoutubeConnectionPool
property! url : URI
property! capacity : Int32
property! timeout : Float64
property pool : DB::Pool(HTTP::Client)

def initialize(url : URI, @capacity = 5, @timeout = 5.0)
@url = url
@pool = build_pool()
end

def client(&)
conn = pool.checkout
# Proxy needs to be reinstated every time we get a client from the pool
conn.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy

begin
response = yield conn
rescue ex
conn.close
conn = make_client(url, force_resolve: true)

response = yield conn
ensure
pool.release(conn)
end

response
end

private def build_pool
options = DB::Pool::Options.new(
initial_pool_size: 0,
max_pool_size: capacity,
max_idle_pool_size: capacity,
checkout_timeout: timeout
)

DB::Pool(HTTP::Client).new(options) do
next make_client(url, force_resolve: true)
end
end
end

def add_yt_headers(request)
request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal"
request.headers["User-Agent"] ||= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
Expand Down Expand Up @@ -99,18 +51,3 @@ def make_configured_http_proxy_client
password: config_proxy.password,
)
end

# Fetches a HTTP pool for the specified subdomain of ytimg.com
#
# Creates a new one when the specified pool for the subdomain does not exist
def get_ytimg_pool(subdomain)
if pool = YTIMG_POOLS[subdomain]?
return pool
else
LOGGER.info("ytimg_pool: Creating a new HTTP pool for \"https://#{subdomain}.ytimg.com\"")
pool = YoutubeConnectionPool.new(URI.parse("https://#{subdomain}.ytimg.com"), capacity: CONFIG.pool_size)
YTIMG_POOLS[subdomain] = pool

return pool
end
end
116 changes: 116 additions & 0 deletions src/invidious/connection/pool.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
module Invidious::ConnectionPool
struct Pool
property url : URI
property pool : DB::Pool(HTTP::Client)

def initialize(
url : URI,
*,
max_capacity : Int32 = 5,
idle_capacity : Int32? = nil,
timeout : Float64 = 5.0
)
if idle_capacity.nil?
idle_capacity = max_capacity
end

@url = url

options = DB::Pool::Options.new(
initial_pool_size: 0,
max_pool_size: max_capacity,
max_idle_pool_size: idle_capacity,
checkout_timeout: timeout
)

@pool = DB::Pool(HTTP::Client).new(options) do
next make_client(url, force_resolve: true)
end
end

{% for method in %w[get post put patch delete head options] %}
def {{method.id}}(*args, **kwargs, &)
self.client do | client |
client.{{method.id}}(*args, **kwargs) do | response |

result = yield response
return result

ensure
response.body_io?.try &. skip_to_end
end
end
end

def {{method.id}}(*args, **kwargs)
{{method.id}}(*args, **kwargs) do | response |
return response
ensure
response.body_io?.try &. skip_to_end
end
end
{% end %}

# Checks out a client in the pool
private def client(&)
# If a client has been deleted from the pool
# we won't try to release it
client_exists_in_pool = true

http_client = pool.checkout

# When the HTTP::Client connection is closed, the automatic reconnection
# feature will create a new IO to connect to the server with
#
# This new TCP IO will be a direct connection to the server and will not go
# through the proxy. As such we'll need to reinitialize the proxy connection

http_client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy

response = yield http_client
rescue ex : DB::PoolTimeout
# Failed to checkout a client
raise ConnectionPool::Error.new(ex.message, cause: ex)
rescue ex
# An error occurred with the client itself.
# Delete the client from the pool and close the connection
if http_client
client_exists_in_pool = false
@pool.delete(http_client)
http_client.close
end

# Raise exception for outer methods to handle
raise ConnectionPool::Error.new(ex.message, cause: ex)
ensure
pool.release(http_client) if http_client && client_exists_in_pool
end
end

class Error < Exception
end

# Mapping of subdomain => Invidious::ConnectionPool::Pool
# This is needed as we may need to access arbitrary subdomains of ytimg
private YTIMG_POOLS = {} of String => ConnectionPool::Pool

# Fetches a HTTP pool for the specified subdomain of ytimg.com
#
# Creates a new one when the specified pool for the subdomain does not exist
def self.get_ytimg_pool(subdomain)
if pool = YTIMG_POOLS[subdomain]?
return pool
else
LOGGER.info("ytimg_pool: Creating a new HTTP pool for \"https://#{subdomain}.ytimg.com\"")
pool = ConnectionPool::Pool.new(
URI.parse("https://#{subdomain}.ytimg.com"),
max_capacity: CONFIG.pool_size,
idle_capacity: CONFIG.idle_pool_size,
timeout: CONFIG.pool_checkout_timeout
)
YTIMG_POOLS[subdomain] = pool

return pool
end
end
end
2 changes: 1 addition & 1 deletion src/invidious/mixes.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def fetch_mix(rdid, video_id, cookies = nil, locale = nil)
end

video_id = "CvFH_6DNRCY" if rdid.starts_with? "OLAK5uy_"
response = YT_POOL.client &.get("/watch?v=#{video_id}&list=#{rdid}&gl=US&hl=en", headers)
response = YT_POOL.get("/watch?v=#{video_id}&list=#{rdid}&gl=US&hl=en", headers)
initial_data = extract_initial_data(response.body)

if !initial_data["contents"]["twoColumnWatchNextResults"]["playlist"]?
Expand Down
6 changes: 3 additions & 3 deletions src/invidious/routes/api/manifest.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Invidious::Routes::API::Manifest
end

if dashmpd = video.dash_manifest_url
response = YT_POOL.client &.get(URI.parse(dashmpd).request_target)
response = YT_POOL.get(URI.parse(dashmpd).request_target)

if response.status_code != 200
haltf env, status_code: response.status_code
Expand Down Expand Up @@ -163,7 +163,7 @@ module Invidious::Routes::API::Manifest

# /api/manifest/hls_playlist/*
def self.get_hls_playlist(env)
response = YT_POOL.client &.get(env.request.path)
response = YT_POOL.get(env.request.path)

if response.status_code != 200
haltf env, status_code: response.status_code
Expand Down Expand Up @@ -218,7 +218,7 @@ module Invidious::Routes::API::Manifest

# /api/manifest/hls_variant/*
def self.get_hls_variant(env)
response = YT_POOL.client &.get(env.request.path)
response = YT_POOL.get(env.request.path)

if response.status_code != 200
haltf env, status_code: response.status_code
Expand Down
6 changes: 3 additions & 3 deletions src/invidious/routes/api/v1/videos.cr
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ module Invidious::Routes::API::V1::Videos
# Auto-generated captions often have cues that aren't aligned properly with the video,
# as well as some other markup that makes it cumbersome, so we try to fix that here
if caption.name.includes? "auto-generated"
caption_xml = YT_POOL.client &.get(url).body
caption_xml = YT_POOL.get(url).body

settings_field = {
"Kind" => "captions",
Expand Down Expand Up @@ -147,7 +147,7 @@ module Invidious::Routes::API::V1::Videos
query_params = uri.query_params
query_params["fmt"] = "vtt"
uri.query_params = query_params
webvtt = YT_POOL.client &.get(uri.request_target).body
webvtt = YT_POOL.get(uri.request_target).body

if webvtt.starts_with?("<?xml")
webvtt = caption.timedtext_to_vtt(webvtt)
Expand Down Expand Up @@ -300,7 +300,7 @@ module Invidious::Routes::API::V1::Videos
cache_annotation(id, annotations)
end
else # "youtube"
response = YT_POOL.client &.get("/annotations_invideo?video_id=#{id}")
response = YT_POOL.get("/annotations_invideo?video_id=#{id}")

if response.status_code != 200
haltf env, response.status_code
Expand Down
2 changes: 1 addition & 1 deletion src/invidious/routes/channels.cr
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ module Invidious::Routes::Channels
value = env.request.resource.split("/")[2]
body = ""
{"channel", "user", "c"}.each do |type|
response = YT_POOL.client &.get("/#{type}/#{value}/live?disable_polymer=1")
response = YT_POOL.get("/#{type}/#{value}/live?disable_polymer=1")
if response.status_code == 200
body = response.body
end
Expand Down
2 changes: 1 addition & 1 deletion src/invidious/routes/embed.cr
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Invidious::Routes::Embed

return env.redirect url
when "live_stream"
response = YT_POOL.client &.get("/embed/live_stream?channel=#{env.params.query["channel"]? || ""}")
response = YT_POOL.get("/embed/live_stream?channel=#{env.params.query["channel"]? || ""}")
video_id = response.body.match(/"video_id":"(?<video_id>[a-zA-Z0-9_-]{11})"/).try &.["video_id"]

env.params.query.delete_all("channel")
Expand Down
6 changes: 3 additions & 3 deletions src/invidious/routes/errors.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ module Invidious::Routes::ErrorRoutes
item = md["id"]

# Check if item is branding URL e.g. https://youtube.com/gaming
response = YT_POOL.client &.get("/#{item}")
response = YT_POOL.get("/#{item}")

if response.status_code == 301
response = YT_POOL.client &.get(URI.parse(response.headers["Location"]).request_target)
response = YT_POOL.get(URI.parse(response.headers["Location"]).request_target)
end

if response.body.empty?
Expand Down Expand Up @@ -40,7 +40,7 @@ module Invidious::Routes::ErrorRoutes
end

# Check if item is video ID
if item.match(/^[a-zA-Z0-9_-]{11}$/) && YT_POOL.client &.head("/watch?v=#{item}").status_code != 404
if item.match(/^[a-zA-Z0-9_-]{11}$/) && YT_POOL.head("/watch?v=#{item}").status_code != 404
env.response.headers["Location"] = url
haltf env, status_code: 302
end
Expand Down
Loading
Loading