diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java index 165b5aff0e..f901484fe9 100644 --- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java +++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java @@ -101,7 +101,6 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; - public abstract class YarnGateway extends AbstractGateway { private static final String HTML_TAG_REGEX = "
(.*)
"; private final String TMP_SQL_EXEC_DIR = @@ -111,8 +110,6 @@ public abstract class YarnGateway extends AbstractGateway { protected YarnClient yarnClient; - private static boolean ENABLE_KERBEROS_AUTH = false; - public YarnGateway() {} public YarnGateway(GatewayConfig config) { @@ -156,13 +153,11 @@ private void initConfig() { if (configuration.containsKey(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key())) { try { - ENABLE_KERBEROS_AUTH = true; SecurityUtils.install(new SecurityConfiguration(configuration)); UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); logger.info( "Security authentication completed, user and authentication method:{}", currentUser.toString()); } catch (Exception e) { - ENABLE_KERBEROS_AUTH = false; logger.error(e.getMessage(), e); } } @@ -391,7 +386,7 @@ protected String getWebUrl(ClusterClient clusterClient, YarnResul String webUrl; int counts = SystemConfiguration.getInstances().GetJobIdWaitValue(); while (yarnClient.getApplicationReport(clusterClient.getClusterId()).getYarnApplicationState() - == YarnApplicationState.ACCEPTED + == YarnApplicationState.ACCEPTED && counts-- > 0) { Thread.sleep(1000); } @@ -408,21 +403,21 @@ protected String getWebUrl(ClusterClient clusterClient, YarnResul // 睡眠1秒,防止flink因为依赖或其他问题导致任务秒挂 Thread.sleep(1000); String url = yarnClient - .getApplicationReport(clusterClient.getClusterId()) - .getTrackingUrl() + .getApplicationReport(clusterClient.getClusterId()) + .getTrackingUrl() + JobsOverviewHeaders.URL.substring(1); - // 访问Flink WebUI 增加Kerberos认证调用HTTP API - // ----------------开始---------------------------- - String json = null; - logger.info("ENABLE_KERBEROS_AUTH:" + ENABLE_KERBEROS_AUTH); - org.apache.http.HttpResponse httpResponse = null; - if (ENABLE_KERBEROS_AUTH) { + String json = HttpUtil.get(url); + + // 增加判断访问Flink WebUI如果认证失败,尝试使用Kerberos认证 + if (HttpUtil.createGet(url).execute().getStatus() == 401) { + logger.info("yarn application api url:" + url); logger.info( - "you are using kerberos authentication, please make sure you have kinit, now start to login"); + "HTTP API return code 401, try to authenticate using the Kerberos get yarn application state."); + org.apache.http.HttpResponse httpResponse = null; String principal = configuration.get(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); String keytab = configuration.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB); - logger.info("认证凭证 principal:" + principal + "||keytab:" + keytab); + logger.info("get principal:" + principal + "||keytab:" + keytab); BufferedReader in = null; try { RequestKerberosUrlUtils restTest = new RequestKerberosUrlUtils(principal, keytab, null, false); @@ -431,26 +426,21 @@ protected String getWebUrl(ClusterClient clusterClient, YarnResul in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")); String str = null; while ((str = in.readLine()) != null) { - logger.info("返回Flink Web API结果:" + str); + logger.info("yarn application state api content:" + str); json = str; } if (httpResponse.getStatusLine().getStatusCode() != 200) { - logger.info( - "认证失败:" + httpResponse.getEntity().getContent().toString()); throw new RuntimeException(String.format( "Failed to get job details, please check yarn cluster status. Web URL is: %s the job tracking url is: %s", webUrl, url)); } } catch (Exception e) { - logger.info("认证失败:" + e.getMessage()); + logger.info("Failed to kerberos authentication:" + e.getMessage()); e.printStackTrace(); } logger.info("kerberos authentication login successfully and start to get job details"); - } else { - json = HttpUtil.get(url); } - // 访问Flink WebUI 增加Kerberos认证调用HTTP API - // ----------------结束---------------------------- + try { MultipleJobsDetails jobsDetails = FlinkJsonUtil.toBean(json, JobsOverviewHeaders.getInstance()); jobDetailsList.addAll(jobsDetails.getJobs()); @@ -480,8 +470,8 @@ protected String getYarnContainerLog(ApplicationReport applicationReport) throws // Wait for up to 2.5 s. If the history log is not found yet, a prompt message will be returned. int counts = 5; while (yarnClient - .getContainers(applicationReport.getCurrentApplicationAttemptId()) - .isEmpty() + .getContainers(applicationReport.getCurrentApplicationAttemptId()) + .isEmpty() && counts-- > 0) { ThreadUtil.sleep(500); }