Skip to content

Commit

Permalink
避免FE日志打印URL打印明文token。
Browse files Browse the repository at this point in the history
Signed-off-by: [email protected]
Signed-off-by: xyllq999 <[email protected]>
  • Loading branch information
xyllq999 committed Nov 4, 2024
1 parent ba168a6 commit 129637c
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 14 deletions.
7 changes: 5 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.TimeoutException;
import com.starrocks.http.WebUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.SemanticException;
Expand Down Expand Up @@ -321,8 +322,10 @@ public static String getResultForUrl(String urlStr, String encodedAuthInfo, int
int readTimeoutMs) {
StringBuilder sb = new StringBuilder();
InputStream stream = null;
String safeUrl = urlStr;
try {
URL url = new URL(urlStr);
safeUrl = WebUtils.sanitizeHttpReqUri(urlStr);
URLConnection conn = url.openConnection();
if (encodedAuthInfo != null) {
conn.setRequestProperty("Authorization", "Basic " + encodedAuthInfo);
Expand All @@ -338,14 +341,14 @@ public static String getResultForUrl(String urlStr, String encodedAuthInfo, int
sb.append(line);
}
} catch (Exception e) {
LOG.warn("failed to get result from url: {}. {}", urlStr, e.getMessage());
LOG.warn("failed to get result from url: {}. {}", safeUrl, e.getMessage());
return null;
} finally {
if (stream != null) {
try {
stream.close();
} catch (IOException e) {
LOG.warn("failed to close stream when get result from url: {}", urlStr, e);
LOG.warn("failed to close stream when get result from url: {}", safeUrl, e);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/http/BaseAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ protected void writeResponse(BaseRequest request, BaseResponse response, HttpRes
writeCustomHeaders(response, responseObj);
writeCookies(response, responseObj);

boolean keepAlive = HttpUtil.isKeepAlive(request.getRequest());
// Connection can be keep-alive only when
// - The client requests to keep-alive and,
// - The action doesn't close the connection forcibly.
boolean keepAlive = HttpUtil.isKeepAlive(request.getRequest()) && !response.isForceCloseConnection();
if (!keepAlive) {
responseObj.headers().set(HttpHeaderNames.CONNECTION.toString(), HttpHeaderValues.CLOSE.toString());
request.getContext().write(responseObj).addListener(ChannelFutureListener.CLOSE);
} else {
responseObj.headers().set(HttpHeaderNames.CONNECTION.toString(), HttpHeaderValues.KEEP_ALIVE.toString());
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/http/BaseResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public class BaseResponse {
protected Map<String, List<String>> customHeaders = Maps.newHashMap();
private Set<Cookie> cookies = Sets.newHashSet();

// whether the connection needs to be closed forcibly.
// Default: no, allow the client to reuse the connection whenever possible.
private boolean forceCloseConnection = false;

public String getContentType() {
return contentType;
}
Expand All @@ -42,6 +46,14 @@ public void setContentType(String contentType) {
this.contentType = contentType;
}

public boolean isForceCloseConnection() {
return forceCloseConnection;
}

public void setForceCloseConnection(boolean closeConnection) {
this.forceCloseConnection = closeConnection;
}

public StringBuilder getContent() {
return content;
}
Expand Down
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/http/rest/LoadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TNetworkAddress;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -95,10 +95,14 @@ public void executeWithoutPassword(BaseRequest request, BaseResponse response) t
public void executeWithoutPasswordInternal(BaseRequest request, BaseResponse response) throws DdlException,
AccessDeniedException {

// A 'Load' request must have 100-continue header
if (!request.getRequest().headers().contains(HttpHeaders.Names.EXPECT)) {
// A 'Load' request must have "Expect: 100-continue" header
if (!HttpUtil.is100ContinueExpected(request.getRequest())) {
// TODO: should respond "HTTP 417 Expectation Failed"
throw new DdlException("There is no 100-continue header");
}
// close the connection forcibly after the request, so the `Expect: 100-Continue` won't
// affect subsequent requests processing.
response.setForceCloseConnection(true);

boolean enableBatchWrite = "true".equalsIgnoreCase(
request.getRequest().headers().get(StreamLoadHttpHeader.HTTP_ENABLE_BATCH_WRITE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@
import com.starrocks.common.Pair;
import com.starrocks.common.StarRocksHttpException;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.http.ActionController;
import com.starrocks.http.BaseAction;
import com.starrocks.http.BaseRequest;
import com.starrocks.http.BaseResponse;
import com.starrocks.http.HttpConnectContext;
import com.starrocks.http.*;

Check failure on line 45 in fe/fe-core/src/main/java/com/starrocks/http/rest/RestBaseAction.java

View workflow job for this annotation

GitHub Actions / FE Code Style Check

[checkstyle] reported by reviewdog 🐶 Using the '.*' form of import should be avoided - com.starrocks.http.*. Raw Output: /github/workspace/./fe/fe-core/src/main/java/com/starrocks/http/rest/RestBaseAction.java:45:26: error: Using the '.*' form of import should be avoided - com.starrocks.http.*. (com.puppycrawl.tools.checkstyle.checks.imports.AvoidStarImportCheck)
import com.starrocks.privilege.AccessDeniedException;
import com.starrocks.privilege.AuthorizationMgr;
import com.starrocks.qe.ConnectContext;
Expand Down Expand Up @@ -92,18 +88,20 @@ public RestBaseAction(ActionController controller) {
@Override
public void handleRequest(BaseRequest request) {
BaseResponse response = new BaseResponse();
String url = request.getRequest().uri();
try {
url = WebUtils.sanitizeHttpReqUri(request.getRequest().uri());
execute(request, response);
} catch (AccessDeniedException accessDeniedException) {
LOG.warn("failed to process url: {}", request.getRequest().uri(), accessDeniedException);
LOG.warn("failed to process url: {}", url, accessDeniedException);
response.updateHeader(HttpHeaderNames.WWW_AUTHENTICATE.toString(), "Basic realm=\"\"");
response.appendContent(new RestBaseResult(getErrorRespWhenUnauthorized(accessDeniedException)).toJson());
writeResponse(request, response, HttpResponseStatus.UNAUTHORIZED);
} catch (DdlException e) {
LOG.warn("fail to process url: {}", request.getRequest().uri(), e);
LOG.warn("fail to process url: {}", url, e);
sendResult(request, response, new RestBaseResult(e.getMessage()));
} catch (Exception e) {
LOG.warn("fail to process url: {}", request.getRequest().uri(), e);
LOG.warn("fail to process url: {}", url, e);
String msg = e.getMessage();
if (msg == null) {
msg = e.toString();
Expand Down
70 changes: 70 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/http/LoadActionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,39 @@
package com.starrocks.http;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Multimap;
import com.starrocks.load.batchwrite.BatchWriteMgr;
import com.starrocks.load.batchwrite.RequestCoordinatorBackendResult;
import com.starrocks.load.batchwrite.TableId;
import com.starrocks.load.streamload.StreamLoadKvParams;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.NodeSelector;
import com.starrocks.thrift.TStatus;
import com.starrocks.thrift.TStatusCode;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import mockit.Mock;
import mockit.MockUp;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -117,6 +132,15 @@ private Request buildRequest(Map<String, String> headers) {
return builder.build();
}

private HttpPut buildPutRequest(int bodyLength) {
HttpPut put = new HttpPut(String.format("%s/api/%s/%s/_stream_load", BASE_URL, DB_NAME, TABLE_NAME));
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, rootAuth);
StringEntity entity = new StringEntity(Arrays.toString(new byte[bodyLength]), "UTF-8");
put.setEntity(entity);
return put;
}

private String getLoadUrl(String host, int port) {
return String.format("http://%s:%d/api/%s/%s/_stream_load", host, port, DB_NAME, TABLE_NAME);
}
Expand All @@ -127,4 +151,50 @@ private static Map<String, Object> parseResponseBody(Response response) throws I
String bodyStr = body.string();
return objectMapper.readValue(bodyStr, new TypeReference<>() {});
}

@Test
public void testLoadTest100ContinueRespondHTTP307() throws Exception {
new MockUp<NodeSelector>() {
@Mock
public List<Long> seqChooseBackendIds(int backendNum, boolean needAvailable,
boolean isCreate, Multimap<String, String> locReq) {
List<Long> result = new ArrayList<>();
result.add(testBackendId1);
return result;
}
};

RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(3000)
.build();

// reuse the same client
// NOTE: okhttp client will close the connection and create a new connection, so the issue can't be reproduced.
CloseableHttpClient client = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return false;
}
})
.setDefaultRequestConfig(requestConfig)
.build();

int repeat = 3;
for (int i = 0; i < repeat; ++i) {
// NOTE: Just a few bytes, so the next request header is corrupted but not completely available at all.
// otherwise FE will discard bytes from the connection as many as X bytes, and possibly skip the
// next request entirely, so it will be looked like the server never respond at all from client side.
HttpPut put = buildPutRequest(2);
try (CloseableHttpResponse response = client.execute(put)) {
Assert.assertEquals(HttpResponseStatus.TEMPORARY_REDIRECT.code(),
response.getStatusLine().getStatusCode());
// The server indicates that the connection should be closed.
Assert.assertEquals(HttpHeaderValues.CLOSE.toString(),
response.getFirstHeader(HttpHeaderNames.CONNECTION.toString()).getValue());
}
}
client.close();
}
}

0 comments on commit 129637c

Please sign in to comment.