diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml index 82ff162910..791519b396 100644 --- a/agents-audit/pom.xml +++ b/agents-audit/pom.xml @@ -394,5 +394,17 @@ ${slf4j.version} test + + junit + junit + + + org.testng + testng + + + org.mockito + mockito-core + diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java index afa2879e2e..103f926566 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java @@ -237,15 +237,12 @@ public void runLogAudit() { boolean fileSpoolDrain = false; try { if (fileSpoolerEnabled && fileSpooler.isPending()) { - int percentUsed = queue.size() * 100 - / getMaxQueueSize(); - long lastAttemptDelta = fileSpooler - .getLastAttemptTimeDelta(); + int percentUsed = queue.size() * 100 / getMaxQueueSize(); + long lastAttemptDelta = fileSpooler.getLastAttemptTimeDelta(); fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime; // If we should even read from queue? - if (!isDrain() && !fileSpoolDrain - && percentUsed < fileSpoolDrainThresholdPercent) { + if (!isDrain() && !fileSpoolDrain && percentUsed < fileSpoolDrainThresholdPercent) { // Since some files are still under progress and it is // not in drain mode, lets wait and retry if (nextDispatchDuration > 0) { @@ -259,10 +256,8 @@ public void runLogAudit() { AuditEventBase event = null; - if (!isToSpool && !isDrain() && !fileSpoolDrain - && nextDispatchDuration > 0) { - event = queue.poll(nextDispatchDuration, - TimeUnit.MILLISECONDS); + if (!isToSpool && !isDrain() && !fileSpoolDrain && nextDispatchDuration > 0) { + event = queue.poll(nextDispatchDuration, TimeUnit.MILLISECONDS); } else { // For poll() is non blocking event = queue.poll(); @@ -271,15 +266,11 @@ public void runLogAudit() { if (event != null) { localBatchBuffer.add(event); if (getMaxBatchSize() >= localBatchBuffer.size()) { - queue.drainTo(localBatchBuffer, getMaxBatchSize() - - localBatchBuffer.size()); + queue.drainTo(localBatchBuffer, getMaxBatchSize() - localBatchBuffer.size()); } } else { // poll returned due to timeout, so reseting clock - nextDispatchDuration = lastDispatchTime - - System.currentTimeMillis() - + getMaxBatchInterval(); - + nextDispatchDuration = lastDispatchTime - System.currentTimeMillis() + getMaxBatchInterval(); lastDispatchTime = System.currentTimeMillis(); } } catch (InterruptedException e) { @@ -293,8 +284,7 @@ public void runLogAudit() { if (localBatchBuffer.size() > 0 && isToSpool) { // Let spool to the file directly if (isDestActive) { - logger.info("Switching to file spool. Queue=" + getName() - + ", dest=" + consumer.getName()); + logger.info("Switching to file spool. Queue = {}, dest = {}", getName(), consumer.getName()); } isDestActive = false; // Just before stashing @@ -302,20 +292,18 @@ public void runLogAudit() { fileSpooler.stashLogs(localBatchBuffer); addStashedCount(localBatchBuffer.size()); localBatchBuffer.clear(); - } else if (localBatchBuffer.size() > 0 - && (isDrain() - || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) { + } else if (localBatchBuffer.size() > 0 && + (isDrain() || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) { if (fileSpoolerEnabled && !isDestActive) { - logger.info("Switching to writing to destination. Queue=" - + getName() + ", dest=" + consumer.getName()); + logger.info("Switching to writing to the destination. Queue = {}, dest = {}", + getName(), consumer.getName()); } // Reset time just before sending the logs lastDispatchTime = System.currentTimeMillis(); boolean ret = consumer.log(localBatchBuffer); if (!ret) { if (fileSpoolerEnabled) { - logger.info("Switching to file spool. Queue=" - + getName() + ", dest=" + consumer.getName()); + logger.info("Switching to file spool. Queue = {}, dest = {}", getName(), consumer.getName()); // Transient error. Stash and move on fileSpooler.stashLogs(localBatchBuffer); isDestActive = false; @@ -334,9 +322,8 @@ public void runLogAudit() { if (isDrain()) { if (!queue.isEmpty() || localBatchBuffer.size() > 0) { - logger.info("Queue is not empty. Will retry. queue.size)=" - + queue.size() + ", localBatchBuffer.size()=" - + localBatchBuffer.size()); + logger.info("Queue is not empty. Will retry. queue.size = {}, localBatchBuffer.size = {}", + queue.size(), localBatchBuffer.size()); } else { break; } @@ -349,12 +336,10 @@ public void runLogAudit() { } } - logger.info("Exiting consumerThread. Queue=" + getName() + ", dest=" - + consumer.getName()); + logger.info("Exiting consumerThread. Queue = {}, dest = {}", getName(), consumer.getName()); try { // Call stop on the consumer - logger.info("Calling to stop consumer. name=" + getName() - + ", consumer.name=" + consumer.getName()); + logger.info("Calling to stop consumer. name = {}, consumer.name = {}", getName(), consumer.getName()); consumer.stop(); if (fileSpoolerEnabled) { diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java index 17a7fb91d2..0e74e3bd4b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java @@ -66,6 +66,7 @@ public abstract class AbstractRangerAuditWriter implements RangerAuditWriter { public boolean rollOverByDuration = false; public volatile FSDataOutputStream ostream = null; // output stream wrapped in logWriter private boolean isHFlushCapableStream = false; + protected boolean reUseLastLogFile = false; @Override public void init(Properties props, String propPrefix, String auditProviderName, Map auditConfigs) { @@ -207,29 +208,25 @@ public void init(Properties props, String propPrefix) { } - public void closeFileIfNeeded() throws IOException { + public void closeFileIfNeeded() { if (logger.isDebugEnabled()) { logger.debug("==> AbstractRangerAuditWriter.closeFileIfNeeded()"); } if (logWriter == null) { + if (logger.isDebugEnabled()){ + logger.debug("Log writer is null, aborting rollover condition check!"); + } return; } if ( System.currentTimeMillis() >= nextRollOverTime.getTime() ) { - logger.info("Closing file. Rolling over. name=" + auditProviderName - + ", fileName=" + currentFileName); - try { - logWriter.flush(); - logWriter.close(); - } catch (Throwable t) { - logger.error("Error on closing log writter. Exception will be ignored. name=" - + auditProviderName + ", fileName=" + currentFileName); - } - - logWriter = null; - ostream = null; + logger.info("Closing file. Rolling over. name = {}, fileName = {}", auditProviderName, currentFileName); + logWriter.flush(); + closeWriter(); + resetWriter(); currentFileName = null; + reUseLastLogFile = false; if (!rollOverByDuration) { try { @@ -238,7 +235,8 @@ public void closeFileIfNeeded() throws IOException { } nextRollOverTime = rollingTimeUtil.computeNextRollingTime(rolloverPeriod); } catch ( Exception e) { - logger.warn("Rollover by file.rollover.period failed...will be using the file.rollover.sec for " + fileSystemScheme + " audit file rollover...", e); + logger.warn("Rollover by file.rollover.period failed", e); + logger.warn("Using the file.rollover.sec for {} audit file rollover...", fileSystemScheme); nextRollOverTime = rollOverByDuration(); } } else { @@ -262,10 +260,25 @@ public PrintWriter createWriter() throws Exception { } if (logWriter == null) { - // Create the file to write - logger.info("Creating new log file. auditPath=" + fullPath); - createFileSystemFolders(); - ostream = fileSystem.create(auditPath); + boolean appendMode = false; + // if append is supported, reuse last log file + if (reUseLastLogFile && fileSystem.hasPathCapability(auditPath, CommonPathCapabilities.FS_APPEND)) { + logger.info("Appending to last log file. auditPath = {}", fullPath); + try { + ostream = fileSystem.append(auditPath); + appendMode = true; + } catch (Exception e){ + logger.error("Failed to append to file {} due to {}", fullPath, e.getMessage()); + logger.info("Falling back to create a new log file!"); + appendMode = false; + } + } + if (!appendMode) { + // Create the file to write + logger.info("Creating new log file. auditPath = {}", fullPath); + createFileSystemFolders(); + ostream = fileSystem.create(auditPath); + } logWriter = new PrintWriter(ostream); isHFlushCapableStream = ostream.hasCapability(StreamCapabilities.HFLUSH); } @@ -277,16 +290,39 @@ public PrintWriter createWriter() throws Exception { return logWriter; } + /** + * Closes the writer after writing audits + **/ public void closeWriter() { if (logger.isDebugEnabled()) { logger.debug("==> AbstractRangerAuditWriter.closeWriter()"); } + if (ostream != null) { + try { + ostream.close(); + } catch (IOException e) { + logger.error("Error closing the stream {}", e.getMessage()); + } + } + if (logWriter != null) + logWriter.close(); + + if (logger.isDebugEnabled()) { + logger.debug("<== AbstractRangerAuditWriter.closeWriter()"); + } + } + + public void resetWriter() { + if (logger.isDebugEnabled()) { + logger.debug("==> AbstractRangerAuditWriter.resetWriter()"); + } + logWriter = null; ostream = null; if (logger.isDebugEnabled()) { - logger.debug("<== AbstractRangerAuditWriter.closeWriter()"); + logger.debug("<== AbstractRangerAuditWriter.resetWriter()"); } } diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java index 755b76df7e..f74f0cbd32 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java @@ -88,12 +88,10 @@ public void init() { } synchronized public boolean logJSON(final Collection events) throws Exception { - boolean ret = false; PrintWriter out = null; try { if (logger.isDebugEnabled()) { - logger.debug("UGI=" + MiscUtil.getUGILoginUser() - + ". Will write to HDFS file=" + currentFileName); + logger.debug("UGI = {}, will write to HDFS file = {}", MiscUtil.getUGILoginUser(), currentFileName); } out = MiscUtil.executePrivilegedAction(new PrivilegedExceptionAction() { @Override @@ -108,28 +106,30 @@ public PrintWriter run() throws Exception { // flush and check the stream for errors if (out.checkError()) { // In theory, this count may NOT be accurate as part of the messages may have been successfully written. - // However, in practice, since client does buffering, either all of none would succeed. - out.close(); + // However, in practice, since client does buffering, either all or none would succeed. + logger.error("Stream encountered errors while writing audits to HDFS!"); closeWriter(); - return ret; + resetWriter(); + reUseLastLogFile = true; + return false; } } catch (Exception e) { - if (out != null) { - out.close(); - } + logger.error("Exception encountered while writing audits to HDFS!", e); closeWriter(); - return ret; + resetWriter(); + reUseLastLogFile = true; + return false; } finally { - ret = true; if (logger.isDebugEnabled()) { logger.debug("Flushing HDFS audit. Event Size:" + events.size()); } if (out != null) { out.flush(); } + //closeWriter(); } - return ret; + return true; } @Override @@ -166,8 +166,7 @@ synchronized public PrintWriter getLogFileStream() throws Exception { // close the file inline with audit logging. closeFileIfNeeded(); } - // Either there are no open log file or the previous one has been rolled - // over + // Either there are no open log file or the previous one has been rolled over PrintWriter logWriter = createWriter(); return logWriter; } diff --git a/agents-audit/src/test/java/org/apache/ranger/audit/utils/RangerJSONAuditWriterTest.java b/agents-audit/src/test/java/org/apache/ranger/audit/utils/RangerJSONAuditWriterTest.java new file mode 100644 index 0000000000..3d65790d50 --- /dev/null +++ b/agents-audit/src/test/java/org/apache/ranger/audit/utils/RangerJSONAuditWriterTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.audit.utils; + +import org.apache.hadoop.fs.CommonPathCapabilities; +import org.apache.hadoop.fs.FileSystem; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.Properties; +import java.util.Collections; +import java.io.PrintWriter; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RangerJSONAuditWriterTest { + + public Properties props; + public Map auditConfigs; + + public void setup(){ + props = new Properties(); + props.setProperty("test.dir", "/tmp"); + auditConfigs = new HashMap<>(); + auditConfigs.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); + } + + @Test + public void checkReUseFlagInStreamErrors() throws Exception { + + RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter()); + PrintWriter out = mock(PrintWriter.class); + + setup(); + jsonAuditWriter.init(props, "test", "localfs", auditConfigs); + + assertFalse(jsonAuditWriter.reUseLastLogFile); + when(jsonAuditWriter.getLogFileStream()).thenReturn(out); + when(out.checkError()).thenReturn(true); + assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!"))); + assertTrue(jsonAuditWriter.reUseLastLogFile); + + // cleanup + jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); + jsonAuditWriter.logJSON(Collections.singleton("cleaning up!")); + jsonAuditWriter.closeWriter(); + } + + @Test + public void checkAppendtoFileWhenExceptionsOccur() throws Exception { + RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter()); + + setup(); + jsonAuditWriter.init(props, "test", "localfs", auditConfigs); + jsonAuditWriter.createFileSystemFolders(); + // File creation should fail with an exception which will trigger append next time. + when(jsonAuditWriter.fileSystem.create(jsonAuditWriter.auditPath)) + .thenThrow(new IOException("Creation not allowed!")); + jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!")); + jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); + + assertTrue(jsonAuditWriter.reUseLastLogFile); + assertNull(jsonAuditWriter.ostream); + assertNull(jsonAuditWriter.logWriter); + + jsonAuditWriter.fileSystem = mock(FileSystem.class); + when(jsonAuditWriter.fileSystem + .hasPathCapability(jsonAuditWriter.auditPath, CommonPathCapabilities.FS_APPEND)).thenReturn(true); + jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); + // this will lead to an exception since append is called on mocks + jsonAuditWriter.logJSON(Collections.singleton( + "This event should be appended but won't be as appended we use mocks.")); + } + + + @Test + public void checkFileRolloverAfterThreshold() throws Exception { + RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter()); + + setup(); + props.setProperty("test.file.rollover.enable.periodic.rollover", "true"); + props.setProperty("test.file.rollover.periodic.rollover.check.sec", "2"); + // rollover log file after this interval + jsonAuditWriter.fileRolloverSec = 5; // in seconds + jsonAuditWriter.init(props, "test", "localfs", auditConfigs); + + + assertTrue(jsonAuditWriter.logJSON(Collections.singleton("First file created and added this line!"))); + jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // cleanup + Thread.sleep(6000); + + assertFalse(jsonAuditWriter.reUseLastLogFile); + assertNull(jsonAuditWriter.ostream); + assertNull(jsonAuditWriter.logWriter); + + assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Second file created since rollover happened!"))); + jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // cleanup + jsonAuditWriter.closeWriter(); + } +}