RANGER-4342: Fix new log file creation when errors/exceptions occur writing audits to HDFS as Json (#277)

* Append to last log file in case of errors/exceptions encountered
* Close streams and reset writers
* Add unit tests
* Fallback to create if append fails or is not supported
---------

Co-authored-by: abhishek-kumar <[email protected]>
kumaab and abhishek-kumar authored Sep 12, 2023
1 parent 0f7af3a commit 4d9648b
Showing 5 changed files with 220 additions and 65 deletions.
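
In outline, the fix works like this: when a write to HDFS fails, the destination closes and resets its writer but remembers the file it was writing; the next attempt reopens that file in append mode when the filesystem supports it, and falls back to creating a new file otherwise. A minimal sketch of the pattern — it uses Hadoop's FileSystem API, but the class and field names are illustrative, not the actual Ranger classes:

    import java.io.PrintWriter;

    import org.apache.hadoop.fs.CommonPathCapabilities;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class AppendOrCreateSketch {
        private FileSystem fileSystem;            // assumed initialized elsewhere
        private Path       auditPath;             // assumed initialized elsewhere
        private boolean    reUseLastLogFile = false;

        PrintWriter openWriter() throws Exception {
            FSDataOutputStream out = null;
            // Reuse the last file only when the FS advertises append support.
            if (reUseLastLogFile && fileSystem.hasPathCapability(auditPath, CommonPathCapabilities.FS_APPEND)) {
                try {
                    out = fileSystem.append(auditPath);
                } catch (Exception e) {
                    out = null;                   // fall back to create below
                }
            }
            if (out == null) {
                out = fileSystem.create(auditPath);
            }
            return new PrintWriter(out);
        }
    }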
12 changes: 12 additions & 0 deletions agents-audit/pom.xml
@@ -394,5 +394,17 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+   <groupId>junit</groupId>
+   <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+   <groupId>org.testng</groupId>
+   <artifactId>testng</artifactId>
+ </dependency>
+ <dependency>
+   <groupId>org.mockito</groupId>
+   <artifactId>mockito-core</artifactId>
+ </dependency>
</dependencies>
</project>
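
The junit, testng, and mockito-core test dependencies back the unit tests this commit adds (the missing version tags suggest versions are managed in a parent POM). A hypothetical Mockito test for the append-fallback behaviour could look like the following — the class, test method, and helper are illustrative, not the tests actually added:

    import static org.junit.Assert.assertSame;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Test;

    public class AppendFallbackTest {
        // Helper mirroring the fallback logic under test (illustrative only).
        private static FSDataOutputStream openStream(FileSystem fs, Path p) throws IOException {
            try {
                return fs.append(p);      // try to reuse the existing file
            } catch (Exception e) {
                return fs.create(p);      // fall back to a fresh file
            }
        }

        @Test
        public void fallsBackToCreateWhenAppendFails() throws Exception {
            FileSystem         fs      = mock(FileSystem.class);
            Path               path    = new Path("/tmp/audit/test.log");
            FSDataOutputStream created = mock(FSDataOutputStream.class);

            when(fs.append(path)).thenThrow(new IOException("append not supported"));
            when(fs.create(path)).thenReturn(created);

            assertSame(created, openStream(fs, path));
            verify(fs).create(path);
        }
    }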
@@ -237,15 +237,12 @@ public void runLogAudit() {
boolean fileSpoolDrain = false;
try {
if (fileSpoolerEnabled && fileSpooler.isPending()) {
- int percentUsed = queue.size() * 100
-     / getMaxQueueSize();
- long lastAttemptDelta = fileSpooler
-     .getLastAttemptTimeDelta();
+ int percentUsed = queue.size() * 100 / getMaxQueueSize();
+ long lastAttemptDelta = fileSpooler.getLastAttemptTimeDelta();

fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime;
// If we should even read from queue?
- if (!isDrain() && !fileSpoolDrain
-     && percentUsed < fileSpoolDrainThresholdPercent) {
+ if (!isDrain() && !fileSpoolDrain && percentUsed < fileSpoolDrainThresholdPercent) {
// Since some files are still under progress and it is
// not in drain mode, let's wait and retry
if (nextDispatchDuration > 0) {
@@ -259,10 +256,8 @@ public void runLogAudit() {

AuditEventBase event = null;

- if (!isToSpool && !isDrain() && !fileSpoolDrain
-     && nextDispatchDuration > 0) {
-     event = queue.poll(nextDispatchDuration,
-         TimeUnit.MILLISECONDS);
+ if (!isToSpool && !isDrain() && !fileSpoolDrain && nextDispatchDuration > 0) {
+     event = queue.poll(nextDispatchDuration, TimeUnit.MILLISECONDS);
} else {
// Here, poll() is non-blocking
event = queue.poll();
@@ -271,15 +266,11 @@ public void runLogAudit() {
if (event != null) {
localBatchBuffer.add(event);
if (getMaxBatchSize() >= localBatchBuffer.size()) {
- queue.drainTo(localBatchBuffer, getMaxBatchSize()
-     - localBatchBuffer.size());
+ queue.drainTo(localBatchBuffer, getMaxBatchSize() - localBatchBuffer.size());
}
} else {
// poll returned due to timeout, so resetting the clock
- nextDispatchDuration = lastDispatchTime
-     - System.currentTimeMillis()
-     + getMaxBatchInterval();
-
+ nextDispatchDuration = lastDispatchTime - System.currentTimeMillis() + getMaxBatchInterval();
lastDispatchTime = System.currentTimeMillis();
}
} catch (InterruptedException e) {
@@ -293,29 +284,26 @@ public void runLogAudit() {
if (localBatchBuffer.size() > 0 && isToSpool) {
// Let spool to the file directly
if (isDestActive) {
logger.info("Switching to file spool. Queue=" + getName()
+ ", dest=" + consumer.getName());
logger.info("Switching to file spool. Queue = {}, dest = {}", getName(), consumer.getName());
}
isDestActive = false;
// Just before stashing
lastDispatchTime = System.currentTimeMillis();
fileSpooler.stashLogs(localBatchBuffer);
addStashedCount(localBatchBuffer.size());
localBatchBuffer.clear();
- } else if (localBatchBuffer.size() > 0
-     && (isDrain()
-     || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
+ } else if (localBatchBuffer.size() > 0 &&
+     (isDrain() || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
if (fileSpoolerEnabled && !isDestActive) {
logger.info("Switching to writing to destination. Queue="
+ getName() + ", dest=" + consumer.getName());
logger.info("Switching to writing to the destination. Queue = {}, dest = {}",
getName(), consumer.getName());
}
// Reset time just before sending the logs
lastDispatchTime = System.currentTimeMillis();
boolean ret = consumer.log(localBatchBuffer);
if (!ret) {
if (fileSpoolerEnabled) {
logger.info("Switching to file spool. Queue="
+ getName() + ", dest=" + consumer.getName());
logger.info("Switching to file spool. Queue = {}, dest = {}", getName(), consumer.getName());
// Transient error. Stash and move on
fileSpooler.stashLogs(localBatchBuffer);
isDestActive = false;
@@ -334,9 +322,8 @@ public void runLogAudit() {

if (isDrain()) {
if (!queue.isEmpty() || localBatchBuffer.size() > 0) {
logger.info("Queue is not empty. Will retry. queue.size)="
+ queue.size() + ", localBatchBuffer.size()="
+ localBatchBuffer.size());
logger.info("Queue is not empty. Will retry. queue.size = {}, localBatchBuffer.size = {}",
queue.size(), localBatchBuffer.size());
} else {
break;
}
@@ -349,12 +336,10 @@ public void runLogAudit() {
}
}

logger.info("Exiting consumerThread. Queue=" + getName() + ", dest="
+ consumer.getName());
logger.info("Exiting consumerThread. Queue = {}, dest = {}", getName(), consumer.getName());
try {
// Call stop on the consumer
logger.info("Calling to stop consumer. name=" + getName()
+ ", consumer.name=" + consumer.getName());
logger.info("Calling to stop consumer. name = {}, consumer.name = {}", getName(), consumer.getName());

consumer.stop();
if (fileSpoolerEnabled) {
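A recurring cleanup across these hunks replaces string concatenation in log calls with SLF4J parameterized logging. Beyond readability, the message string is only assembled when the level is actually enabled; a minimal illustration (the class is illustrative, not from the commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LogStyle {
        private static final Logger logger = LoggerFactory.getLogger(LogStyle.class);

        void example(String queueName, String destName) {
            // Before: the message string is built even when INFO is disabled
            logger.info("Switching to file spool. Queue=" + queueName + ", dest=" + destName);

            // After: the string is only assembled if INFO is enabled
            logger.info("Switching to file spool. Queue = {}, dest = {}", queueName, destName);
        }
    }

Note that the arguments themselves are still evaluated at the call site either way; the saved cost is the concatenation and formatting.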
@@ -66,6 +66,7 @@ public abstract class AbstractRangerAuditWriter implements RangerAuditWriter {
public boolean rollOverByDuration = false;
public volatile FSDataOutputStream ostream = null; // output stream wrapped in logWriter
private boolean isHFlushCapableStream = false;
+ protected boolean reUseLastLogFile = false;

@Override
public void init(Properties props, String propPrefix, String auditProviderName, Map<String,String> auditConfigs) {
@@ -207,29 +208,25 @@ public void init(Properties props, String propPrefix) {

}

- public void closeFileIfNeeded() throws IOException {
+ public void closeFileIfNeeded() {
if (logger.isDebugEnabled()) {
logger.debug("==> AbstractRangerAuditWriter.closeFileIfNeeded()");
}

+ if (logWriter == null) {
+     if (logger.isDebugEnabled()) {
+         logger.debug("Log writer is null, aborting rollover condition check!");
+     }
+     return;
+ }

if ( System.currentTimeMillis() >= nextRollOverTime.getTime() ) {
logger.info("Closing file. Rolling over. name=" + auditProviderName
+ ", fileName=" + currentFileName);
try {
logWriter.flush();
logWriter.close();
} catch (Throwable t) {
logger.error("Error on closing log writter. Exception will be ignored. name="
+ auditProviderName + ", fileName=" + currentFileName);
}

logWriter = null;
ostream = null;
logger.info("Closing file. Rolling over. name = {}, fileName = {}", auditProviderName, currentFileName);
logWriter.flush();
closeWriter();
resetWriter();
currentFileName = null;
reUseLastLogFile = false;

if (!rollOverByDuration) {
try {
@@ -238,7 +235,8 @@ public void closeFileIfNeeded() throws IOException {
}
nextRollOverTime = rollingTimeUtil.computeNextRollingTime(rolloverPeriod);
} catch ( Exception e) {
logger.warn("Rollover by file.rollover.period failed...will be using the file.rollover.sec for " + fileSystemScheme + " audit file rollover...", e);
logger.warn("Rollover by file.rollover.period failed", e);
logger.warn("Using the file.rollover.sec for {} audit file rollover...", fileSystemScheme);
nextRollOverTime = rollOverByDuration();
}
} else {
@@ -262,10 +260,25 @@ public PrintWriter createWriter() throws Exception {
}

if (logWriter == null) {
- // Create the file to write
- logger.info("Creating new log file. auditPath=" + fullPath);
- createFileSystemFolders();
- ostream = fileSystem.create(auditPath);
+ boolean appendMode = false;
+ // if append is supported, reuse last log file
+ if (reUseLastLogFile && fileSystem.hasPathCapability(auditPath, CommonPathCapabilities.FS_APPEND)) {
+     logger.info("Appending to last log file. auditPath = {}", fullPath);
+     try {
+         ostream = fileSystem.append(auditPath);
+         appendMode = true;
+     } catch (Exception e) {
+         logger.error("Failed to append to file {} due to {}", fullPath, e.getMessage());
+         logger.info("Falling back to create a new log file!");
+         appendMode = false;
+     }
+ }
+ if (!appendMode) {
+     // Create the file to write
+     logger.info("Creating new log file. auditPath = {}", fullPath);
+     createFileSystemFolders();
+     ostream = fileSystem.create(auditPath);
+ }
logWriter = new PrintWriter(ostream);
isHFlushCapableStream = ostream.hasCapability(StreamCapabilities.HFLUSH);
}
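
Note that createWriter() now probes capabilities at two levels: hasPathCapability() on the FileSystem gates whether an append is attempted at all, while hasCapability() on the opened stream records whether hflush is available. A small standalone probe along the same lines — the path is illustrative and assumes a reachable HDFS:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonPathCapabilities;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public class CapabilityProbe {
        public static void main(String[] args) throws Exception {
            Path path = new Path("hdfs://nameservice/ranger/audit/probe.log"); // illustrative path
            FileSystem fs = path.getFileSystem(new Configuration());

            // Path-level capability: can this filesystem append to files?
            System.out.println("append supported: "
                    + fs.hasPathCapability(path, CommonPathCapabilities.FS_APPEND));

            // Stream-level capability: can this open stream hflush?
            try (FSDataOutputStream out = fs.create(path)) {
                System.out.println("hflush supported: " + out.hasCapability(StreamCapabilities.HFLUSH));
            }
        }
    }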
@@ -277,16 +290,39 @@ public PrintWriter createWriter() throws Exception {
return logWriter;
}

+ /**
+  * Closes the writer after writing audits
+  **/
public void closeWriter() {
if (logger.isDebugEnabled()) {
logger.debug("==> AbstractRangerAuditWriter.closeWriter()");
}

+ if (ostream != null) {
+     try {
+         ostream.close();
+     } catch (IOException e) {
+         logger.error("Error closing the stream {}", e.getMessage());
+     }
+ }
+ if (logWriter != null)
+     logWriter.close();
+
+ if (logger.isDebugEnabled()) {
+     logger.debug("<== AbstractRangerAuditWriter.closeWriter()");
+ }
+ }

+ public void resetWriter() {
+     if (logger.isDebugEnabled()) {
+         logger.debug("==> AbstractRangerAuditWriter.resetWriter()");
+     }

logWriter = null;
ostream = null;

if (logger.isDebugEnabled()) {
logger.debug("<== AbstractRangerAuditWriter.closeWriter()");
logger.debug("<== AbstractRangerAuditWriter.resetWriter()");
}
}
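
Splitting the old closeWriter() into closeWriter() (release the stream) plus resetWriter() (forget the references) lets the rollover path and the error path share the cleanup while diverging on what happens next. Roughly, as fragments inside the writer class — the method names here are illustrative, and the real call sites are closeFileIfNeeded() above and HDFSAuditDestination.logJSON() below:

    // Fields (logWriter, ostream, currentFileName, reUseLastLogFile)
    // as in AbstractRangerAuditWriter; fragments, not compilable standalone.

    // Rollover: close the current file and start a fresh one next time.
    void onRollover() {
        logWriter.flush();
        closeWriter();
        resetWriter();
        currentFileName  = null;
        reUseLastLogFile = false;
    }

    // Write error: close and reset, but try to append to the same file next time.
    void onWriteError() {
        closeWriter();
        resetWriter();
        reUseLastLogFile = true;
    }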

@@ -88,12 +88,10 @@ public void init() {
}

synchronized public boolean logJSON(final Collection<String> events) throws Exception {
- boolean ret = false;
PrintWriter out = null;
try {
if (logger.isDebugEnabled()) {
logger.debug("UGI=" + MiscUtil.getUGILoginUser()
+ ". Will write to HDFS file=" + currentFileName);
logger.debug("UGI = {}, will write to HDFS file = {}", MiscUtil.getUGILoginUser(), currentFileName);
}
out = MiscUtil.executePrivilegedAction(new PrivilegedExceptionAction<PrintWriter>() {
@Override
@@ -108,28 +106,30 @@ public PrintWriter run() throws Exception {
// flush and check the stream for errors
if (out.checkError()) {
// In theory, this count may NOT be accurate as part of the messages may have been successfully written.
- // However, in practice, since client does buffering, either all of none would succeed.
- out.close();
+ // However, in practice, since client does buffering, either all or none would succeed.
+ logger.error("Stream encountered errors while writing audits to HDFS!");
+ closeWriter();
- return ret;
+ resetWriter();
+ reUseLastLogFile = true;
+ return false;
}
} catch (Exception e) {
- if (out != null) {
-     out.close();
- }
+ logger.error("Exception encountered while writing audits to HDFS!", e);
+ closeWriter();
- return ret;
+ resetWriter();
+ reUseLastLogFile = true;
+ return false;
} finally {
- ret = true;
if (logger.isDebugEnabled()) {
logger.debug("Flushing HDFS audit. Event Size:" + events.size());
}
if (out != null) {
out.flush();
}
+ //closeWriter();
}

- return ret;
+ return true;
}

@Override
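
The error handling above leans on PrintWriter's no-exception contract: writes never throw; instead an internal error flag is set, and checkError() flushes the stream and returns that flag. That is why logJSON() must explicitly poll checkError() after writing. A minimal standalone demonstration:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.PrintWriter;

    public class CheckErrorDemo {
        public static void main(String[] args) {
            OutputStream failing = new OutputStream() {
                @Override
                public void write(int b) throws IOException {
                    throw new IOException("disk full"); // simulate a failing stream
                }
            };

            PrintWriter out = new PrintWriter(failing);
            out.println("audit event");           // no exception is thrown here
            System.out.println(out.checkError()); // prints: true (flushes, then reports the flag)
        }
    }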
@@ -166,8 +166,7 @@ synchronized public PrintWriter getLogFileStream() throws Exception {
// close the file inline with audit logging.
closeFileIfNeeded();
}
- // Either there are no open log file or the previous one has been rolled
- // over
+ // Either there is no open log file or the previous one has been rolled over
PrintWriter logWriter = createWriter();
return logWriter;
}