Skip to content

Commit

Permalink
Log improvements for Websocket/GraphQL subscription throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
BiyonFernando committed Dec 4, 2024
1 parent 4792184 commit 8275885
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.apimgt.gateway.dto;

/**
* Interface for defining inbound response error information
*/
public interface InboundProcessorResponseError {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.apimgt.gateway.dto;

/**
* DTO class which holds information on throttle response of websocket frames.
*/
public class WebSocketThrottleResponseDTO implements InboundProcessorResponseError {

private boolean isThrottled;
private String throttledOutReason;
private String apiContext;
private String user;

public WebSocketThrottleResponseDTO() {
}

public WebSocketThrottleResponseDTO(boolean throttled, String throttledOutReason) {
isThrottled = throttled;
this.throttledOutReason = throttledOutReason;
}

public WebSocketThrottleResponseDTO(boolean throttled, String throttledOutReason, String apiContext, String user) {
isThrottled = throttled;
this.throttledOutReason = throttledOutReason;
this.apiContext = apiContext;
this.user = user;
}

public boolean isThrottled() {
return isThrottled;
}

public void setThrottled(boolean throttled) {
isThrottled = throttled;
}

public String getThrottledOutReason() {
return throttledOutReason;
}

public void setThrottledOutReason(String throttledOutReason) {
this.throttledOutReason = throttledOutReason;
}

public String getApiContext() {
return apiContext;
}

public void setApiContext(String apiContext) {
this.apiContext = apiContext;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.analytics.Constants;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketAnalyticsMetricsHandler;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketApiConstants;
Expand Down Expand Up @@ -152,8 +153,12 @@ private void handleSubscribeFrameErrorEvent(ChannelHandlerContext ctx, InboundPr
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP) {
if (log.isDebugEnabled()) {
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. " +
ctx.channel().toString());
WebSocketThrottleResponseDTO throttleResponseDTO =
((WebSocketThrottleResponseDTO) responseDTO.getInboundProcessorResponseError());
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. "
+ ctx.channel().toString() + " API Context: " + throttleResponseDTO.getApiContext()
+ ", " + "User: " + throttleResponseDTO.getUser() + ", Reason: "
+ throttleResponseDTO.getThrottledOutReason());
}
} else if (responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_GENERAL_ERROR
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_INVALID_CREDENTIALS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.wso2.carbon.apimgt.common.gateway.constants.HealthCheckConstants;
import org.wso2.carbon.apimgt.common.gateway.dto.JWTConfigurationDto;
import org.wso2.carbon.apimgt.gateway.APIMgtGatewayConstants;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.analytics.Constants;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityConstants;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityException;
Expand Down Expand Up @@ -312,8 +313,12 @@ private void handlePublishFrameErrorEvent(ChannelHandlerContext ctx, InboundProc
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP) {
if (log.isDebugEnabled()) {
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. " +
ctx.channel().toString());
WebSocketThrottleResponseDTO throttleResponseDTO =
((WebSocketThrottleResponseDTO) responseDTO.getInboundProcessorResponseError());
log.debug(channelId + " -- Websocket API request [inbound] : Inbound WebSocket frame is throttled. "
+ ctx.channel().toString() + " API Context: " + throttleResponseDTO.getApiContext()
+ ", " + "User: " + throttleResponseDTO.getUser() + ", Reason: "
+ throttleResponseDTO.getThrottledOutReason());
}
} else if (responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_GENERAL_ERROR
|| responseDTO.getErrorCode() == WebSocketApiConstants.FrameErrorConstants.API_AUTH_INVALID_CREDENTIALS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.carbon.apimgt.api.APIManagementException;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.inbound.InboundMessageContext;
import org.wso2.carbon.apimgt.gateway.inbound.websocket.InboundProcessorResponseDTO;
import org.wso2.carbon.apimgt.gateway.internal.ServiceReferenceHolder;
Expand Down Expand Up @@ -195,6 +196,22 @@ public static boolean isThrottled(String resourceLevelThrottleKey, String subscr
return (isApiLevelThrottled || isApplicationLevelThrottled || isSubscriptionLevelThrottled);
}

public static WebSocketThrottleResponseDTO getThrottleStatus(String resourceLevelThrottleKey,
String subscriptionLevelThrottleKey,
String applicationLevelThrottleKey) {
// Check each level and record reason if throttling occurs
if (ServiceReferenceHolder.getInstance().getThrottleDataHolder().isAPIThrottled(resourceLevelThrottleKey)) {
return new WebSocketThrottleResponseDTO(true, "Throttled due to resource-level constraints");
} else if (ServiceReferenceHolder.getInstance().getThrottleDataHolder().isThrottled(
subscriptionLevelThrottleKey)) {
return new WebSocketThrottleResponseDTO(true, "Throttled due to subscription-level constraints");
} else if (ServiceReferenceHolder.getInstance().getThrottleDataHolder().isThrottled(
applicationLevelThrottleKey)) {
return new WebSocketThrottleResponseDTO(true, "Throttled due to application-level constraints");
}
return null;
}

public static String getAccessTokenCacheKey(String accessToken, String apiContext, String matchingResource) {
return accessToken + ':' + apiContext + ':' + matchingResource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import org.json.JSONObject;
import org.wso2.carbon.apimgt.common.gateway.constants.GraphQLConstants;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketApiConstants;
import org.wso2.carbon.apimgt.gateway.dto.InboundProcessorResponseError;

/**
* Extended DTO class to hold response information during execution of GraphQL subscription Inbound processors.
*/
public class GraphQLProcessorResponseDTO extends InboundProcessorResponseDTO {

String id; // operation ID
InboundProcessorResponseError inboundProcessorResponseError;

public String getId() {
return id;
Expand All @@ -34,4 +36,12 @@ public String getErrorResponseString() {
jsonObject.put(GraphQLConstants.SubscriptionConstants.PAYLOAD_FIELD_NAME_PAYLOAD, errorPayloads);
return jsonObject.toString();
}

public InboundProcessorResponseError getInboundProcessorResponseError() {
return inboundProcessorResponseError;
}

public void setInboundProcessorResponseError(InboundProcessorResponseError inboundProcessorResponseError) {
this.inboundProcessorResponseError = inboundProcessorResponseError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.apimgt.gateway.inbound.websocket;

import org.wso2.carbon.apimgt.gateway.dto.InboundProcessorResponseError;
/**
* DTO class to hold response information during execution of Inbound processors.
*/
Expand All @@ -26,6 +28,7 @@ public class InboundProcessorResponseDTO {
int errorCode;
String errorMessage;
boolean closeConnection = false; // whether to close the connection if during frame validation
InboundProcessorResponseError inboundProcessorResponseError;

public boolean isError() {
return isError;
Expand Down Expand Up @@ -62,4 +65,12 @@ public void setErrorCode(int errorCode) {
public String getErrorResponseString() {
return "Error code: " + errorCode + " reason: " + errorMessage;
}

public InboundProcessorResponseError getInboundProcessorResponseError() {
return inboundProcessorResponseError;
}

public void setInboundProcessorResponseError(InboundProcessorResponseError inboundProcessorResponseError) {
this.inboundProcessorResponseError = inboundProcessorResponseError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.wso2.carbon.apimgt.common.gateway.graphql.QueryValidator;
import org.wso2.carbon.apimgt.gateway.dto.GraphQLOperationDTO;
import org.wso2.carbon.apimgt.common.gateway.graphql.GraphQLProcessorUtil;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.WebsocketUtil;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityException;
import org.wso2.carbon.apimgt.gateway.handlers.streaming.websocket.WebSocketApiConstants;
Expand Down Expand Up @@ -298,6 +299,10 @@ private GraphQLProcessorResponseDTO validateQueryComplexity(QueryAnalyzer queryA
responseDTO.setErrorCode(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX);
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_COMPLEX_MESSAGE
+ " : " + queryAnalyzerResponseDTO.getErrorList().toString());
WebSocketThrottleResponseDTO throttleResponseDTO = new WebSocketThrottleResponseDTO(true, "Throttled "
+ "due to subscription-level query complexity constraint.", inboundMessageContext.getApiContext()
, inboundMessageContext.getInfoDTO().getSubscriber());
responseDTO.setInboundProcessorResponseError(throttleResponseDTO);
return responseDTO;
}
} catch (ParseException e) {
Expand Down Expand Up @@ -337,8 +342,12 @@ private GraphQLProcessorResponseDTO validateQueryDepth(QueryAnalyzer queryAnalyz
log.error("Query depth validation failed for: " + payload + " errors: " + errorList.toString());
responseDTO.setError(true);
responseDTO.setErrorCode(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP);
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP_MESSAGE
+ " : " + queryAnalyzerResponseDTO.getErrorList().toString());
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.GRAPHQL_QUERY_TOO_DEEP_MESSAGE + " : "
+ queryAnalyzerResponseDTO.getErrorList().toString());
WebSocketThrottleResponseDTO throttleResponseDTO = new WebSocketThrottleResponseDTO(true, "Throttled due "
+ "to subscription-level query depth constraint.", inboundMessageContext.getApiContext()
, inboundMessageContext.getInfoDTO().getSubscriber());
responseDTO.setInboundProcessorResponseError(throttleResponseDTO);
return responseDTO;
}
return responseDTO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.json.JSONObject;
import org.wso2.carbon.apimgt.api.APIManagementException;
import org.wso2.carbon.apimgt.common.gateway.constants.GraphQLConstants;
import org.wso2.carbon.apimgt.gateway.dto.WebSocketThrottleResponseDTO;
import org.wso2.carbon.apimgt.gateway.handlers.DataPublisherUtil;
import org.wso2.carbon.apimgt.gateway.handlers.Utils;
import org.wso2.carbon.apimgt.gateway.handlers.WebsocketUtil;
import org.wso2.carbon.apimgt.gateway.handlers.security.APIKeyValidator;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityConstants;
import org.wso2.carbon.apimgt.gateway.handlers.security.APISecurityException;
import org.wso2.carbon.apimgt.gateway.handlers.security.AuthenticationContext;
import org.wso2.carbon.apimgt.gateway.handlers.security.jwt.JWTValidator;
Expand All @@ -56,7 +56,6 @@

import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -247,12 +246,17 @@ public static InboundProcessorResponseDTO doThrottle(int msgSize, VerbInfoDTO ve
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
inboundMessageContext.getTenantDomain(), true);
boolean isThrottled = WebsocketUtil.isThrottled(resourceLevelThrottleKey, subscriptionLevelThrottleKey,
applicationLevelThrottleKey);
if (isThrottled) {
WebSocketThrottleResponseDTO throttleResponseDTO = WebsocketUtil.getThrottleStatus(resourceLevelThrottleKey,
subscriptionLevelThrottleKey,
applicationLevelThrottleKey);
if (throttleResponseDTO != null) {
responseDTO.setError(true);
responseDTO.setErrorCode(WebSocketApiConstants.FrameErrorConstants.THROTTLED_OUT_ERROR);
responseDTO.setErrorMessage(WebSocketApiConstants.FrameErrorConstants.THROTTLED_OUT_ERROR_MESSAGE);

throttleResponseDTO.setUser(authorizedUser);
throttleResponseDTO.setApiContext(inboundMessageContext.getApiContext());
responseDTO.setInboundProcessorResponseError(throttleResponseDTO);
}
} finally {
PrivilegedCarbonContext.endTenantFlow();
Expand Down

0 comments on commit 8275885

Please sign in to comment.