Skip to content

Commit

Permalink
Add SciTags support in HTTP-TPC
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Bassi committed Oct 9, 2024
1 parent 577ec80 commit 63498c9
Show file tree
Hide file tree
Showing 30 changed files with 568 additions and 52 deletions.
13 changes: 13 additions & 0 deletions doc/tpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,16 @@ STORM_WEBDAV_AUTHZ_SERVER_MAX_TOKEN_LIFETIME_SEC="43200"
STORM_WEBDAV_REQUIRE_CLIENT_CERT="false"
```
For other configuration options, see the /etc/sysconfig/storm-webdav file.

## SciTags

StoRM WebDAV supports the `SciTag` header.
To correctly mark the network packets and/or network flows, you need to install [flowd](https://github.com/scitags/flowd) and configure it to use the `np_api` plugin.

Example flowd configuration (`/etc/flowd/flowd-tags.cfg`):

```
PLUGIN='np_api'
BACKEND='udp_firefly'
FLOW_MAP_API='https://www.scitags.org/api.json'
```
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.italiangrid.storm.webdav.milton.StoRMResourceFactory;
import org.italiangrid.storm.webdav.milton.util.ReplaceContentStrategy;
import org.italiangrid.storm.webdav.server.PathResolver;
import org.italiangrid.storm.webdav.tpc.SciTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -133,11 +134,22 @@ public void doMilton(HttpServletRequest request, HttpServletResponse response) {
Request miltonReq = new StoRMMiltonRequest(request, servletContext);

Response miltonRes = new io.milton.servlet.ServletResponse(response);
if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) {
String localIPAddress = request.getLocalAddr();
int localPort = request.getLocalPort();
String remoteIPAddress = request.getRemoteAddr();
int remotePort = request.getRemotePort();
((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeStart(localIPAddress, localPort,
remoteIPAddress, remotePort);
}
miltonHTTPManager.process(miltonReq, miltonRes);

} finally {

MiltonServlet.clearThreadlocals();
if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) {
((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeEnd();
}

try {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.italiangrid.storm.webdav.server.servlet;

import java.io.IOException;
import java.util.Optional;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;

import org.italiangrid.storm.webdav.tpc.SciTag;
import org.italiangrid.storm.webdav.tpc.TransferConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class SciTagFilter implements Filter {

public static final Logger logger = LoggerFactory.getLogger(SciTagFilter.class);

@Autowired
public SciTagFilter() {}

@Override
public void init(FilterConfig filterConfig) throws ServletException {
logger.debug("Initializing SciTag filter.");
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
SciTag scitag = null;
HttpServletRequest req = (HttpServletRequest) request;
if (req.getHeader(TransferConstants.SCITAG_HEADER) != null) {
Optional<String> source = Optional.ofNullable(req.getHeader(TransferConstants.SOURCE_HEADER));
boolean remoteAddressIsSource =
req.getMethod().equals("PUT") || (req.getMethod().equals("COPY") && source.isPresent());
// state prot src_ip src_port dst_ip dst_port exp act
// If the active party receives an HTTP-TPC COPY request with a SciTag request header with
// a valid value then the server SHOULD mark the resulting network traffic with the
// experiment ID and activity ID encoded in the value.
int scitagValue = Integer.parseInt(req.getHeader(TransferConstants.SCITAG_HEADER));
// Valid value is a single positive integer > 64 and <65536 (16bit). Any other value is
// considered invalid.
if (scitagValue > 64 && scitagValue < 65536) {
scitag = new SciTag(scitagValue >> 6, scitagValue & ((1 << 6) - 1), remoteAddressIsSource);
} else {
// If the active party receives an HTTP-TPC COPY request with a SciTag request header
// with an invalid value then the server SHOULD mark the resulting network traffic with
// the 0 as the experiment ID and the activity ID.
scitag = new SciTag(0, 0, remoteAddressIsSource);
}
}
request.setAttribute(SciTag.SCITAG_ATTRIBUTE, scitag);
chain.doFilter(request, response);
}

@Override
public void destroy() {
logger.debug("Destroying SciTag filter.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class StoRMServlet extends DefaultServlet {

/**
*
*
*/
private static final long serialVersionUID = 4204673943980786498L;

Expand Down Expand Up @@ -74,6 +74,12 @@ public Resource getResource(String pathInContext) {

}

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
resourceService.doGet(request, response);

Check notice

Code scanning / SonarCloud

Exceptions should not be thrown from servlet methods Low

Handle the following exceptions that could be thrown by "doGet": ServletException, IOException. See more on SonarCloud
}

@Override
protected void doHead(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import java.io.FileNotFoundException;
import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.RequestDispatcher;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.server.ResourceService;
import org.eclipse.jetty.util.URIUtil;
import org.italiangrid.storm.webdav.tpc.SciTag;

public class StormResourceService extends ResourceService {

Expand All @@ -50,6 +52,24 @@ private String pathInContext(HttpServletRequest request) {
return URIUtil.addPaths(servletPath, pathInfo);
}

@Override
public boolean doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) {
String localIPAddress = request.getLocalAddr();
int localPort = request.getLocalPort();
String remoteIPAddress = request.getRemoteAddr();
int remotePort = request.getRemotePort();
((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeStart(localIPAddress, localPort,
remoteIPAddress, remotePort);
}
boolean result = super.doGet(request, response);
if (request.getAttribute(SciTag.SCITAG_ATTRIBUTE) != null) {
((SciTag) request.getAttribute(SciTag.SCITAG_ATTRIBUTE)).writeEnd();
}
return result;
}

public boolean doHead(HttpServletRequest request, HttpServletResponse response)
throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
Expand Down Expand Up @@ -91,6 +89,8 @@
import org.italiangrid.storm.webdav.server.PathResolver;
import org.italiangrid.storm.webdav.server.util.CANLListener;
import org.italiangrid.storm.webdav.tpc.LocalURLService;
import org.italiangrid.storm.webdav.tpc.SciTagPlainConnectionSocketFactory;
import org.italiangrid.storm.webdav.tpc.SciTagSSLConnectionSocketFactory;
import org.italiangrid.storm.webdav.tpc.StaticHostListLocalURLService;
import org.italiangrid.storm.webdav.tpc.TransferConstants;
import org.italiangrid.storm.webdav.tpc.http.SuperLaxRedirectStrategy;
Expand Down Expand Up @@ -263,8 +263,8 @@ HttpClientConnectionManager tpcClientConnectionManager(ThirdPartyCopyProperties
ctx.init(null, new TrustManager[] {tm}, null);
}

ConnectionSocketFactory sf = PlainConnectionSocketFactory.getSocketFactory();
LayeredConnectionSocketFactory tlsSf = new SSLConnectionSocketFactory(ctx);
ConnectionSocketFactory sf = SciTagPlainConnectionSocketFactory.getSocketFactory();
LayeredConnectionSocketFactory tlsSf = new SciTagSSLConnectionSocketFactory(ctx);

Registry<ConnectionSocketFactory> r = RegistryBuilder.<ConnectionSocketFactory>create()
.register(HTTP, sf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.italiangrid.storm.webdav.server.servlet.MiltonFilter;
import org.italiangrid.storm.webdav.server.servlet.MoveRequestSanityChecksFilter;
import org.italiangrid.storm.webdav.server.servlet.SAIndexServlet;
import org.italiangrid.storm.webdav.server.servlet.SciTagFilter;
import org.italiangrid.storm.webdav.server.servlet.StoRMServlet;
import org.italiangrid.storm.webdav.server.servlet.resource.StormResourceService;
import org.italiangrid.storm.webdav.server.tracing.LogbackAccessAuthnInfoFilter;
Expand Down Expand Up @@ -69,11 +70,12 @@ public class ServletConfiguration {
public static final Logger LOG = LoggerFactory.getLogger(ServletConfiguration.class);

static final int REQUEST_ID_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1000;
static final int LOGBACK_ACCESS_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1002;
static final int LOG_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1003;
static final int REDIRECT_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1004;
static final int CHECKSUM_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1005;
static final int MACAROON_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1006;
static final int LOGBACK_ACCESS_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1001;
static final int LOG_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1002;
static final int REDIRECT_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1003;
static final int CHECKSUM_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1004;
static final int MACAROON_REQ_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1005;
static final int SCITAG_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1006;
static final int TPC_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1007;
static final int MOVE_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1008;
static final int DELETE_FILTER_ORDER = DEFAULT_FILTER_ORDER + 1009;
Expand Down Expand Up @@ -151,6 +153,15 @@ FilterRegistrationBean<MacaroonRequestFilter> macaroonRequestFilter(ObjectMapper
return filter;
}

@Bean
@ConditionalOnProperty(name = "storm.scitag.enabled", havingValue = "true")
FilterRegistrationBean<SciTagFilter> scitagFilter() {
LOG.info("SciTag filter enabled");
FilterRegistrationBean<SciTagFilter> filter = new FilterRegistrationBean<>(new SciTagFilter());
filter.setOrder(SCITAG_FILTER_ORDER);
return filter;
}

@Bean
FilterRegistrationBean<MiltonFilter> miltonFilter(FilesystemAccess fsAccess,
ExtendedAttributesHelper attrsHelper, PathResolver resolver, ReplaceContentStrategy rcs) {
Expand Down Expand Up @@ -189,9 +200,9 @@ FilterRegistrationBean<TransferFilter> tpcFilter(Clock clock, FilesystemAccess f

TransferClient metricsClient = new HttpTransferClientMetricsWrapper(registry, client);

FilterRegistrationBean<TransferFilter> tpcFilter = new FilterRegistrationBean<>(
new TransferFilter(clock, metricsClient, resolver, lus, props.isVerifyChecksum(),
props.getEnableExpectContinueThreshold()));
FilterRegistrationBean<TransferFilter> tpcFilter =
new FilterRegistrationBean<>(new TransferFilter(clock, metricsClient, resolver, lus,
props.isVerifyChecksum(), props.getEnableExpectContinueThreshold()));
tpcFilter.addUrlPatterns("/*");
tpcFilter.setOrder(TPC_FILTER_ORDER);
return tpcFilter;
Expand Down
83 changes: 83 additions & 0 deletions src/main/java/org/italiangrid/storm/webdav/tpc/SciTag.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.italiangrid.storm.webdav.tpc;

import java.io.IOException;
import java.io.RandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SciTag {

public static final Logger LOG = LoggerFactory.getLogger(SciTag.class);
public static final String SCITAG_ATTRIBUTE = "scitag";

private static final String FLOWD_PIPE_NAME = "/var/run/flowd";
private final boolean remoteAddressIsSource;
private final int experimentId;
private final int activityId;
private String sourceAddress;
private int sourcePort;
private String destinationAddress;
private int destinationPort;

public SciTag(int experimentId, int activityId, boolean remoteAddressIsSource) {
this.experimentId = experimentId;
this.activityId = activityId;
this.remoteAddressIsSource = remoteAddressIsSource;
}

public int experimentId() {
return experimentId;
}

public int activityId() {
return activityId;
}

private String flowdEntry() {
return " tcp " + sourceAddress + " " + sourcePort + " " + destinationAddress + " "
+ destinationPort + " " + this.experimentId() + " " + this.activityId() + "\n";
}

public void writeStart(String localAddress, int localPort, String remoteAddress, int remotePort) {
if (remoteAddressIsSource) {
this.sourceAddress = remoteAddress;
this.sourcePort = remotePort;
this.destinationAddress = localAddress;
this.destinationPort = localPort;
} else {
this.sourceAddress = localAddress;
this.sourcePort = localPort;
this.destinationAddress = remoteAddress;
this.destinationPort = remotePort;
}
try (RandomAccessFile flowdPipe = new RandomAccessFile(FLOWD_PIPE_NAME, "rw")) {
flowdPipe.writeBytes("start" + this.flowdEntry());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}

public void writeEnd() {
try (RandomAccessFile flowdPipe = new RandomAccessFile(FLOWD_PIPE_NAME, "rw")) {
flowdPipe.writeBytes("end" + this.flowdEntry());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}

}
Loading

0 comments on commit 63498c9

Please sign in to comment.