Skip to content

Commit

Permalink
RANGER-4357: Consolidate configs required to enable ORC audit logs. T…
Browse files Browse the repository at this point in the history
…est cases for orc audits. Typo fix for zlib compression. (#279)
  • Loading branch information
fateh288 authored Aug 30, 2023
1 parent 5bc3cb3 commit 5afc852
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void init(Properties props, String propPrefix, String auditProviderName,
this.propPrefix = propPrefix;
this.auditProviderName = auditProviderName;
this.auditConfigs = auditConfigs;
String auditFileType = MiscUtil.getStringProperty(props, propPrefix + ".filetype", AUDIT_FILETYPE_DEFAULT);
String auditFileType = MiscUtil.getStringProperty(props, propPrefix + ".batch.filequeue.filetype", AUDIT_FILETYPE_DEFAULT);
String writerClass = MiscUtil.getStringProperty(props, propPrefix + ".filewriter.impl");

auditWriter = StringUtils.isEmpty(writerClass) ? createWriter(getDefaultWriter(auditFileType)) : createWriter(writerClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ protected CompressionKind getORCCompression(String compression) {
case "lzo":
ret = CompressionKind.LZO;
break;
case "zlip":
case "zlib":
ret = CompressionKind.ZLIB;
break;
case "none":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,34 @@
* under the License.
*/


package org.apache.ranger.audit;

import static org.junit.Assert.*;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.ranger.audit.destination.FileAuditDestination;
import org.apache.ranger.audit.model.AuthzAuditEvent;
import org.apache.ranger.audit.provider.AuditHandler;
import org.apache.ranger.audit.provider.AuditProviderFactory;
import org.apache.ranger.audit.provider.BaseAuditHandler;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.audit.provider.MultiDestAuditProvider;
import org.apache.ranger.audit.provider.*;
import org.apache.ranger.audit.queue.AuditAsyncQueue;
import org.apache.ranger.audit.queue.AuditBatchQueue;
import org.apache.ranger.audit.queue.AuditFileSpool;
import org.apache.ranger.audit.queue.AuditQueue;
import org.apache.ranger.audit.queue.AuditSummaryQueue;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -681,6 +686,172 @@ public void testFileDestination() {
}

}
@Test
public void testAuditFileQueueSpoolORC(){
String appType = "test";
int messageToSend = 10;
String spoolFolderName = "target/spool";
String logFolderName = "target/testAuditFileQueueSpoolORC";
try {
FileUtils.deleteDirectory(new File(spoolFolderName));
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
FileUtils.deleteDirectory(new File(logFolderName));
} catch (IOException e) {
throw new RuntimeException(e);
}
assertTrue(Files.notExists(Paths.get(spoolFolderName)));
assertTrue(Files.notExists(Paths.get(logFolderName)));
String subdir = appType + "/" + LocalDate.now().toString().replace("-","");
File logFolder = new File(logFolderName);
File logSubfolder = new File(logFolder, subdir);
String logFileName = "test_ranger_audit.orc";
File logFile = new File(logSubfolder, logFileName);
Properties props = new Properties();
props.put(AuditProviderFactory.AUDIT_IS_ENABLED_PROP, "true");
String hdfsPropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".hdfs";
props.put(hdfsPropPrefix,"enable");
props.put(hdfsPropPrefix+".dir",logFolderName);
props.put(hdfsPropPrefix + "." + FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT,
"%app-type%_ranger_audit.orc");
String orcPrefix = hdfsPropPrefix + ".orc";
props.put(orcPrefix+".compression","none");
props.put(orcPrefix+".buffersize",""+10);
props.put(orcPrefix+".stripesize",""+10);
props.put(hdfsPropPrefix + ".batch.queuetype","filequeue");
String filequeuePrefix = hdfsPropPrefix + ".batch.filequeue";
props.put(filequeuePrefix+".filetype","orc");
String fileSpoolPrefix = filequeuePrefix + ".filespool";
props.put(fileSpoolPrefix+".dir",spoolFolderName);
props.put(fileSpoolPrefix+".buffer.size",""+10);
props.put(fileSpoolPrefix+".file.rollover.sec",""+5);
AuditProviderFactory factory = new AuditProviderFactory();
factory.init(props, appType);
AuditHandler queue = factory.getAuditProvider();
for (int i = 0; i < messageToSend; i++) {
queue.log(createEvent());
}
try {
Thread.sleep(40000);
} catch (InterruptedException e) {
System.out.println(e);
}
queue.waitToComplete();
assertTrue("File created", logFile.exists());
long rowCount = getOrcFileRowCount(logFile.getPath());
assertEquals(messageToSend, rowCount);
}
@Test
public void testAuditFileQueueSpoolORCRollover(){
String appType = "test";
int messageToSend = 100000;
String spoolFolderName = "target/spool";
String logFolderName = "target/testAuditFileQueueSpoolORC";
try {
FileUtils.deleteDirectory(new File(spoolFolderName));
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
FileUtils.deleteDirectory(new File(logFolderName));
} catch (IOException e) {
throw new RuntimeException(e);
}
assertTrue(Files.notExists(Paths.get(spoolFolderName)));
assertTrue(Files.notExists(Paths.get(logFolderName)));
String subdir = appType + "/" + LocalDate.now().toString().replace("-","");
File logFolder = new File(logFolderName);
File logSubfolder = new File(logFolder, subdir);
String logFileName = "test_ranger_audit.orc";
File logFile = new File(logSubfolder, logFileName);
Properties props = new Properties();
props.put(AuditProviderFactory.AUDIT_IS_ENABLED_PROP, "true");
String hdfsPropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".hdfs";
props.put(hdfsPropPrefix,"enable");
props.put(hdfsPropPrefix+".dir",logFolderName);
props.put(hdfsPropPrefix + "." + FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT,
"%app-type%_ranger_audit.orc");
String orcPrefix = hdfsPropPrefix + ".orc";
props.put(orcPrefix+".compression","snappy");
props.put(orcPrefix+".buffersize",""+100000000000000L);
props.put(orcPrefix+".stripesize",""+100000000000000L);
props.put(hdfsPropPrefix + ".batch.queuetype","filequeue");
String filequeuePrefix = hdfsPropPrefix + ".batch.filequeue";
props.put(filequeuePrefix+".filetype","orc");
String fileSpoolPrefix = filequeuePrefix + ".filespool";
props.put(fileSpoolPrefix+".dir",spoolFolderName);
props.put(fileSpoolPrefix+".buffer.size",""+100000000000000L);
props.put(fileSpoolPrefix+".file.rollover.sec",""+300);
AuditProviderFactory factory = new AuditProviderFactory();
factory.init(props, appType);
AuditHandler queue = factory.getAuditProvider();
for (int i = 0; i < messageToSend; i++) {
queue.log(createEvent());
try {
Thread.sleep(5);
} catch (InterruptedException e) {
System.out.println(e);
}
}
try {
Thread.sleep(40000);
} catch (InterruptedException e) {
System.out.println(e);
}
queue.waitToComplete();
assertTrue("File created", logFile.exists());
File[] listOfFiles = logSubfolder.listFiles();
int totalLogsOrc = 0;
if (listOfFiles != null){
for(File f : listOfFiles){
if (f.getName().endsWith(".orc")){
totalLogsOrc += getOrcFileRowCount(f.getPath());
}
}
}
System.out.println("Number of logs in orc="+totalLogsOrc);
long totalLogsArchive = 0;

try {
List<String> convertedLogFiles = getFileNames(spoolFolderName+"/index_AuditFileQueueSpool_hdfs_test_closed.json");
String[] convertedLogFileNames = new String[convertedLogFiles.size()];
for(int i=0;i<convertedLogFiles.size();i++){
String[] pathElements = convertedLogFiles.get(i).split("/");
convertedLogFileNames[i] = spoolFolderName+"/archive/"+pathElements[pathElements.length-1];
}
for(String f: convertedLogFileNames){
totalLogsArchive += getLogCountInFile(f);
}
} catch (IOException | JSONException e) {
throw new RuntimeException(e);
}
System.out.println("Number of logs in archive:"+totalLogsArchive);
assertEquals(totalLogsOrc, totalLogsArchive);

long notYetConvertedToORCLogsCount = 0;
//count logs which have not yet been converted to orc
try{
List<String> spoolFiles = getFileNames(spoolFolderName+"/index_AuditFileQueueSpool_hdfs_test.json");
if (spoolFiles!=null){
for(String f : spoolFiles){
if (f.endsWith(".log")){
try {
notYetConvertedToORCLogsCount += getLogCountInFile(f);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
catch (IOException | JSONException e){
throw new RuntimeException(e);
}
System.out.println("Number of logs not converted to ORC:"+notYetConvertedToORCLogsCount);
assertEquals(messageToSend, notYetConvertedToORCLogsCount+totalLogsArchive);
}

private AuthzAuditEvent createEvent() {
AuthzAuditEvent event = new AuthzAuditEvent();
Expand All @@ -699,4 +870,44 @@ private AuthzAuditEvent createEvent(String user, String accessType,
event.setSeqNum(++seqNum);
return event;
}

private static long getOrcFileRowCount(String filePath) {
try {
Configuration conf = new Configuration();
Path orcFilePath = new Path(filePath);
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
long numRows = reader.getNumberOfRows();
return numRows;
} catch (Exception e) {
e.printStackTrace();
}
return -1;
}
private static long getLogCountInFile(String filePath) throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(filePath));
long lines = 0;
while (reader.readLine() != null) {
lines++;
}
reader.close();
return lines;
}

private static List<String> getFileNames(String jsonIndexFile) throws IOException, JSONException {
List<String> fileNames = new ArrayList<>();
BufferedReader reader = new BufferedReader(new FileReader(jsonIndexFile));
while (true) {
String line = reader.readLine();
if (line!=null){
JSONObject jsonObject = new JSONObject(line);
String filePath = (String) jsonObject.get("filePath");
fileNames.add(filePath);
}
else{
break;
}
}
reader.close();
return fileNames;
}
}

0 comments on commit 5afc852

Please sign in to comment.