diff --git a/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java b/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java index 9c2c019b8..9fd2f373b 100644 --- a/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java +++ b/core/src/main/java/org/apache/stormcrawler/protocol/httpclient/HttpProtocol.java @@ -22,8 +22,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; -import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.Locale; import org.apache.commons.lang.StringUtils; @@ -78,10 +77,11 @@ public class HttpProtocol extends AbstractHttpProtocol private int globalMaxContent; - private HttpClientBuilder builder; - private RequestConfig requestConfig; - private RequestConfig.Builder requestConfigBuilder; + + private String userAgent; + + private final List defaultHeaders = new ArrayList<>(); @Override public void configure(final Config conf) { @@ -100,19 +100,14 @@ public void configure(final Config conf) { globalMaxContent = ConfUtils.getInt(conf, "http.content.limit", -1); - String userAgent = getAgentString(conf); - - Collection defaultHeaders = new LinkedList<>(); + userAgent = getAgentString(conf); String accept = ConfUtils.getString(conf, "http.accept"); if (StringUtils.isNotBlank(accept)) { defaultHeaders.add(new BasicHeader("Accept", accept)); } - customHeaders.forEach( - h -> { - defaultHeaders.add(new BasicHeader(h.getKey(), h.getValue())); - }); + customHeaders.forEach(h -> defaultHeaders.add(new BasicHeader(h.getKey(), h.getValue()))); String basicAuthUser = ConfUtils.getString(conf, "http.basicauth.user", null); @@ -132,25 +127,16 @@ public void configure(final Config conf) { defaultHeaders.add(new BasicHeader("Accept-Language", acceptLanguage)); } - builder = - HttpClients.custom() - .setUserAgent(userAgent) - .setDefaultHeaders(defaultHeaders) - .setConnectionManager(CONNECTION_MANAGER) - .setConnectionManagerShared(true) - .disableRedirectHandling() - .disableAutomaticRetries(); - int timeout = ConfUtils.getInt(conf, "http.timeout", 10000); - requestConfigBuilder = - RequestConfig.custom() - .setSocketTimeout(timeout) - .setConnectTimeout(timeout) - .setConnectionRequestTimeout(timeout) - .setCookieSpec(CookieSpecs.STANDARD); - - requestConfig = requestConfigBuilder.build(); + requestConfig = RequestConfig.custom() + .setSocketTimeout(timeout) + .setConnectTimeout(timeout) + .setConnectionRequestTimeout(timeout) + .setCookieSpec(CookieSpecs.STANDARD) + // Can make configurable and add more in future + .setProxyPreferredAuthSchemes(Collections.singletonList(AuthSchemes.BASIC)) + .build(); } @Override @@ -158,45 +144,11 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except LOG.debug("HTTP connection manager stats {}", CONNECTION_MANAGER.getTotalStats()); - // set default request config to global config - RequestConfig reqConfig = requestConfig; - + SCProxy proxy = null; // conditionally add a dynamic proxy if (proxyManager != null) { - // retrieve proxy from proxy manager - SCProxy prox = proxyManager.getProxy(md); - - // conditionally configure proxy authentication - if (StringUtils.isNotBlank(prox.getUsername())) { - List authSchemes = new ArrayList<>(); - - // Can make configurable and add more in future - authSchemes.add(AuthSchemes.BASIC); - requestConfigBuilder.setProxyPreferredAuthSchemes(authSchemes); - - BasicCredentialsProvider basicAuthCreds = new BasicCredentialsProvider(); - basicAuthCreds.setCredentials( - new AuthScope(prox.getAddress(), Integer.parseInt(prox.getPort())), - new UsernamePasswordCredentials(prox.getUsername(), prox.getPassword())); - builder.setDefaultCredentialsProvider(basicAuthCreds); - } - - HttpHost proxy = new HttpHost(prox.getAddress(), Integer.parseInt(prox.getPort())); - DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy); - builder.setRoutePlanner(routePlanner); - - // save start time for debugging speed impact of request config - // build - long buildStart = System.currentTimeMillis(); - - // set request config to new configuration with dynamic proxy - reqConfig = requestConfigBuilder.build(); - - LOG.debug( - "time to build http request config with proxy: {}ms", - System.currentTimeMillis() - buildStart); - - LOG.debug("fetching with " + prox.toString()); + proxy = proxyManager.getProxy(md); + LOG.debug("fetching with {}", proxy); } HttpRequestBase request = new HttpGet(url); @@ -246,11 +198,11 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except } } - request.setConfig(reqConfig); + request.setConfig(requestConfig); // no need to release the connection explicitly as this is handled // automatically. The client itself must be closed though. - try (CloseableHttpClient httpclient = builder.build()) { + try (CloseableHttpClient httpclient = createClient(proxy)) { return httpclient.execute(request, responseHandler); } } @@ -372,6 +324,32 @@ private static byte[] toByteArray( return buffer.toByteArray(); } + private CloseableHttpClient createClient(final SCProxy proxy) { + final HttpClientBuilder builder = HttpClients.custom() + .setUserAgent(userAgent) + .setDefaultHeaders(defaultHeaders) + .setConnectionManager(CONNECTION_MANAGER) + .setConnectionManagerShared(true) + .disableRedirectHandling() + .disableAutomaticRetries(); + + if (proxy != null) { + final DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(new HttpHost( + proxy.getAddress(), Integer.parseInt(proxy.getPort()))); + builder.setRoutePlanner(routePlanner); + if (StringUtils.isNotBlank(proxy.getUsername())) { + // conditionally configure proxy authentication + final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope(proxy.getAddress(), Integer.parseInt(proxy.getPort())), + new UsernamePasswordCredentials(proxy.getUsername(), proxy.getPassword())); + builder.setDefaultCredentialsProvider(credentialsProvider); + } + } + + return builder.build(); + } + public static void main(String[] args) throws Exception { Protocol.main(new HttpProtocol(), args); } diff --git a/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java b/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java index 2aefa950e..e360990f5 100644 --- a/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java +++ b/core/src/main/java/org/apache/stormcrawler/protocol/okhttp/HttpProtocol.java @@ -32,9 +32,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; -import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; @@ -43,8 +41,6 @@ import okhttp3.Connection; import okhttp3.ConnectionPool; import okhttp3.Credentials; -import okhttp3.EventListener; -import okhttp3.EventListener.Factory; import okhttp3.Handshake; import okhttp3.Headers; import okhttp3.Interceptor; @@ -87,6 +83,18 @@ public class HttpProtocol extends AbstractHttpProtocol { private int completionTimeout = -1; + private int httpTimeout; + + private boolean retryOnConnectionFailure; + + private boolean followRedirects; + + private boolean insecure; + + private final List protocols = new ArrayList<>(); + + private ConnectionPool connectionPool = null; + /** Accept partially fetched content as trimmed content */ private boolean partialContentAsTrimmed = false; @@ -95,8 +103,6 @@ public class HttpProtocol extends AbstractHttpProtocol { // track the time spent for each URL in DNS resolution private final Map DNStimes = new HashMap<>(); - private OkHttpClient.Builder builder; - private static final TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() { @@ -137,7 +143,7 @@ public void configure(Config conf) { globalMaxContent = ConfUtils.getInt(conf, "http.content.limit", -1); - final int timeout = ConfUtils.getInt(conf, "http.timeout", 10000); + this.httpTimeout = ConfUtils.getInt(conf, "http.timeout", 10000); this.completionTimeout = ConfUtils.getInt(conf, "topology.message.timeout.secs", completionTimeout); @@ -145,19 +151,13 @@ public void configure(Config conf) { this.partialContentAsTrimmed = ConfUtils.getBoolean(conf, "http.content.partial.as.trimmed", false); - builder = - new OkHttpClient.Builder() - .retryOnConnectionFailure( - ConfUtils.getBoolean( - conf, "http.retry.on.connection.failure", true)) - .followRedirects(ConfUtils.getBoolean(conf, "http.allow.redirects", false)) - .connectTimeout(timeout, TimeUnit.MILLISECONDS) - .writeTimeout(timeout, TimeUnit.MILLISECONDS) - .readTimeout(timeout, TimeUnit.MILLISECONDS); + this.retryOnConnectionFailure = ConfUtils.getBoolean( + conf, "http.retry.on.connection.failure", true); + + this.followRedirects = ConfUtils.getBoolean(conf, "http.allow.redirects", false); // protocols in order of preference, see // https://square.github.io/okhttp/4.x/okhttp/okhttp3/-ok-http-client/-builder/protocols/ - final List protocols = new ArrayList<>(); for (String pVersion : protocolVersions) { switch (pVersion) { case "h2": @@ -181,10 +181,6 @@ public void configure(Config conf) { break; } } - if (protocols.size() > 0) { - LOG.info("Using protocol versions: {}", protocols); - builder.protocols(protocols); - } final String userAgent = getAgentString(conf); if (StringUtils.isNotBlank(userAgent)) { @@ -216,38 +212,14 @@ public void configure(Config conf) { customHeaders.forEach(customRequestHeaders::add); - if (storeHTTPHeaders) { - builder.addNetworkInterceptor(new HTTPHeadersInterceptor()); - } - - if (ConfUtils.getBoolean(conf, "http.trust.everything", true)) { - builder.sslSocketFactory(trustAllSslSocketFactory, (X509TrustManager) trustAllCerts[0]); - builder.hostnameVerifier( - new HostnameVerifier() { - @Override - public boolean verify(String hostname, SSLSession session) { - return true; - } - }); - } - - builder.eventListenerFactory( - new Factory() { - @Override - public EventListener create(Call call) { - return new DNSResolutionListener(DNStimes); - } - }); - - // enable support for Brotli compression (Content-Encoding) - builder.addInterceptor(BrotliInterceptor.INSTANCE); + this.insecure = ConfUtils.getBoolean(conf, "http.trust.everything", true); final Map connectionPoolConf = (Map) conf.get("okhttp.protocol.connection.pool"); if (connectionPoolConf != null) { final int size = ConfUtils.getInt(connectionPoolConf, "max.idle.connections", 5); final int time = ConfUtils.getInt(connectionPoolConf, "connection.keep.alive", 300); - builder.connectionPool(new ConnectionPool(size, time, TimeUnit.SECONDS)); + this.connectionPool = new ConnectionPool(size, time, TimeUnit.SECONDS); LOG.info( "Using connection pool with max. {} idle connections " + "and {} sec. connection keep-alive time", @@ -255,7 +227,8 @@ public EventListener create(Call call) { time); } - client = builder.build(); + // default client without proxy + client = createClient(null); } private void addCookiesToRequest(Builder rb, String url, Metadata md) { @@ -292,46 +265,20 @@ public ProtocolResponse getProtocolOutput(String url, final Metadata metadata) // conditionally add a dynamic proxy if (proxyManager != null) { // retrieve proxy from proxy manager - SCProxy prox = proxyManager.getProxy(metadata); - - // conditionally configure proxy authentication - if (StringUtils.isNotBlank(prox.getAddress())) { - // format SCProxy into native Java proxy - Proxy proxy = - new Proxy( - Proxy.Type.valueOf(prox.getProtocol().toUpperCase(Locale.ROOT)), - new InetSocketAddress( - prox.getAddress(), Integer.parseInt(prox.getPort()))); - - // set proxy in builder - builder.proxy(proxy); - - // conditionally add proxy authentication - if (StringUtils.isNotBlank(prox.getUsername())) { - // add proxy authentication header to builder - builder.proxyAuthenticator( - (Route route, Response response) -> { - String credential = - Credentials.basic(prox.getUsername(), prox.getPassword()); - return response.request() - .newBuilder() - .header("Proxy-Authorization", credential) - .build(); - }); - } + SCProxy proxy = proxyManager.getProxy(metadata); + if (StringUtils.isNotBlank(proxy.getAddress())) { + // create new local client from builder using proxy + localClient = createClient(proxy); } // save start time for debugging speed impact of client build long buildStart = System.currentTimeMillis(); - // create new local client from builder using proxy - localClient = builder.build(); - LOG.debug( "time to build okhttp client with proxy: {}ms", System.currentTimeMillis() - buildStart); - LOG.debug("fetching with proxy {} - {} ", url, prox.toString()); + LOG.debug("fetching with proxy {} - {} ", url, proxy); } final Builder rb = new Request.Builder().url(url); @@ -616,6 +563,52 @@ public Response intercept(Interceptor.Chain chain) throws IOException { } } + private OkHttpClient createClient(final SCProxy proxy) { + final OkHttpClient.Builder builder = new OkHttpClient.Builder() + .retryOnConnectionFailure(retryOnConnectionFailure) + .followRedirects(followRedirects) + .connectTimeout(httpTimeout, TimeUnit.MILLISECONDS) + .writeTimeout(httpTimeout, TimeUnit.MILLISECONDS) + .readTimeout(httpTimeout, TimeUnit.MILLISECONDS); + + if (protocols.size() > 0) { + LOG.info("Using protocol versions: {}", protocols); + builder.protocols(protocols); + } + + if (storeHTTPHeaders) { + builder.addNetworkInterceptor(new HTTPHeadersInterceptor()); + } + + if (insecure) { + builder.sslSocketFactory(trustAllSslSocketFactory, (X509TrustManager) trustAllCerts[0]); + builder.hostnameVerifier((hostname, session) -> true); + } + + builder.eventListenerFactory(call -> new DNSResolutionListener(DNStimes)); + + // enable support for Brotli compression (Content-Encoding) + builder.addInterceptor(BrotliInterceptor.INSTANCE); + + builder.connectionPool(connectionPool); + + if (proxy != null) { + builder.proxy(new Proxy( + Proxy.Type.valueOf(proxy.getProtocol().toUpperCase(Locale.ROOT)), + new InetSocketAddress(proxy.getAddress(), Integer.parseInt(proxy.getPort())))); + if (StringUtils.isNotBlank(proxy.getUsername())) { + builder.proxyAuthenticator((Route route, Response response) -> { + String credential = Credentials.basic(proxy.getUsername(), proxy.getPassword()); + return response.request().newBuilder() + .header("Proxy-Authorization", credential) + .build(); + }); + } + } + + return builder.build(); + } + public static void main(String args[]) throws Exception { org.apache.stormcrawler.protocol.Protocol.main(new HttpProtocol(), args); }