From e3b31e84c43125439efa5c593bc50cfa7f016293 Mon Sep 17 00:00:00 2001 From: Chun-Han Hsiao Date: Mon, 8 Jul 2024 13:42:06 +0800 Subject: [PATCH] Fix race condition in HttpProtocol while having multiple proxy settings In HttpProtocol implementation, the client builder was singleton and may be accessed and modified by different threads at same time. The result is that a wrong proxy will be used or a wrong proxy auth will be configured. To fix it, create a local builder insteand of having a field-level builder. Fixes #1247 --- .../protocol/httpclient/HttpProtocol.java | 114 ++++++------- .../protocol/okhttp/HttpProtocol.java | 151 +++++++++--------- 2 files changed, 118 insertions(+), 147 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java b/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java index 9c2c019b8..9fd2f373b 100644 --- a/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java +++ b/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java @@ -22,8 +22,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; -import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.Locale; import org.apache.commons.lang.StringUtils; @@ -78,10 +77,11 @@ public class HttpProtocol extends AbstractHttpProtocol private int globalMaxContent; - private HttpClientBuilder builder; - private RequestConfig requestConfig; - private RequestConfig.Builder requestConfigBuilder; + + private String userAgent; + + private final List defaultHeaders = new ArrayList<>(); @Override public void configure(final Config conf) { @@ -100,19 +100,14 @@ public void configure(final Config conf) { globalMaxContent = ConfUtils.getInt(conf, "http.content.limit", -1); - String userAgent = getAgentString(conf); - - Collection defaultHeaders = new LinkedList<>(); + userAgent = getAgentString(conf); String accept = ConfUtils.getString(conf, "http.accept"); if (StringUtils.isNotBlank(accept)) { defaultHeaders.add(new BasicHeader("Accept", accept)); } - customHeaders.forEach( - h -> { - defaultHeaders.add(new BasicHeader(h.getKey(), h.getValue())); - }); + customHeaders.forEach(h -> defaultHeaders.add(new BasicHeader(h.getKey(), h.getValue()))); String basicAuthUser = ConfUtils.getString(conf, "http.basicauth.user", null); @@ -132,25 +127,16 @@ public void configure(final Config conf) { defaultHeaders.add(new BasicHeader("Accept-Language", acceptLanguage)); } - builder = - HttpClients.custom() - .setUserAgent(userAgent) - .setDefaultHeaders(defaultHeaders) - .setConnectionManager(CONNECTION_MANAGER) - .setConnectionManagerShared(true) - .disableRedirectHandling() - .disableAutomaticRetries(); - int timeout = ConfUtils.getInt(conf, "http.timeout", 10000); - requestConfigBuilder = - RequestConfig.custom() - .setSocketTimeout(timeout) - .setConnectTimeout(timeout) - .setConnectionRequestTimeout(timeout) - .setCookieSpec(CookieSpecs.STANDARD); - - requestConfig = requestConfigBuilder.build(); + requestConfig = RequestConfig.custom() + .setSocketTimeout(timeout) + .setConnectTimeout(timeout) + .setConnectionRequestTimeout(timeout) + .setCookieSpec(CookieSpecs.STANDARD) + // Can make configurable and add more in future + .setProxyPreferredAuthSchemes(Collections.singletonList(AuthSchemes.BASIC)) + .build(); } @Override @@ -158,45 +144,11 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except LOG.debug("HTTP connection manager stats {}", CONNECTION_MANAGER.getTotalStats()); - // set default request config to global config - RequestConfig reqConfig = requestConfig; - + SCProxy proxy = null; // conditionally add a dynamic proxy if (proxyManager != null) { - // retrieve proxy from proxy manager - SCProxy prox = proxyManager.getProxy(md); - - // conditionally configure proxy authentication - if (StringUtils.isNotBlank(prox.getUsername())) { - List authSchemes = new ArrayList<>(); - - // Can make configurable and add more in future - authSchemes.add(AuthSchemes.BASIC); - requestConfigBuilder.setProxyPreferredAuthSchemes(authSchemes); - - BasicCredentialsProvider basicAuthCreds = new BasicCredentialsProvider(); - basicAuthCreds.setCredentials( - new AuthScope(prox.getAddress(), Integer.parseInt(prox.getPort())), - new UsernamePasswordCredentials(prox.getUsername(), prox.getPassword())); - builder.setDefaultCredentialsProvider(basicAuthCreds); - } - - HttpHost proxy = new HttpHost(prox.getAddress(), Integer.parseInt(prox.getPort())); - DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy); - builder.setRoutePlanner(routePlanner); - - // save start time for debugging speed impact of request config - // build - long buildStart = System.currentTimeMillis(); - - // set request config to new configuration with dynamic proxy - reqConfig = requestConfigBuilder.build(); - - LOG.debug( - "time to build http request config with proxy: {}ms", - System.currentTimeMillis() - buildStart); - - LOG.debug("fetching with " + prox.toString()); + proxy = proxyManager.getProxy(md); + LOG.debug("fetching with {}", proxy); } HttpRequestBase request = new HttpGet(url); @@ -246,11 +198,11 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except } } - request.setConfig(reqConfig); + request.setConfig(requestConfig); // no need to release the connection explicitly as this is handled // automatically. The client itself must be closed though. - try (CloseableHttpClient httpclient = builder.build()) { + try (CloseableHttpClient httpclient = createClient(proxy)) { return httpclient.execute(request, responseHandler); } } @@ -372,6 +324,32 @@ private static byte[] toByteArray( return buffer.toByteArray(); } + private CloseableHttpClient createClient(final SCProxy proxy) { + final HttpClientBuilder builder = HttpClients.custom() + .setUserAgent(userAgent) + .setDefaultHeaders(defaultHeaders) + .setConnectionManager(CONNECTION_MANAGER) + .setConnectionManagerShared(true) + .disableRedirectHandling() + .disableAutomaticRetries(); + + if (proxy != null) { + final DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(new HttpHost( + proxy.getAddress(), Integer.parseInt(proxy.getPort()))); + builder.setRoutePlanner(routePlanner); + if (StringUtils.isNotBlank(proxy.getUsername())) { + // conditionally configure proxy authentication + final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope(proxy.getAddress(), Integer.parseInt(proxy.getPort())), + new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword())); + builder.setDefaultCredentialsProvider(credentialsProvider); + } + } + + return builder.build(); + } + public static void main(String[] args) throws Exception { Protocol.main(new HttpProtocol(), args); } diff --git a/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java b/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java index 2aefa950e..e360990f5 100644 --- a/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java +++ b/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java @@ -32,9 +32,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; -import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; @@ -43,8 +41,6 @@ import okhttp3.Connection; import okhttp3.ConnectionPool; import okhttp3.Credentials; -import okhttp3.EventListener; -import okhttp3.EventListener.Factory; import okhttp3.Handshake; import okhttp3.Headers; import okhttp3.Interceptor; @@ -87,6 +83,18 @@ public class HttpProtocol extends AbstractHttpProtocol { private int completionTimeout = -1; + private int httpTimeout; + + private boolean retryOnConnectionFailure; + + private boolean followRedirects; + + private boolean insecure; + + private final List protocols = new ArrayList<>(); + + private ConnectionPool connectionPool = null; + /** Accept partially fetched content as trimmed content */ private boolean partialContentAsTrimmed = false; @@ -95,8 +103,6 @@ public class HttpProtocol extends AbstractHttpProtocol { // track the time spent for each URL in DNS resolution private final Map DNStimes = new HashMap<>(); - private OkHttpClient.Builder builder; - private static final TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { @@ -137,7 +143,7 @@ public void configure(Config conf) { globalMaxContent = ConfUtils.getInt(conf, "http.content.limit", -1); - final int timeout = ConfUtils.getInt(conf, "http.timeout", 10000); + this.httpTimeout = ConfUtils.getInt(conf, "http.timeout", 10000); this.completionTimeout = ConfUtils.getInt(conf, "topology.message.timeout.secs", completionTimeout); @@ -145,19 +151,13 @@ public void configure(Config conf) { this.partialContentAsTrimmed = ConfUtils.getBoolean(conf, "http.content.partial.as.trimmed", false); - builder = - new OkHttpClient.Builder() - .retryOnConnectionFailure( - ConfUtils.getBoolean( - conf, "http.retry.on.connection.failure", true)) - .followRedirects(ConfUtils.getBoolean(conf, "http.allow.redirects", false)) - .connectTimeout(timeout, TimeUnit.MILLISECONDS) - .writeTimeout(timeout, TimeUnit.MILLISECONDS) - .readTimeout(timeout, TimeUnit.MILLISECONDS); + this.retryOnConnectionFailure = ConfUtils.getBoolean( + conf, "http.retry.on.connection.failure", true); + + this.followRedirects = ConfUtils.getBoolean(conf, "http.allow.redirects", false); // protocols in order of preference, see // https://square.github.io/okhttp/4.x/okhttp/okhttp3/-ok-http-client/-builder/protocols/ - final List protocols = new ArrayList<>(); for (String pVersion : protocolVersions) { switch (pVersion) { case "h2": @@ -181,10 +181,6 @@ public void configure(Config conf) { break; } } - if (protocols.size() > 0) { - LOG.info("Using protocol versions: {}", protocols); - builder.protocols(protocols); - } final String userAgent = getAgentString(conf); if (StringUtils.isNotBlank(userAgent)) { @@ -216,38 +212,14 @@ public void configure(Config conf) { customHeaders.forEach(customRequestHeaders::add); - if (storeHTTPHeaders) { - builder.addNetworkInterceptor(new HTTPHeadersInterceptor()); - } - - if (ConfUtils.getBoolean(conf, "http.trust.everything", true)) { - builder.sslSocketFactory(trustAllSslSocketFactory, (X509TrustManager) trustAllCerts[0]); - builder.hostnameVerifier( - new HostnameVerifier() { - @Override - public boolean verify(String hostname, SSLSession session) { - return true; - } - }); - } - - builder.eventListenerFactory( - new Factory() { - @Override - public EventListener create(Call call) { - return new DNSResolutionListener(DNStimes); - } - }); - - // enable support for Brotli compression (Content-Encoding) - builder.addInterceptor(BrotliInterceptor.INSTANCE); + this.insecure = ConfUtils.getBoolean(conf, "http.trust.everything", true); final Map connectionPoolConf = (Map) conf.get("okhttp.protocol.connection.pool"); if (connectionPoolConf != null) { final int size = ConfUtils.getInt(connectionPoolConf, "max.idle.connections", 5); final int time = ConfUtils.getInt(connectionPoolConf, "connection.keep.alive", 300); - builder.connectionPool(new ConnectionPool(size, time, TimeUnit.SECONDS)); + this.connectionPool = new ConnectionPool(size, time, TimeUnit.SECONDS); LOG.info( "Using connection pool with max. {} idle connections " + "and {} sec. connection keep-alive time", @@ -255,7 +227,8 @@ public EventListener create(Call call) { time); } - client = builder.build(); + // default client without proxy + client = createClient(null); } private void addCookiesToRequest(Builder rb, String url, Metadata md) { @@ -292,46 +265,20 @@ public ProtocolResponse getProtocolOutput(String url, final Metadata metadata) // conditionally add a dynamic proxy if (proxyManager != null) { // retrieve proxy from proxy manager - SCProxy prox = proxyManager.getProxy(metadata); - - // conditionally configure proxy authentication - if (StringUtils.isNotBlank(prox.getAddress())) { - // format SCProxy into native Java proxy - Proxy proxy = - new Proxy( - Proxy.Type.valueOf(prox.getProtocol().toUpperCase(Locale.ROOT)), - new InetSocketAddress( - prox.getAddress(), Integer.parseInt(prox.getPort()))); - - // set proxy in builder - builder.proxy(proxy); - - // conditionally add proxy authentication - if (StringUtils.isNotBlank(prox.getUsername())) { - // add proxy authentication header to builder - builder.proxyAuthenticator( - (Route route, Response response) -> { - String credential = - Credentials.basic(prox.getUsername(), prox.getPassword()); - return response.request() - .newBuilder() - .header("Proxy-Authorization", credential) - .build(); - }); - } + SCProxy proxy = proxyManager.getProxy(metadata); + if (StringUtils.isNotBlank(proxy.getAddress())) { + // create new local client from builder using proxy + localClient = createClient(proxy); } // save start time for debugging speed impact of client build long buildStart = System.currentTimeMillis(); - // create new local client from builder using proxy - localClient = builder.build(); - LOG.debug( "time to build okhttp client with proxy: {}ms", System.currentTimeMillis() - buildStart); - LOG.debug("fetching with proxy {} - {} ", url, prox.toString()); + LOG.debug("fetching with proxy {} - {} ", url, proxy); } final Builder rb = new Request.Builder().url(url); @@ -616,6 +563,52 @@ public Response intercept(Interceptor.Chain chain) throws IOException { } } + private OkHttpClient createClient(final SCProxy proxy) { + final OkHttpClient.Builder builder = new OkHttpClient.Builder() + .retryOnConnectionFailure(retryOnConnectionFailure) + .followRedirects(followRedirects) + .connectTimeout(httpTimeout, TimeUnit.MILLISECONDS) + .writeTimeout(httpTimeout, TimeUnit.MILLISECONDS) + .readTimeout(httpTimeout, TimeUnit.MILLISECONDS); + + if (protocols.size() > 0) { + LOG.info("Using protocol versions: {}", protocols); + builder.protocols(protocols); + } + + if (storeHTTPHeaders) { + builder.addNetworkInterceptor(new HTTPHeadersInterceptor()); + } + + if (insecure) { + builder.sslSocketFactory(trustAllSslSocketFactory, (X509TrustManager) trustAllCerts[0]); + builder.hostnameVerifier((hostname, session) -> true); + } + + builder.eventListenerFactory(call -> new DNSResolutionListener(DNStimes)); + + // enable support for Brotli compression (Content-Encoding) + builder.addInterceptor(BrotliInterceptor.INSTANCE); + + builder.connectionPool(connectionPool); + + if (proxy != null) { + builder.proxy(new Proxy( + Proxy.Type.valueOf(proxy.getProtocol().toUpperCase(Locale.ROOT)), + new InetSocketAddress(proxy.getAddress(), Integer.parseInt(proxy.getPort())))); + if (StringUtils.isNotBlank(proxy.getUsername())) { + builder.proxyAuthenticator((Route route, Response response) -> { + String credential = Credentials.basic(proxy.getUsername(), proxy.getPassword()); + return response.request().newBuilder() + .header("Proxy-Authorization", credential) + .build(); + }); + } + } + + return builder.build(); + } + public static void main(String args[]) throws Exception { org.apache.stormcrawler.protocol.Protocol.main(new HttpProtocol(), args); }