From 7b2051f1a8388feb05f013ac7aa9b2ca4c3ae58e Mon Sep 17 00:00:00 2001 From: Luca Bassi Date: Thu, 11 Apr 2024 14:05:18 +0200 Subject: [PATCH] Add SciTags support in HTTP-TPC --- doc/tpc.md | 13 +++ .../webdav/server/servlet/MiltonFilter.java | 12 +++ .../webdav/server/servlet/SciTagFilter.java | 80 +++++++++++++++++ .../webdav/server/servlet/StoRMServlet.java | 8 +- .../resource/StormResourceService.java | 20 +++++ .../storm/webdav/spring/AppConfig.java | 8 +- .../spring/web/ServletConfiguration.java | 27 ++++-- .../italiangrid/storm/webdav/tpc/SciTag.java | 83 +++++++++++++++++ .../SciTagPlainConnectionSocketFactory.java | 53 +++++++++++ .../tpc/SciTagSSLConnectionSocketFactory.java | 44 +++++++++ .../storm/webdav/tpc/TransferConstants.java | 1 + .../storm/webdav/tpc/TransferFilter.java | 15 ++-- .../webdav/tpc/TransferFilterSupport.java | 15 ++++ .../webdav/tpc/http/HttpTransferClient.java | 16 +++- .../transfer/GetTransferRequestBuilder.java | 2 +- .../transfer/PutTransferRequestBuilder.java | 2 +- .../webdav/tpc/transfer/RequestBuilder.java | 9 ++ .../webdav/tpc/transfer/TransferRequest.java | 4 + .../transfer/impl/GetTransferRequestImpl.java | 7 +- .../transfer/impl/PutTransferRequestImpl.java | 6 +- .../transfer/impl/TransferRequestImpl.java | 11 ++- src/main/resources/application.yml | 3 +- .../webdav/test/tpc/PullTransferTest.java | 18 ++-- .../webdav/test/tpc/PushTransferTest.java | 47 ++++++++-- .../test/tpc/SciTagFilterActivationTest.java | 89 +++++++++++++++++++ .../test/tpc/TransferFilterTestSupport.java | 4 + .../webdav/test/tpc/http/ClientTest.java | 7 +- .../integration/TpcClientRedirectionTest.java | 5 +- .../http/integration/TpcIntegrationTest.java | 6 +- 29 files changed, 563 insertions(+), 52 deletions(-) create mode 100644 src/main/java/org/italiangrid/storm/webdav/server/servlet/SciTagFilter.java create mode 100644 src/main/java/org/italiangrid/storm/webdav/tpc/SciTag.java create mode 100644 src/main/java/org/italiangrid/storm/webdav/tpc/SciTagPlainConnectionSocketFactory.java create mode 100644 src/main/java/org/italiangrid/storm/webdav/tpc/SciTagSSLConnectionSocketFactory.java create mode 100644 src/test/java/org/italiangrid/storm/webdav/test/tpc/SciTagFilterActivationTest.java diff --git a/doc/tpc.md b/doc/tpc.md index d93e0274..ce37378d 100644 --- a/doc/tpc.md +++ b/doc/tpc.md @@ -56,3 +56,16 @@ STORM_WEBDAV_AUTHZ_SERVER_MAX_TOKEN_LIFETIME_SEC="43200" STORM_WEBDAV_REQUIRE_CLIENT_CERT="false" ``` For other configuration options, see the /etc/sysconfig/storm-webdav file. + +## SciTags + +StoRM WebDAV supports the `SciTag` header. +To correctly mark the network packets and/or network flows, you need to install [flowd](https://github.com/scitags/flowd) and configure it to use the `np_api` plugin. + +Example flowd configuration (`/etc/flowd/flowd-tags.cfg`): + +``` +PLUGIN='np_api' +BACKEND='udp_firefly' +FLOW_MAP_API='https://www.scitags.org/api.json' +``` diff --git a/src/main/java/org/italiangrid/storm/webdav/server/servlet/MiltonFilter.java b/src/main/java/org/italiangrid/storm/webdav/server/servlet/MiltonFilter.java index 666ea2a4..c4912e20 100644 --- a/src/main/java/org/italiangrid/storm/webdav/server/servlet/MiltonFilter.java +++ b/src/main/java/org/italiangrid/storm/webdav/server/servlet/MiltonFilter.java @@ -38,6 +38,7 @@ import org.italiangrid.storm.webdav.milton.StoRMResourceFactory; import org.italiangrid.storm.webdav.milton.util.ReplaceContentStrategy; import org.italiangrid.storm.webdav.server.PathResolver; +import org.italiangrid.storm.webdav.tpc.SciTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -133,11 +134,22 @@ public void doMilton(HttpServletRequest request, HttpServletResponse response) { Request miltonReq = new StoRMMiltonRequest(request, servletContext); Response miltonRes = new io.milton.servlet.ServletResponse(response); + if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) { + String localIPAddress = request.getLocalAddr(); + int localPort = request.getLocalPort(); + String remoteIPAddress = request.getRemoteAddr(); + int remotePort = request.getRemotePort(); + ((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeStart(localIPAddress, localPort, + remoteIPAddress, remotePort); + } miltonHTTPManager.process(miltonReq, miltonRes); } finally { MiltonServlet.clearThreadlocals(); + if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) { + ((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeEnd(); + } try { diff --git a/src/main/java/org/italiangrid/storm/webdav/server/servlet/SciTagFilter.java b/src/main/java/org/italiangrid/storm/webdav/server/servlet/SciTagFilter.java new file mode 100644 index 00000000..f5651140 --- /dev/null +++ b/src/main/java/org/italiangrid/storm/webdav/server/servlet/SciTagFilter.java @@ -0,0 +1,80 @@ +/** + * Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.italiangrid.storm.webdav.server.servlet; + +import java.io.IOException; +import java.util.Optional; + +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; + +import org.italiangrid.storm.webdav.tpc.SciTag; +import org.italiangrid.storm.webdav.tpc.TransferConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +public class SciTagFilter implements Filter { + + public static final Logger logger = LoggerFactory.getLogger(SciTagFilter.class); + + @Autowired + public SciTagFilter() {} + + @Override + public void init(FilterConfig filterConfig) throws ServletException { + logger.debug("Initializing SciTag filter."); + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + SciTag scitag = null; + HttpServletRequest req = (HttpServletRequest) request; + if (req.getHeader(TransferConstants.SCITAG_HEADER) != null) { + Optional source = Optional.ofNullable(req.getHeader(TransferConstants.SOURCE_HEADER)); + boolean remoteAddressIsSource = + req.getMethod().equals("PUT") || (req.getMethod().equals("COPY") && source.isPresent()); + // state prot src_ip src_port dst_ip dst_port exp act + // If the active party receives an HTTP-TPC COPY request with a SciTag request header with + // a valid value then the server SHOULD mark the resulting network traffic with the + // experiment ID and activity ID encoded in the value. + int scitagValue = Integer.parseInt(req.getHeader(TransferConstants.SCITAG_HEADER)); + // Valid value is a single positive integer > 64 and <65536 (16bit). Any other value is + // considered invalid. + if (scitagValue > 64 && scitagValue < 65536) { + scitag = new SciTag(scitagValue >> 6, scitagValue & ((1 << 6) - 1), remoteAddressIsSource); + } else { + // If the active party receives an HTTP-TPC COPY request with a SciTag request header + // with an invalid value then the server SHOULD mark the resulting network traffic with + // the 0 as the experiment ID and the activity ID. + scitag = new SciTag(0, 0, remoteAddressIsSource); + } + } + request.setAttribute(SciTag.SCITAG_ATTRIBUTE, scitag); + chain.doFilter(request, response); + } + + @Override + public void destroy() { + logger.debug("Destroying SciTag filter."); + } +} diff --git a/src/main/java/org/italiangrid/storm/webdav/server/servlet/StoRMServlet.java b/src/main/java/org/italiangrid/storm/webdav/server/servlet/StoRMServlet.java index 03f83598..68145aa7 100644 --- a/src/main/java/org/italiangrid/storm/webdav/server/servlet/StoRMServlet.java +++ b/src/main/java/org/italiangrid/storm/webdav/server/servlet/StoRMServlet.java @@ -34,7 +34,7 @@ public class StoRMServlet extends DefaultServlet { /** - * + * */ private static final long serialVersionUID = 4204673943980786498L; @@ -74,6 +74,12 @@ public Resource getResource(String pathInContext) { } + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + resourceService.doGet(request, response); + } + @Override protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { diff --git a/src/main/java/org/italiangrid/storm/webdav/server/servlet/resource/StormResourceService.java b/src/main/java/org/italiangrid/storm/webdav/server/servlet/resource/StormResourceService.java index 64684415..b25a7e6b 100644 --- a/src/main/java/org/italiangrid/storm/webdav/server/servlet/resource/StormResourceService.java +++ b/src/main/java/org/italiangrid/storm/webdav/server/servlet/resource/StormResourceService.java @@ -18,6 +18,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import javax.servlet.ServletException; import javax.servlet.RequestDispatcher; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -25,6 +26,7 @@ import org.eclipse.jetty.http.HttpContent; import org.eclipse.jetty.server.ResourceService; import org.eclipse.jetty.util.URIUtil; +import org.italiangrid.storm.webdav.tpc.SciTag; public class StormResourceService extends ResourceService { @@ -50,6 +52,24 @@ private String pathInContext(HttpServletRequest request) { return URIUtil.addPaths(servletPath, pathInfo); } + @Override + public boolean doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) { + String localIPAddress = request.getLocalAddr(); + int localPort = request.getLocalPort(); + String remoteIPAddress = request.getRemoteAddr(); + int remotePort = request.getRemotePort(); + ((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeStart(localIPAddress, localPort, + remoteIPAddress, remotePort); + } + boolean result = super.doGet(request, response); + if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) { + ((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeEnd(); + } + return result; + } + public boolean doHead(HttpServletRequest request, HttpServletResponse response) throws IOException { diff --git a/src/main/java/org/italiangrid/storm/webdav/spring/AppConfig.java b/src/main/java/org/italiangrid/storm/webdav/spring/AppConfig.java index 22d43a01..2ccd25a4 100644 --- a/src/main/java/org/italiangrid/storm/webdav/spring/AppConfig.java +++ b/src/main/java/org/italiangrid/storm/webdav/spring/AppConfig.java @@ -45,8 +45,6 @@ import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.LayeredConnectionSocketFactory; -import org.apache.http.conn.socket.PlainConnectionSocketFactory; -import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; @@ -91,6 +89,8 @@ import org.italiangrid.storm.webdav.server.PathResolver; import org.italiangrid.storm.webdav.server.util.CANLListener; import org.italiangrid.storm.webdav.tpc.LocalURLService; +import org.italiangrid.storm.webdav.tpc.SciTagPlainConnectionSocketFactory; +import org.italiangrid.storm.webdav.tpc.SciTagSSLConnectionSocketFactory; import org.italiangrid.storm.webdav.tpc.StaticHostListLocalURLService; import org.italiangrid.storm.webdav.tpc.TransferConstants; import org.italiangrid.storm.webdav.tpc.http.SuperLaxRedirectStrategy; @@ -263,8 +263,8 @@ HttpClientConnectionManager tpcClientConnectionManager(ThirdPartyCopyProperties ctx.init(null, new TrustManager[] {tm}, null); } - ConnectionSocketFactory sf = PlainConnectionSocketFactory.getSocketFactory(); - LayeredConnectionSocketFactory tlsSf = new SSLConnectionSocketFactory(ctx); + ConnectionSocketFactory sf = SciTagPlainConnectionSocketFactory.getSocketFactory(); + LayeredConnectionSocketFactory tlsSf = new SciTagSSLConnectionSocketFactory(ctx); Registry r = RegistryBuilder.create() .register(HTTP, sf) diff --git a/src/main/java/org/italiangrid/storm/webdav/spring/web/ServletConfiguration.java b/src/main/java/org/italiangrid/storm/webdav/spring/web/ServletConfiguration.java index b6813811..410c428b 100644 --- a/src/main/java/org/italiangrid/storm/webdav/spring/web/ServletConfiguration.java +++ b/src/main/java/org/italiangrid/storm/webdav/spring/web/ServletConfiguration.java @@ -39,6 +39,7 @@ import org.italiangrid.storm.webdav.server.servlet.MiltonFilter; import org.italiangrid.storm.webdav.server.servlet.MoveRequestSanityChecksFilter; import org.italiangrid.storm.webdav.server.servlet.SAIndexServlet; +import org.italiangrid.storm.webdav.server.servlet.SciTagFilter; import org.italiangrid.storm.webdav.server.servlet.StoRMServlet; import org.italiangrid.storm.webdav.server.servlet.resource.StormResourceService; import org.italiangrid.storm.webdav.server.tracing.LogbackAccessAuthnInfoFilter; @@ -69,11 +70,12 @@ public class ServletConfiguration { public static final Logger LOG = LoggerFactory.getLogger(ServletConfiguration.class); static final int REQUEST_ID_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1000; - static final int LOGBACK_ACCESS_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1002; - static final int LOG_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1003; - static final int REDIRECT_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1004; - static final int CHECKSUM_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1005; - static final int MACAROON_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1006; + static final int LOGBACK_ACCESS_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1001; + static final int LOG_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1002; + static final int REDIRECT_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1003; + static final int CHECKSUM_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1004; + static final int MACAROON_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1005; + static final int SCITAG_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1006; static final int TPC_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1007; static final int MOVE_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1008; static final int DELETE_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1009; @@ -151,6 +153,15 @@ FilterRegistrationBean macaroonRequestFilter(ObjectMapper return filter; } + @Bean + @ConditionalOnProperty(name = "storm.scitag.enabled", havingValue = "true") + FilterRegistrationBean scitagFilter() { + LOG.info("SciTag filter enabled"); + FilterRegistrationBean filter = new FilterRegistrationBean<>(new SciTagFilter()); + filter.setOrder(SCITAG_FILTER_ORDER); + return filter; + } + @Bean FilterRegistrationBean miltonFilter(FilesystemAccess fsAccess, ExtendedAttributesHelper attrsHelper, PathResolver resolver, ReplaceContentStrategy rcs) { @@ -189,9 +200,9 @@ FilterRegistrationBean tpcFilter(Clock clock, FilesystemAccess f TransferClient metricsClient = new HttpTransferClientMetricsWrapper(registry, client); - FilterRegistrationBean tpcFilter = new FilterRegistrationBean<>( - new TransferFilter(clock, metricsClient, resolver, lus, props.isVerifyChecksum(), - props.getEnableExpectContinueThreshold())); + FilterRegistrationBean tpcFilter = + new FilterRegistrationBean<>(new TransferFilter(clock, metricsClient, resolver, lus, + props.isVerifyChecksum(), props.getEnableExpectContinueThreshold())); tpcFilter.addUrlPatterns("/*"); tpcFilter.setOrder(TPC_FILTER_ORDER); return tpcFilter; diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/SciTag.java b/src/main/java/org/italiangrid/storm/webdav/tpc/SciTag.java new file mode 100644 index 00000000..0d5781e4 --- /dev/null +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/SciTag.java @@ -0,0 +1,83 @@ +/** + * Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.italiangrid.storm.webdav.tpc; + +import java.io.IOException; +import java.io.RandomAccessFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SciTag { + + public static final Logger LOG = LoggerFactory.getLogger(SciTag.class); + public static final String SCITAG_ATTRIBUTE = "scitag"; + + private static final String FLOWD_PIPE_NAME = "/var/run/flowd"; + private final boolean remoteAddressIsSource; + private final int experimentId; + private final int activityId; + private String sourceAddress; + private int sourcePort; + private String destinationAddress; + private int destinationPort; + + public SciTag(int experimentId, int activityId, boolean remoteAddressIsSource) { + this.experimentId = experimentId; + this.activityId = activityId; + this.remoteAddressIsSource = remoteAddressIsSource; + } + + public int experimentId() { + return experimentId; + } + + public int activityId() { + return activityId; + } + + private String flowdEntry() { + return " tcp " + sourceAddress + " " + sourcePort + " " + destinationAddress + " " + + destinationPort + " " + this.experimentId() + " " + this.activityId() + "\n"; + } + + public void writeStart(String localAddress, int localPort, String remoteAddress, int remotePort) { + if (remoteAddressIsSource) { + this.sourceAddress = remoteAddress; + this.sourcePort = remotePort; + this.destinationAddress = localAddress; + this.destinationPort = localPort; + } else { + this.sourceAddress = localAddress; + this.sourcePort = localPort; + this.destinationAddress = remoteAddress; + this.destinationPort = remotePort; + } + try (RandomAccessFile flowdPipe = new RandomAccessFile(FLOWD_PIPE_NAME, "rw")) { + flowdPipe.writeBytes("start" + this.flowdEntry()); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + } + } + + public void writeEnd() { + try (RandomAccessFile flowdPipe = new RandomAccessFile(FLOWD_PIPE_NAME, "rw")) { + flowdPipe.writeBytes("end" + this.flowdEntry()); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + } + } + +} diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/SciTagPlainConnectionSocketFactory.java b/src/main/java/org/italiangrid/storm/webdav/tpc/SciTagPlainConnectionSocketFactory.java new file mode 100644 index 00000000..5e0d9f06 --- /dev/null +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/SciTagPlainConnectionSocketFactory.java @@ -0,0 +1,53 @@ +/** + * Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.italiangrid.storm.webdav.tpc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import org.apache.http.HttpHost; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.protocol.HttpContext; + +public class SciTagPlainConnectionSocketFactory extends PlainConnectionSocketFactory { + + public static final SciTagPlainConnectionSocketFactory INSTANCE = new SciTagPlainConnectionSocketFactory(); + + public static SciTagPlainConnectionSocketFactory getSocketFactory() { + return INSTANCE; + } + + public SciTagPlainConnectionSocketFactory() { + super(); + } + + @Override + public Socket connectSocket(int connectTimeout, Socket socket, HttpHost host, + InetSocketAddress remoteAddress, InetSocketAddress localAddress, HttpContext context) + throws IOException { + Socket s = + super.connectSocket(connectTimeout, socket, host, remoteAddress, localAddress, context); + SciTag scitag = (SciTag) context.getAttribute(SciTag.SCITAG_ATTRIBUTE); + if (scitag != null) { + String localIPAddress = s.getLocalAddress().getHostAddress(); + int localPort = s.getLocalPort(); + String remoteIPAddress = s.getInetAddress().getHostAddress(); + int remotePort = s.getPort(); + scitag.writeStart(localIPAddress, localPort, remoteIPAddress, remotePort); + } + return s; + } +} diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/SciTagSSLConnectionSocketFactory.java b/src/main/java/org/italiangrid/storm/webdav/tpc/SciTagSSLConnectionSocketFactory.java new file mode 100644 index 00000000..0bf46d87 --- /dev/null +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/SciTagSSLConnectionSocketFactory.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.italiangrid.storm.webdav.tpc; + +import java.io.IOException; +import java.net.Socket; +import javax.net.ssl.SSLContext; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.protocol.HttpContext; + +public class SciTagSSLConnectionSocketFactory extends SSLConnectionSocketFactory { + + public SciTagSSLConnectionSocketFactory(SSLContext sslContext) { + super(sslContext); + } + + @Override + public Socket createLayeredSocket(Socket socket, String target, int port, HttpContext context) + throws IOException { + Socket s = super.createLayeredSocket(socket, target, port, context); + SciTag scitag = (SciTag) context.getAttribute(SciTag.SCITAG_ATTRIBUTE); + if (scitag != null) { + String localIPAddress = s.getLocalAddress().getHostAddress(); + int localPort = s.getLocalPort(); + String remoteIPAddress = s.getInetAddress().getHostAddress(); + int remotePort = s.getPort(); + scitag.writeStart(localIPAddress, localPort, remoteIPAddress, remotePort); + } + return s; + } +} diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/TransferConstants.java b/src/main/java/org/italiangrid/storm/webdav/tpc/TransferConstants.java index fa6a55fb..f3659af0 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/TransferConstants.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/TransferConstants.java @@ -28,6 +28,7 @@ public interface TransferConstants { String DESTINATION_HEADER = "Destination"; String OVERWRITE_HEADER = "Overwrite"; String REQUIRE_CHECKSUM_HEADER = "RequireChecksumVerification"; + String SCITAG_HEADER = "SciTag"; String CREDENTIAL_HEADER = "Credential"; String CREDENTIAL_HEADER_NONE_VALUE = "none"; diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilter.java b/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilter.java index d2dec61d..8c7dd3ae 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilter.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilter.java @@ -108,10 +108,11 @@ protected void handleTpc(HttpServletRequest request, HttpServletResponse respons if (validRequest(request, response)) { Optional source = Optional.ofNullable(request.getHeader(SOURCE_HEADER)); + SciTag scitag = (SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE); if (source.isPresent()) { - handlePullCopy(request, response); + handlePullCopy(request, response, scitag); } else { - handlePushCopy(request, response); + handlePushCopy(request, response, scitag); } } @@ -210,8 +211,8 @@ protected void logTransferException(TransferRequest request, Exception e) { } } - protected void handlePullCopy(HttpServletRequest request, HttpServletResponse response) - throws IOException { + protected void handlePullCopy(HttpServletRequest request, HttpServletResponse response, + SciTag scitag) throws IOException { URI uri = URI.create(request.getHeader(SOURCE_HEADER)); String path = getScopedPathInfo(request); @@ -221,6 +222,7 @@ protected void handlePullCopy(HttpServletRequest request, HttpServletResponse re .uri(uri) .path(path) .headers(getTransferHeaders(request, response)) + .scitag(scitag) .verifyChecksum(verifyChecksum && verifyChecksumRequested(request)) .overwrite(overwriteRequested(request)) .build(); @@ -249,8 +251,8 @@ protected void handlePullCopy(HttpServletRequest request, HttpServletResponse re } } - protected void handlePushCopy(HttpServletRequest request, HttpServletResponse response) - throws IOException { + protected void handlePushCopy(HttpServletRequest request, HttpServletResponse response, + SciTag scitag) throws IOException { URI uri = URI.create(request.getHeader(DESTINATION_HEADER)); String path = getScopedPathInfo(request); @@ -259,6 +261,7 @@ protected void handlePushCopy(HttpServletRequest request, HttpServletResponse re .uri(uri) .path(path) .headers(getTransferHeaders(request, response)) + .scitag(scitag) .verifyChecksum(verifyChecksum && verifyChecksumRequested(request)) .overwrite(overwriteRequested(request)) .build(); diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilterSupport.java b/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilterSupport.java index 5a087e9e..8f6685a7 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilterSupport.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilterSupport.java @@ -87,10 +87,25 @@ protected Multimap getTransferHeaders(HttpServletRequest request LOG.warn("Ignoring invalid transfer header {}", headerName); continue; } + if (xferHeaderName.trim().equalsIgnoreCase(SCITAG_HEADER) + && request.getHeader(SCITAG_HEADER) != null) { + // If the active party receives an HTTP-TPC COPY request with both a SciTag request header + // and a TransferHeaderSciTag request header then it SHOULD ignore the + // TransferHeaderSciTag and continue to process the request. + LOG.warn("Ignoring TransferHeaderSciTag header because SciTag header is present"); + continue; + } xferHeaders.put(xferHeaderName.trim(), request.getHeader(headerName)); } } + // If the active party receives an HTTP-TPC COPY request with a SciTag request header then the + // active party SHOULD include a SciTag request header (with the same value as in the COPY + // request) in all related HTTP requests. + if (request.getHeader(SCITAG_HEADER) != null) { + xferHeaders.put(SCITAG_HEADER, request.getHeader(SCITAG_HEADER)); + } + if (isPushTpc(request, localURLService) && request.getContentLength() >= enableExpectContinueThreshold) { xferHeaders.put(org.apache.http.protocol.HTTP.EXPECT_DIRECTIVE, org.apache.http.protocol.HTTP.EXPECT_CONTINUE); diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/http/HttpTransferClient.java b/src/main/java/org/italiangrid/storm/webdav/tpc/http/HttpTransferClient.java index f89f0154..be85c833 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/http/HttpTransferClient.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/http/HttpTransferClient.java @@ -39,11 +39,13 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpHead; import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.impl.client.CloseableHttpClient; import org.italiangrid.storm.webdav.config.ServiceConfigurationProperties; import org.italiangrid.storm.webdav.config.ThirdPartyCopyProperties; import org.italiangrid.storm.webdav.fs.attrs.ExtendedAttributesHelper; import org.italiangrid.storm.webdav.server.PathResolver; +import org.italiangrid.storm.webdav.tpc.SciTag; import org.italiangrid.storm.webdav.tpc.transfer.GetTransferRequest; import org.italiangrid.storm.webdav.tpc.transfer.PutTransferRequest; import org.italiangrid.storm.webdav.tpc.transfer.TransferClient; @@ -165,6 +167,7 @@ public void handle(GetTransferRequest request, TransferStatusCallback cb) { StormCountingOutputStream os = prepareOutputStream(resolver.resolvePath(request.path())); HttpGet get = prepareRequest(request); + HttpClientContext context = HttpClientContext.create(); ScheduledFuture reportTask = executorService.scheduleAtFixedRate(() -> { reportStatus(cb, request, statusBuilder.inProgress(os.getCount())); @@ -172,8 +175,9 @@ public void handle(GetTransferRequest request, TransferStatusCallback cb) { try { + context.setAttribute(SciTag.SCITAG_ATTRIBUTE, request.scitag()); httpClient.execute(get, new GetResponseHandler(request, os, attributesHelper, - MDC.getCopyOfContextMap(), socketBufferSize, true)); + MDC.getCopyOfContextMap(), socketBufferSize, true), context); reportTask.cancel(true); reportStatus(cb, request, statusBuilder.done(os.getCount())); @@ -196,6 +200,9 @@ public void handle(GetTransferRequest request, TransferStatusCallback cb) { if (!reportTask.isCancelled()) { reportTask.cancel(true); } + if (context.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) { + ((SciTag) context.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeEnd(); + } } } @@ -221,6 +228,7 @@ public void handle(PutTransferRequest request, TransferStatusCallback cb) { CountingFileEntity cfe = prepareFileEntity(resolver.resolvePath(request.path())); HttpPut put = null; + HttpClientContext context = HttpClientContext.create(); try { put = prepareRequest(request, cfe); @@ -236,7 +244,8 @@ public void handle(PutTransferRequest request, TransferStatusCallback cb) { try { checkOverwrite(request); - httpClient.execute(put, new PutResponseHandler(MDC.getCopyOfContextMap())); + context.setAttribute(SciTag.SCITAG_ATTRIBUTE, request.scitag()); + httpClient.execute(put, new PutResponseHandler(MDC.getCopyOfContextMap()), context); reportTask.cancel(true); reportStatus(cb, request, statusBuilder.done(cfe.getCount())); } catch (HttpResponseException e) { @@ -255,6 +264,9 @@ public void handle(PutTransferRequest request, TransferStatusCallback cb) { if (!reportTask.isCancelled()) { reportTask.cancel(true); } + if (context.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) { + ((SciTag) context.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeEnd(); + } } } diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/GetTransferRequestBuilder.java b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/GetTransferRequestBuilder.java index a993ff9e..ea9107ec 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/GetTransferRequestBuilder.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/GetTransferRequestBuilder.java @@ -21,7 +21,7 @@ public class GetTransferRequestBuilder extends RequestBuilder { String uuid; @@ -28,6 +30,8 @@ public abstract class RequestBuilder { URI uri; + SciTag scitag; + boolean verifyChecksum = true; boolean overwrite = true; @@ -63,6 +67,11 @@ public RequestBuilder addHeader(String header, String value) { return this; } + public RequestBuilder scitag(SciTag scitag) { + this.scitag = scitag; + return this; + } + public RequestBuilder overwrite(boolean o) { overwrite = o; return this; diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/TransferRequest.java b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/TransferRequest.java index 5b149147..1eb1487d 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/TransferRequest.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/TransferRequest.java @@ -22,6 +22,8 @@ import com.google.common.collect.Multimap; +import org.italiangrid.storm.webdav.tpc.SciTag; + public interface TransferRequest { String uuid(); @@ -32,6 +34,8 @@ public interface TransferRequest { Multimap transferHeaders(); + SciTag scitag(); + boolean verifyChecksum(); boolean overwrite(); diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/GetTransferRequestImpl.java b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/GetTransferRequestImpl.java index 7cb4e2e5..e4a57c48 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/GetTransferRequestImpl.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/GetTransferRequestImpl.java @@ -19,6 +19,7 @@ import java.net.URI; +import org.italiangrid.storm.webdav.tpc.SciTag; import org.italiangrid.storm.webdav.tpc.transfer.GetTransferRequest; import com.google.common.collect.Multimap; @@ -26,9 +27,9 @@ public class GetTransferRequestImpl extends TransferRequestImpl implements GetTransferRequest { public GetTransferRequestImpl(String uuid, String path, URI uri, - Multimap xferHeaders, - boolean verifyChecksum, boolean overwrite) { - super(uuid, path, uri, xferHeaders, verifyChecksum, overwrite); + Multimap xferHeaders, SciTag scitag, boolean verifyChecksum, + boolean overwrite) { + super(uuid, path, uri, xferHeaders, scitag, verifyChecksum, overwrite); } @Override diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/PutTransferRequestImpl.java b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/PutTransferRequestImpl.java index b2f4d37c..99e65775 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/PutTransferRequestImpl.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/PutTransferRequestImpl.java @@ -19,6 +19,7 @@ import java.net.URI; +import org.italiangrid.storm.webdav.tpc.SciTag; import org.italiangrid.storm.webdav.tpc.transfer.PutTransferRequest; import com.google.common.collect.Multimap; @@ -26,8 +27,9 @@ public class PutTransferRequestImpl extends TransferRequestImpl implements PutTransferRequest { public PutTransferRequestImpl(String uuid, String path, URI uri, - Multimap xferHeaders, boolean verifyChecksum, boolean overwrite) { - super(uuid, path, uri, xferHeaders, verifyChecksum, overwrite); + Multimap xferHeaders, SciTag scitag, boolean verifyChecksum, + boolean overwrite) { + super(uuid, path, uri, xferHeaders, scitag, verifyChecksum, overwrite); } @Override diff --git a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/TransferRequestImpl.java b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/TransferRequestImpl.java index a1d29eb4..5e03dbfe 100644 --- a/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/TransferRequestImpl.java +++ b/src/main/java/org/italiangrid/storm/webdav/tpc/transfer/impl/TransferRequestImpl.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.Optional; +import org.italiangrid.storm.webdav.tpc.SciTag; import org.italiangrid.storm.webdav.tpc.transfer.TransferRequest; import org.italiangrid.storm.webdav.tpc.transfer.TransferStatus; import org.italiangrid.storm.webdav.tpc.transfer.TransferStatus.Status; @@ -40,6 +41,8 @@ public abstract class TransferRequestImpl implements TransferRequest { final Multimap xferHeaders; + final SciTag scitag; + final boolean verifyChecksum; final boolean overwrite; @@ -51,12 +54,13 @@ public abstract class TransferRequestImpl implements TransferRequest { private Optional lastTransferStatus = Optional.empty(); TransferRequestImpl(String uuid, String path, URI uri, Multimap xferHeaders, - boolean verifyChecksum, boolean overwrite) { + SciTag scitag, boolean verifyChecksum, boolean overwrite) { this.uuid = uuid; this.path = path; this.uri = uri; this.xferHeaders = xferHeaders; + this.scitag = scitag; this.verifyChecksum = verifyChecksum; this.overwrite = overwrite; } @@ -76,6 +80,11 @@ public Multimap transferHeaders() { return xferHeaders; } + @Override + public SciTag scitag() { + return scitag; + } + @Override public boolean verifyChecksum() { return verifyChecksum; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4db620d9..6e370b7f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -59,6 +59,8 @@ oauth: storm: checksum-strategy: early + scitag: + enabled: ${STORM_SCITAG:false} redirector: enabled: false @@ -173,4 +175,3 @@ storm: tape: well-known: source: ${STORM_WEBDAV_TAPE_WELLKNOWN_SOURCE:/etc/storm/webdav/wlcg-tape-rest-api.json} - diff --git a/src/test/java/org/italiangrid/storm/webdav/test/tpc/PullTransferTest.java b/src/test/java/org/italiangrid/storm/webdav/test/tpc/PullTransferTest.java index 3c7e84a6..8e3b40ab 100644 --- a/src/test/java/org/italiangrid/storm/webdav/test/tpc/PullTransferTest.java +++ b/src/test/java/org/italiangrid/storm/webdav/test/tpc/PullTransferTest.java @@ -104,9 +104,10 @@ void checkTransferHeaderPassing() throws IOException, ServletException { .thenReturn(TRANSFER_HEADER_AUTHORIZATION_VALUE); when(request.getHeader(TRANSFER_HEADER_WHATEVER_KEY)) .thenReturn(TRANSFER_HEADER_WHATEVER_VALUE); + when(request.getHeader(SCITAG_HEADER)).thenReturn(SCITAG_HEADER_VALUE); - when(request.getHeaderNames()).thenReturn( - enumeration(asList(TRANSFER_HEADER_AUTHORIZATION_KEY, TRANSFER_HEADER_WHATEVER_KEY))); + when(request.getHeaderNames()).thenReturn(enumeration( + asList(TRANSFER_HEADER_AUTHORIZATION_KEY, TRANSFER_HEADER_WHATEVER_KEY, SCITAG_HEADER))); filter.doFilter(request, response, chain); verify(client).handle(getXferRequest.capture(), Mockito.any()); @@ -118,21 +119,24 @@ void checkTransferHeaderPassing() throws IOException, ServletException { Multimap xferHeaders = getXferRequest.getValue().transferHeaders(); - assertThat(xferHeaders.size(), is(2)); + assertThat(xferHeaders.size(), is(3)); assertThat(xferHeaders.containsKey("Authorization"), is(true)); assertThat(xferHeaders.get("Authorization").iterator().next(), is(TRANSFER_HEADER_AUTHORIZATION_VALUE)); assertThat(xferHeaders.containsKey("Whatever"), is(true)); assertThat(xferHeaders.get("Whatever").iterator().next(), is(TRANSFER_HEADER_WHATEVER_VALUE)); + assertThat(xferHeaders.containsKey("SciTag"), is(true)); + assertThat(xferHeaders.get("SciTag").iterator().next(), is(SCITAG_HEADER_VALUE)); } @Test void emptyTransferHeaderAreIgnored() throws IOException, ServletException { - when(request.getHeaderNames()) - .thenReturn(enumeration(asList(TRANSFER_HEADER, TRANSFER_HEADER_WHATEVER_KEY))); + when(request.getHeaderNames()).thenReturn( + enumeration(asList(TRANSFER_HEADER, TRANSFER_HEADER_WHATEVER_KEY, SCITAG_HEADER))); when(request.getHeader(TRANSFER_HEADER_WHATEVER_KEY)) .thenReturn(TRANSFER_HEADER_WHATEVER_VALUE); + when(request.getHeader(SCITAG_HEADER)).thenReturn(SCITAG_HEADER_VALUE); filter.doFilter(request, response, chain); verify(client).handle(getXferRequest.capture(), Mockito.any()); @@ -144,10 +148,12 @@ void emptyTransferHeaderAreIgnored() throws IOException, ServletException { Multimap xferHeaders = getXferRequest.getValue().transferHeaders(); - assertThat(xferHeaders.size(), is(1)); + assertThat(xferHeaders.size(), is(2)); assertThat(xferHeaders.containsKey("Whatever"), is(true)); assertThat(xferHeaders.get("Whatever").iterator().next(), is(TRANSFER_HEADER_WHATEVER_VALUE)); + assertThat(xferHeaders.containsKey("SciTag"), is(true)); + assertThat(xferHeaders.get("SciTag").iterator().next(), is(SCITAG_HEADER_VALUE)); } } diff --git a/src/test/java/org/italiangrid/storm/webdav/test/tpc/PushTransferTest.java b/src/test/java/org/italiangrid/storm/webdav/test/tpc/PushTransferTest.java index e70feeb0..e81eab86 100644 --- a/src/test/java/org/italiangrid/storm/webdav/test/tpc/PushTransferTest.java +++ b/src/test/java/org/italiangrid/storm/webdav/test/tpc/PushTransferTest.java @@ -112,9 +112,10 @@ void checkTransferHeaderPassing() throws IOException, ServletException { .thenReturn(TRANSFER_HEADER_AUTHORIZATION_VALUE); when(request.getHeader(TRANSFER_HEADER_WHATEVER_KEY)) .thenReturn(TRANSFER_HEADER_WHATEVER_VALUE); + when(request.getHeader(SCITAG_HEADER)).thenReturn(SCITAG_HEADER_VALUE); - when(request.getHeaderNames()).thenReturn( - enumeration(asList(TRANSFER_HEADER_AUTHORIZATION_KEY, TRANSFER_HEADER_WHATEVER_KEY))); + when(request.getHeaderNames()).thenReturn(enumeration( + asList(TRANSFER_HEADER_AUTHORIZATION_KEY, TRANSFER_HEADER_WHATEVER_KEY, SCITAG_HEADER))); filter.doFilter(request, response, chain); verify(client).handle(putXferRequest.capture(), Mockito.any()); @@ -126,21 +127,24 @@ void checkTransferHeaderPassing() throws IOException, ServletException { Multimap xferHeaders = putXferRequest.getValue().transferHeaders(); - assertThat(xferHeaders.size(), is(2)); + assertThat(xferHeaders.size(), is(3)); assertThat(xferHeaders.containsKey("Authorization"), is(true)); assertThat(xferHeaders.get("Authorization").iterator().next(), is(TRANSFER_HEADER_AUTHORIZATION_VALUE)); assertThat(xferHeaders.containsKey("Whatever"), is(true)); assertThat(xferHeaders.get("Whatever").iterator().next(), is(TRANSFER_HEADER_WHATEVER_VALUE)); + assertThat(xferHeaders.containsKey("SciTag"), is(true)); + assertThat(xferHeaders.get("SciTag").iterator().next(), is(SCITAG_HEADER_VALUE)); } @Test void emptyTransferHeaderAreIgnored() throws IOException, ServletException { - when(request.getHeaderNames()) - .thenReturn(enumeration(asList(TRANSFER_HEADER, TRANSFER_HEADER_WHATEVER_KEY))); + when(request.getHeaderNames()).thenReturn( + enumeration(asList(TRANSFER_HEADER, TRANSFER_HEADER_WHATEVER_KEY, SCITAG_HEADER))); when(request.getHeader(TRANSFER_HEADER_WHATEVER_KEY)) .thenReturn(TRANSFER_HEADER_WHATEVER_VALUE); + when(request.getHeader(SCITAG_HEADER)).thenReturn(SCITAG_HEADER_VALUE); filter.doFilter(request, response, chain); verify(client).handle(putXferRequest.capture(), Mockito.any()); @@ -152,10 +156,12 @@ void emptyTransferHeaderAreIgnored() throws IOException, ServletException { Multimap xferHeaders = putXferRequest.getValue().transferHeaders(); - assertThat(xferHeaders.size(), is(1)); + assertThat(xferHeaders.size(), is(2)); assertThat(xferHeaders.containsKey("Whatever"), is(true)); assertThat(xferHeaders.get("Whatever").iterator().next(), is(TRANSFER_HEADER_WHATEVER_VALUE)); + assertThat(xferHeaders.containsKey("SciTag"), is(true)); + assertThat(xferHeaders.get("SciTag").iterator().next(), is(SCITAG_HEADER_VALUE)); } @@ -177,13 +183,14 @@ void checkExpectContinueHeaderIsSet() throws IOException, ServletException { .thenReturn(TRANSFER_HEADER_AUTHORIZATION_VALUE); when(request.getHeaderNames()).thenReturn( enumeration(asList(TRANSFER_HEADER_AUTHORIZATION_KEY))); + when(request.getHeader(SCITAG_HEADER)).thenReturn(SCITAG_HEADER_VALUE); when(request.getContentLength()).thenReturn(1024*1024+1); filter.doFilter(request, response, chain); verify(client).handle(putXferRequest.capture(), Mockito.any()); Multimap xferHeaders = putXferRequest.getValue().transferHeaders(); - assertThat(xferHeaders.size(), is(2)); + assertThat(xferHeaders.size(), is(3)); assertThat(xferHeaders.containsKey(EXPECTED_HEADER), is(true)); assertThat(xferHeaders.get(EXPECTED_HEADER).iterator().next(), is(EXPECTED_VALUE)); @@ -197,14 +204,38 @@ void checkExpectContinueHeaderIsNotSet() throws IOException, ServletException { .thenReturn(TRANSFER_HEADER_AUTHORIZATION_VALUE); when(request.getHeaderNames()).thenReturn( enumeration(asList(TRANSFER_HEADER_AUTHORIZATION_KEY))); + when(request.getHeader(SCITAG_HEADER)).thenReturn(SCITAG_HEADER_VALUE); when(request.getContentLength()).thenReturn(1024*1024-1); filter.doFilter(request, response, chain); verify(client).handle(putXferRequest.capture(), Mockito.any()); Multimap xferHeaders = putXferRequest.getValue().transferHeaders(); - assertThat(xferHeaders.size(), is(1)); + assertThat(xferHeaders.size(), is(2)); assertThat(xferHeaders.containsKey(EXPECTED_HEADER), is(false)); } + + @Test + void bothSciTagAndTransferHeaderSciTag() throws IOException, ServletException { + when(request.getHeaderNames()) + .thenReturn(enumeration(asList(SCITAG_HEADER, TRANSFER_HEADER_SCITAG))); + + when(request.getHeader(SCITAG_HEADER)).thenReturn(SCITAG_HEADER_VALUE); + + filter.doFilter(request, response, chain); + verify(client).handle(putXferRequest.capture(), Mockito.any()); + + assertThat(putXferRequest.getValue().path(), is(FULL_LOCAL_PATH)); + assertThat(putXferRequest.getValue().remoteURI(), is(HTTPS_URL_URI)); + assertThat(putXferRequest.getValue().overwrite(), is(true)); + assertThat(putXferRequest.getValue().verifyChecksum(), is(true)); + + + Multimap xferHeaders = putXferRequest.getValue().transferHeaders(); + assertThat(xferHeaders.size(), is(1)); + + assertThat(xferHeaders.containsKey("SciTag"), is(true)); + assertThat(xferHeaders.get("SciTag").iterator().next(), is(SCITAG_HEADER_VALUE)); + } } diff --git a/src/test/java/org/italiangrid/storm/webdav/test/tpc/SciTagFilterActivationTest.java b/src/test/java/org/italiangrid/storm/webdav/test/tpc/SciTagFilterActivationTest.java new file mode 100644 index 00000000..bcc7655d --- /dev/null +++ b/src/test/java/org/italiangrid/storm/webdav/test/tpc/SciTagFilterActivationTest.java @@ -0,0 +1,89 @@ +/** + * Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.italiangrid.storm.webdav.test.tpc; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import javax.servlet.ServletException; + +import org.italiangrid.storm.webdav.config.StorageAreaInfo; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.italiangrid.storm.webdav.server.servlet.SciTagFilter; +import org.italiangrid.storm.webdav.tpc.SciTag; + +@ExtendWith(MockitoExtension.class) +class SciTagFilterActivationTest extends TransferFilterTestSupport { + + SciTagFilter sciTagFilter; + + @Mock + StorageAreaInfo testSa; + + @Mock + StorageAreaInfo otherSa; + + @Override + @BeforeEach + public void setup() throws IOException { + super.setup(); + sciTagFilter = new SciTagFilter(); + lenient().when(request.getServletPath()).thenReturn(SERVLET_PATH); + lenient().when(request.getPathInfo()).thenReturn(LOCAL_PATH); + lenient().when(response.getWriter()).thenReturn(responseWriter); + lenient().when(resolver.resolveStorageArea(FULL_LOCAL_PATH)).thenReturn(testSa); + lenient().when(resolver.resolveStorageArea("/test/otherfile")).thenReturn(testSa); + lenient().when(resolver.resolveStorageArea("/other/file")).thenReturn(otherSa); + lenient().when(request.getHeader(SOURCE_HEADER)).thenReturn(null); + } + + @Test + void requestWithScitag() throws IOException, ServletException { + when(request.getMethod()).thenReturn("GET"); + when(request.getHeader(SCITAG_HEADER)).thenReturn("66"); + + sciTagFilter.doFilter(request, response, chain); + verify(chain).doFilter(request, response); + verify(request).setAttribute(eq(SciTag.SCITAG_ATTRIBUTE), any(SciTag.class)); + } + + @Test + void requestWithoutScitag() throws IOException, ServletException { + sciTagFilter.doFilter(request, response, chain); + verify(chain).doFilter(request, response); + verify(request, times(0)).setAttribute(eq(SciTag.SCITAG_ATTRIBUTE), any(SciTag.class)); + } + + @Test + void createSciTag() { + SciTag scitag = new SciTag(1, 2, true); + assertThat(scitag.experimentId(), is(1)); + assertThat(scitag.activityId(), is(2)); + } + +} diff --git a/src/test/java/org/italiangrid/storm/webdav/test/tpc/TransferFilterTestSupport.java b/src/test/java/org/italiangrid/storm/webdav/test/tpc/TransferFilterTestSupport.java index 1702f996..6733e0e5 100644 --- a/src/test/java/org/italiangrid/storm/webdav/test/tpc/TransferFilterTestSupport.java +++ b/src/test/java/org/italiangrid/storm/webdav/test/tpc/TransferFilterTestSupport.java @@ -67,6 +67,10 @@ public class TransferFilterTestSupport implements TransferConstants { public static final String TRANSFER_HEADER_WHATEVER_KEY = "TransferHeaderWhatever"; public static final String TRANSFER_HEADER_WHATEVER_VALUE = "papisilviobelluscona"; + public static final String SCITAG_HEADER = "SciTag"; + public static final String SCITAG_HEADER_VALUE = "65"; + public static final String TRANSFER_HEADER_SCITAG = "TransferHeaderSciTag"; + public static final URI HTTP_URL_URI = URI.create(HTTP_URL); public static final URI HTTPS_URL_URI = URI.create(HTTPS_URL); diff --git a/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/ClientTest.java b/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/ClientTest.java index 4cad306d..353df6ba 100644 --- a/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/ClientTest.java +++ b/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/ClientTest.java @@ -30,6 +30,7 @@ import org.apache.http.client.ResponseHandler; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.protocol.HttpClientContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -74,7 +75,8 @@ public void testClientCorrectlyBuildsHttpRequestNoHeaders() throws IOException { }); verify(httpClient).execute(getRequest.capture(), - ArgumentMatchers.>any()); + ArgumentMatchers.>any(), + ArgumentMatchers.any()); HttpGet httpGetReq = getRequest.getValue(); @@ -92,7 +94,8 @@ public void testClientCorrectlyBuildsHttpRequestWithHeaders() throws IOException }); verify(httpClient).execute(getRequest.capture(), - ArgumentMatchers.>any()); + ArgumentMatchers.>any(), + ArgumentMatchers.any()); HttpGet httpGetReq = getRequest.getValue(); diff --git a/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcClientRedirectionTest.java b/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcClientRedirectionTest.java index 73cebca4..498084a4 100644 --- a/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcClientRedirectionTest.java +++ b/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcClientRedirectionTest.java @@ -149,9 +149,8 @@ public void handleCrossProtocolRedirectionCorrectly() { headers.put("Authorization", "Bearer this-is-a-fake-token"); - GetTransferRequest getRequest = - new GetTransferRequestImpl(UUID.randomUUID().toString(), - "/test/example", URI.create(mockHttpsUrl("/test/example")), headers, false, false); + GetTransferRequest getRequest = new GetTransferRequestImpl(UUID.randomUUID().toString(), + "/test/example", URI.create(mockHttpsUrl("/test/example")), headers, null, false, false); mockServer .when(request().withMethod("GET").withPath("/test/example").withSecure(true), diff --git a/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcIntegrationTest.java b/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcIntegrationTest.java index 27b3d7e3..f3bb67f6 100644 --- a/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcIntegrationTest.java +++ b/src/test/java/org/italiangrid/storm/webdav/test/tpc/http/integration/TpcIntegrationTest.java @@ -92,7 +92,7 @@ public void testPutRedirectHandled() { Multimap emptyHeaders = ArrayListMultimap.create(); PutTransferRequest putRequest = new PutTransferRequestImpl(UUID.randomUUID().toString(), - "/test/example", URI.create(mockUrl("/test/example")), emptyHeaders, false, true); + "/test/example", URI.create(mockUrl("/test/example")), emptyHeaders, null, false, true); mockServer.when(request().withMethod("PUT").withPath("/test/example"), Times.exactly(1)) .respond(HttpResponse.response() @@ -121,7 +121,7 @@ public void testAuthorizationHeaderIsDroppedOnRedirectForPut() { headers.put("Authorization", "Bearer this-is-a-fake-token"); PutTransferRequest putRequest = new PutTransferRequestImpl(UUID.randomUUID().toString(), - "/test/example", URI.create(mockUrl("/test/example")), headers, false, true); + "/test/example", URI.create(mockUrl("/test/example")), headers, null, false, true); mockServer.when(request().withMethod("PUT").withPath("/test/example"), Times.exactly(1)) .respond(HttpResponse.response() @@ -153,7 +153,7 @@ public void testAuthorizationHeaderIsDroppedOnRedirectForGet() { GetTransferRequest getRequest = new GetTransferRequestImpl(UUID.randomUUID().toString(), - "/test/example", URI.create(mockUrl("/test/example")), headers, false, false); + "/test/example", URI.create(mockUrl("/test/example")), headers, null, false, false); mockServer.when(request().withMethod("GET").withPath("/test/example"), Times.exactly(1))