Skip to content

Commit

Permalink
Add SciTags support in HTTP-TPC
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Bassi committed Apr 11, 2024
1 parent bb5aa0a commit 1f00729
Show file tree
Hide file tree
Showing 22 changed files with 340 additions and 40 deletions.
13 changes: 13 additions & 0 deletions doc/tpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,16 @@ STORM_WEBDAV_AUTHZ_SERVER_MAX_TOKEN_LIFETIME_SEC="43200"
STORM_WEBDAV_REQUIRE_CLIENT_CERT="false"
```
For other configuration options, see the /etc/sysconfig/storm-webdav file.

## SciTags

StoRM WebDAV supports the `SciTag` header.
To correctly mark the network packets and/or network flows, you need to install [flowd](https://github.com/scitags/flowd) and configure it to use the `np_api` plugin.

Example flowd configuration (`/etc/flowd/flowd-tags.cfg`):

```
PLUGIN='np_api'
BACKEND='udp_firefly'
FLOW_MAP_API='https://www.scitags.org/api.json'
```
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
import org.italiangrid.storm.webdav.server.PathResolver;
import org.italiangrid.storm.webdav.server.util.CANLListener;
import org.italiangrid.storm.webdav.tpc.LocalURLService;
import org.italiangrid.storm.webdav.tpc.SciTagPlainConnectionSocketFactory;
import org.italiangrid.storm.webdav.tpc.SciTagSSLConnectionSocketFactory;
import org.italiangrid.storm.webdav.tpc.StaticHostListLocalURLService;
import org.italiangrid.storm.webdav.tpc.TransferConstants;
import org.italiangrid.storm.webdav.tpc.http.SuperLaxRedirectStrategy;
Expand Down Expand Up @@ -263,8 +265,8 @@ HttpClientConnectionManager tpcClientConnectionManager(ThirdPartyCopyProperties
ctx.init(null, new TrustManager[] {tm}, null);
}

ConnectionSocketFactory sf = PlainConnectionSocketFactory.getSocketFactory();
LayeredConnectionSocketFactory tlsSf = new SSLConnectionSocketFactory(ctx);
ConnectionSocketFactory sf = SciTagPlainConnectionSocketFactory.getSocketFactory();
LayeredConnectionSocketFactory tlsSf = new SciTagSSLConnectionSocketFactory(ctx);

Registry<ConnectionSocketFactory> r = RegistryBuilder.<ConnectionSocketFactory>create()
.register(HTTP, sf)
Expand Down
75 changes: 75 additions & 0 deletions src/main/java/org/italiangrid/storm/webdav/tpc/SciTag.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.italiangrid.storm.webdav.tpc;

import java.io.IOException;
import java.io.RandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SciTag {

public static final Logger LOG = LoggerFactory.getLogger(SciTag.class);

private final String flowdPipeName = "/var/run/flowd";
private final int experimentId;
private final int activityId;
private String localAddress;
private int localPort;
private String remoteAddress;
private int remotePort;

public SciTag(int experimentId, int activityId) {
this.experimentId = experimentId;
this.activityId = activityId;
}

public int activityId() {
return activityId;
}

public int experimentId() {
return experimentId;
}

private String flowdEntry() {
return " tcp " + localAddress + " " + localPort + " " + remoteAddress + " " + remotePort + " "
+ this.experimentId() + " " + this.activityId() + "\n";
}

public void writeStart(String localAddress, int localPort, String remoteAddress, int remotePort) {
this.localAddress = localAddress;
this.localPort = localPort;
this.remoteAddress = remoteAddress;
this.remotePort = remotePort;
try {
RandomAccessFile flowdPipe = new RandomAccessFile(flowdPipeName, "rw");
flowdPipe.writeBytes("start" + this.flowdEntry());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}

public void writeEnd() {
try {
RandomAccessFile flowdPipe = new RandomAccessFile(flowdPipeName, "rw");
flowdPipe.writeBytes("end" + this.flowdEntry());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.italiangrid.storm.webdav.tpc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.http.HttpHost;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.protocol.HttpContext;
import org.italiangrid.storm.webdav.tpc.SciTag;

public class SciTagPlainConnectionSocketFactory extends PlainConnectionSocketFactory {

public SciTagPlainConnectionSocketFactory() {
super();
}

@Override
public Socket connectSocket(int connectTimeout, Socket socket, HttpHost host,
InetSocketAddress remoteAddress, InetSocketAddress localAddress, HttpContext context)
throws IOException {
Socket s =
super.connectSocket(connectTimeout, socket, host, remoteAddress, localAddress, context);
SciTag scitag = (SciTag) context.getAttribute("scitag");
if (scitag != null) {
String localIPAddress = s.getLocalAddress().getHostAddress();
int localPort = s.getLocalPort();
String remoteIPAddress = s.getInetAddress().getHostAddress();
int remotePort = s.getPort();
scitag.writeStart(localIPAddress, localPort, remoteIPAddress, remotePort);
}
return s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright (c) Istituto Nazionale di Fisica Nucleare, 2014-2023.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.italiangrid.storm.webdav.tpc;

import java.io.IOException;
import java.net.Socket;
import javax.net.ssl.SSLContext;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.protocol.HttpContext;
import org.italiangrid.storm.webdav.tpc.SciTag;

public class SciTagSSLConnectionSocketFactory extends SSLConnectionSocketFactory {

public SciTagSSLConnectionSocketFactory(SSLContext sslContext) {
super(sslContext);
}

@Override
public Socket createLayeredSocket(Socket socket, String target, int port, HttpContext context)
throws IOException {
Socket s = super.createLayeredSocket(socket, target, port, context);
SciTag scitag = (SciTag) context.getAttribute("scitag");
if (scitag != null) {
String localIPAddress = s.getLocalAddress().getHostAddress();
int localPort = s.getLocalPort();
String remoteIPAddress = s.getInetAddress().getHostAddress();
int remotePort = s.getPort();
scitag.writeStart(localIPAddress, localPort, remoteIPAddress, remotePort);
}
return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public interface TransferConstants {
String DESTINATION_HEADER = "Destination";
String OVERWRITE_HEADER = "Overwrite";
String REQUIRE_CHECKSUM_HEADER = "RequireChecksumVerification";
String SCITAG_HEADER = "SciTag";
String CREDENTIAL_HEADER = "Credential";

String CREDENTIAL_HEADER_NONE_VALUE = "none";
Expand Down
32 changes: 26 additions & 6 deletions src/main/java/org/italiangrid/storm/webdav/tpc/TransferFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,29 @@ protected void handleTpc(HttpServletRequest request, HttpServletResponse respons
try {

if (validRequest(request, response)) {
SciTag scitag = null;
if (request.getHeader(SCITAG_HEADER) != null) {
// state prot src_ip src_port dst_ip dst_port exp act
// If the active party receives an HTTP-TPC COPY request with a SciTag request header with
// a valid value then the server SHOULD mark the resulting network traffic with the
// experiment ID and activity ID encoded in the value.
int scitagValue = Integer.parseInt(request.getHeader(SCITAG_HEADER));
// Valid value is a single positive integer > 64 and <65536 (16bit). Any other value is
// considered invalid.
if (scitagValue > 64 && scitagValue < 65536) {
scitag = new SciTag(scitagValue >> 6, scitagValue & ((1 << 6) - 1));
} else {
// If the active party receives an HTTP-TPC COPY request with a SciTag request header
// with an invalid value then the server SHOULD mark the resulting network traffic with
// the 0 as the experiment ID and the activity ID.
scitag = new SciTag(0, 0);
}
}
Optional<String> source = Optional.ofNullable(request.getHeader(SOURCE_HEADER));
if (source.isPresent()) {
handlePullCopy(request, response);
handlePullCopy(request, response, scitag);
} else {
handlePushCopy(request, response);
handlePushCopy(request, response, scitag);
}
}

Expand Down Expand Up @@ -210,8 +228,8 @@ protected void logTransferException(TransferRequest request, Exception e) {
}
}

protected void handlePullCopy(HttpServletRequest request, HttpServletResponse response)
throws IOException {
protected void handlePullCopy(HttpServletRequest request, HttpServletResponse response,
SciTag scitag) throws IOException {

URI uri = URI.create(request.getHeader(SOURCE_HEADER));
String path = getScopedPathInfo(request);
Expand All @@ -221,6 +239,7 @@ protected void handlePullCopy(HttpServletRequest request, HttpServletResponse re
.uri(uri)
.path(path)
.headers(getTransferHeaders(request, response))
.scitag(scitag)
.verifyChecksum(verifyChecksum && verifyChecksumRequested(request))
.overwrite(overwriteRequested(request))
.build();
Expand Down Expand Up @@ -249,8 +268,8 @@ protected void handlePullCopy(HttpServletRequest request, HttpServletResponse re
}
}

protected void handlePushCopy(HttpServletRequest request, HttpServletResponse response)
throws IOException {
protected void handlePushCopy(HttpServletRequest request, HttpServletResponse response,
SciTag scitag) throws IOException {
URI uri = URI.create(request.getHeader(DESTINATION_HEADER));
String path = getScopedPathInfo(request);

Expand All @@ -259,6 +278,7 @@ protected void handlePushCopy(HttpServletRequest request, HttpServletResponse re
.uri(uri)
.path(path)
.headers(getTransferHeaders(request, response))
.scitag(scitag)
.verifyChecksum(verifyChecksum && verifyChecksumRequested(request))
.overwrite(overwriteRequested(request))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,25 @@ protected Multimap<String, String> getTransferHeaders(HttpServletRequest request
LOG.warn("Ignoring invalid transfer header {}", headerName);
continue;
}
if (xferHeaderName.trim().equalsIgnoreCase(SCITAG_HEADER)
&& request.getHeader(SCITAG_HEADER) != null) {
// If the active party receives an HTTP-TPC COPY request with both a SciTag request header
// and a TransferHeaderSciTag request header then it SHOULD ignore the
// TransferHeaderSciTag and continue to process the request.
LOG.warn("Ignoring TransferHeaderSciTag header because SciTag header is present");
continue;
}
xferHeaders.put(xferHeaderName.trim(), request.getHeader(headerName));
}
}

// If the active party receives an HTTP-TPC COPY request with a SciTag request header then the
// active party SHOULD include a SciTag request header (with the same value as in the COPY
// request) in all related HTTP requests.
if (request.getHeader(SCITAG_HEADER) != null) {
xferHeaders.put(SCITAG_HEADER, request.getHeader(SCITAG_HEADER));
}

if (isPushTpc(request, localURLService) && request.getContentLength() >= enableExpectContinueThreshold) {
xferHeaders.put(org.apache.http.protocol.HTTP.EXPECT_DIRECTIVE,
org.apache.http.protocol.HTTP.EXPECT_CONTINUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -39,11 +40,13 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.italiangrid.storm.webdav.config.ServiceConfigurationProperties;
import org.italiangrid.storm.webdav.config.ThirdPartyCopyProperties;
import org.italiangrid.storm.webdav.fs.attrs.ExtendedAttributesHelper;
import org.italiangrid.storm.webdav.server.PathResolver;
import org.italiangrid.storm.webdav.tpc.SciTag;
import org.italiangrid.storm.webdav.tpc.transfer.GetTransferRequest;
import org.italiangrid.storm.webdav.tpc.transfer.PutTransferRequest;
import org.italiangrid.storm.webdav.tpc.transfer.TransferClient;
Expand Down Expand Up @@ -165,15 +168,17 @@ public void handle(GetTransferRequest request, TransferStatusCallback cb) {

StormCountingOutputStream os = prepareOutputStream(resolver.resolvePath(request.path()));
HttpGet get = prepareRequest(request);
HttpClientContext context = HttpClientContext.create();

ScheduledFuture<?> reportTask = executorService.scheduleAtFixedRate(() -> {
reportStatus(cb, request, statusBuilder.inProgress(os.getCount()));
}, reportDelaySec, reportDelaySec, TimeUnit.SECONDS);

try {

context.setAttribute("scitag", request.scitag());
httpClient.execute(get, new GetResponseHandler(request, os, attributesHelper,
MDC.getCopyOfContextMap(), socketBufferSize, true));
MDC.getCopyOfContextMap(), socketBufferSize, true), context);

reportTask.cancel(true);
reportStatus(cb, request, statusBuilder.done(os.getCount()));
Expand All @@ -196,6 +201,9 @@ public void handle(GetTransferRequest request, TransferStatusCallback cb) {
if (!reportTask.isCancelled()) {
reportTask.cancel(true);
}
if (context.getAttribute("scitag") != null) {
((SciTag) context.getAttribute("scitag")).writeEnd();
}
}
}

Expand All @@ -221,6 +229,7 @@ public void handle(PutTransferRequest request, TransferStatusCallback cb) {
CountingFileEntity cfe = prepareFileEntity(resolver.resolvePath(request.path()));

HttpPut put = null;
HttpClientContext context = HttpClientContext.create();

try {
put = prepareRequest(request, cfe);
Expand All @@ -236,7 +245,8 @@ public void handle(PutTransferRequest request, TransferStatusCallback cb) {

try {
checkOverwrite(request);
httpClient.execute(put, new PutResponseHandler(MDC.getCopyOfContextMap()));
context.setAttribute("scitag", request.scitag());
httpClient.execute(put, new PutResponseHandler(MDC.getCopyOfContextMap()), context);
reportTask.cancel(true);
reportStatus(cb, request, statusBuilder.done(cfe.getCount()));
} catch (HttpResponseException e) {
Expand All @@ -255,6 +265,9 @@ public void handle(PutTransferRequest request, TransferStatusCallback cb) {
if (!reportTask.isCancelled()) {
reportTask.cancel(true);
}
if (context.getAttribute("scitag") != null) {
((SciTag) context.getAttribute("scitag")).writeEnd();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class GetTransferRequestBuilder extends RequestBuilder<GetTransferRequest


public GetTransferRequest build() {
return new GetTransferRequestImpl(uuid, path, uri, headers, verifyChecksum, overwrite);
return new GetTransferRequestImpl(uuid, path, uri, headers, scitag, verifyChecksum, overwrite);

}

Expand Down
Loading

0 comments on commit 1f00729

Please sign in to comment.