diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java index 5eeaeea122..8860f1c8fd 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditWriterFactory.java @@ -61,7 +61,7 @@ public void init(Properties props, String propPrefix, String auditProviderName, this.propPrefix = propPrefix; this.auditProviderName = auditProviderName; this.auditConfigs = auditConfigs; - String auditFileType = MiscUtil.getStringProperty(props, propPrefix + ".filetype", AUDIT_FILETYPE_DEFAULT); + String auditFileType = MiscUtil.getStringProperty(props, propPrefix + ".batch.filequeue.filetype", AUDIT_FILETYPE_DEFAULT); String writerClass = MiscUtil.getStringProperty(props, propPrefix + ".filewriter.impl"); auditWriter = StringUtils.isEmpty(writerClass) ? createWriter(getDefaultWriter(auditFileType)) : createWriter(writerClass); diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java index 061c324332..62f8794f63 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/ORCFileUtil.java @@ -404,7 +404,7 @@ protected CompressionKind getORCCompression(String compression) { case "lzo": ret = CompressionKind.LZO; break; - case "zlip": + case "zlib": ret = CompressionKind.ZLIB; break; case "none": diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java index c62fdd89ad..3aeb68ecba 100644 --- a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java +++ b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java @@ -17,29 +17,34 @@ * under the License. */ + package org.apache.ranger.audit; import static org.junit.Assert.*; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.LocalDate; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; import org.apache.ranger.audit.destination.FileAuditDestination; import org.apache.ranger.audit.model.AuthzAuditEvent; -import org.apache.ranger.audit.provider.AuditHandler; -import org.apache.ranger.audit.provider.AuditProviderFactory; -import org.apache.ranger.audit.provider.BaseAuditHandler; -import org.apache.ranger.audit.provider.MiscUtil; -import org.apache.ranger.audit.provider.MultiDestAuditProvider; +import org.apache.ranger.audit.provider.*; import org.apache.ranger.audit.queue.AuditAsyncQueue; import org.apache.ranger.audit.queue.AuditBatchQueue; import org.apache.ranger.audit.queue.AuditFileSpool; import org.apache.ranger.audit.queue.AuditQueue; import org.apache.ranger.audit.queue.AuditSummaryQueue; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -681,6 +686,172 @@ public void testFileDestination() { } } + @Test + public void testAuditFileQueueSpoolORC(){ + String appType = "test"; + int messageToSend = 10; + String spoolFolderName = "target/spool"; + String logFolderName = "target/testAuditFileQueueSpoolORC"; + try { + FileUtils.deleteDirectory(new File(spoolFolderName)); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { + FileUtils.deleteDirectory(new File(logFolderName)); + } catch (IOException e) { + throw new RuntimeException(e); + } + assertTrue(Files.notExists(Paths.get(spoolFolderName))); + assertTrue(Files.notExists(Paths.get(logFolderName))); + String subdir = appType + "/" + LocalDate.now().toString().replace("-",""); + File logFolder = new File(logFolderName); + File logSubfolder = new File(logFolder, subdir); + String logFileName = "test_ranger_audit.orc"; + File logFile = new File(logSubfolder, logFileName); + Properties props = new Properties(); + props.put(AuditProviderFactory.AUDIT_IS_ENABLED_PROP, "true"); + String hdfsPropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".hdfs"; + props.put(hdfsPropPrefix,"enable"); + props.put(hdfsPropPrefix+".dir",logFolderName); + props.put(hdfsPropPrefix + "." + FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT, + "%app-type%_ranger_audit.orc"); + String orcPrefix = hdfsPropPrefix + ".orc"; + props.put(orcPrefix+".compression","none"); + props.put(orcPrefix+".buffersize",""+10); + props.put(orcPrefix+".stripesize",""+10); + props.put(hdfsPropPrefix + ".batch.queuetype","filequeue"); + String filequeuePrefix = hdfsPropPrefix + ".batch.filequeue"; + props.put(filequeuePrefix+".filetype","orc"); + String fileSpoolPrefix = filequeuePrefix + ".filespool"; + props.put(fileSpoolPrefix+".dir",spoolFolderName); + props.put(fileSpoolPrefix+".buffer.size",""+10); + props.put(fileSpoolPrefix+".file.rollover.sec",""+5); + AuditProviderFactory factory = new AuditProviderFactory(); + factory.init(props, appType); + AuditHandler queue = factory.getAuditProvider(); + for (int i = 0; i < messageToSend; i++) { + queue.log(createEvent()); + } + try { + Thread.sleep(40000); + } catch (InterruptedException e) { + System.out.println(e); + } + queue.waitToComplete(); + assertTrue("File created", logFile.exists()); + long rowCount = getOrcFileRowCount(logFile.getPath()); + assertEquals(messageToSend, rowCount); + } + @Test + public void testAuditFileQueueSpoolORCRollover(){ + String appType = "test"; + int messageToSend = 100000; + String spoolFolderName = "target/spool"; + String logFolderName = "target/testAuditFileQueueSpoolORC"; + try { + FileUtils.deleteDirectory(new File(spoolFolderName)); + } catch (IOException e) { + throw new RuntimeException(e); + } + try { + FileUtils.deleteDirectory(new File(logFolderName)); + } catch (IOException e) { + throw new RuntimeException(e); + } + assertTrue(Files.notExists(Paths.get(spoolFolderName))); + assertTrue(Files.notExists(Paths.get(logFolderName))); + String subdir = appType + "/" + LocalDate.now().toString().replace("-",""); + File logFolder = new File(logFolderName); + File logSubfolder = new File(logFolder, subdir); + String logFileName = "test_ranger_audit.orc"; + File logFile = new File(logSubfolder, logFileName); + Properties props = new Properties(); + props.put(AuditProviderFactory.AUDIT_IS_ENABLED_PROP, "true"); + String hdfsPropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".hdfs"; + props.put(hdfsPropPrefix,"enable"); + props.put(hdfsPropPrefix+".dir",logFolderName); + props.put(hdfsPropPrefix + "." + FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT, + "%app-type%_ranger_audit.orc"); + String orcPrefix = hdfsPropPrefix + ".orc"; + props.put(orcPrefix+".compression","snappy"); + props.put(orcPrefix+".buffersize",""+100000000000000L); + props.put(orcPrefix+".stripesize",""+100000000000000L); + props.put(hdfsPropPrefix + ".batch.queuetype","filequeue"); + String filequeuePrefix = hdfsPropPrefix + ".batch.filequeue"; + props.put(filequeuePrefix+".filetype","orc"); + String fileSpoolPrefix = filequeuePrefix + ".filespool"; + props.put(fileSpoolPrefix+".dir",spoolFolderName); + props.put(fileSpoolPrefix+".buffer.size",""+100000000000000L); + props.put(fileSpoolPrefix+".file.rollover.sec",""+300); + AuditProviderFactory factory = new AuditProviderFactory(); + factory.init(props, appType); + AuditHandler queue = factory.getAuditProvider(); + for (int i = 0; i < messageToSend; i++) { + queue.log(createEvent()); + try { + Thread.sleep(5); + } catch (InterruptedException e) { + System.out.println(e); + } + } + try { + Thread.sleep(40000); + } catch (InterruptedException e) { + System.out.println(e); + } + queue.waitToComplete(); + assertTrue("File created", logFile.exists()); + File[] listOfFiles = logSubfolder.listFiles(); + int totalLogsOrc = 0; + if (listOfFiles != null){ + for(File f : listOfFiles){ + if (f.getName().endsWith(".orc")){ + totalLogsOrc += getOrcFileRowCount(f.getPath()); + } + } + } + System.out.println("Number of logs in orc="+totalLogsOrc); + long totalLogsArchive = 0; + + try { + List convertedLogFiles = getFileNames(spoolFolderName+"/index_AuditFileQueueSpool_hdfs_test_closed.json"); + String[] convertedLogFileNames = new String[convertedLogFiles.size()]; + for(int i=0;i spoolFiles = getFileNames(spoolFolderName+"/index_AuditFileQueueSpool_hdfs_test.json"); + if (spoolFiles!=null){ + for(String f : spoolFiles){ + if (f.endsWith(".log")){ + try { + notYetConvertedToORCLogsCount += getLogCountInFile(f); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + } + catch (IOException | JSONException e){ + throw new RuntimeException(e); + } + System.out.println("Number of logs not converted to ORC:"+notYetConvertedToORCLogsCount); + assertEquals(messageToSend, notYetConvertedToORCLogsCount+totalLogsArchive); + } private AuthzAuditEvent createEvent() { AuthzAuditEvent event = new AuthzAuditEvent(); @@ -699,4 +870,44 @@ private AuthzAuditEvent createEvent(String user, String accessType, event.setSeqNum(++seqNum); return event; } + + private static long getOrcFileRowCount(String filePath) { + try { + Configuration conf = new Configuration(); + Path orcFilePath = new Path(filePath); + Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); + long numRows = reader.getNumberOfRows(); + return numRows; + } catch (Exception e) { + e.printStackTrace(); + } + return -1; + } + private static long getLogCountInFile(String filePath) throws IOException { + BufferedReader reader = new BufferedReader(new FileReader(filePath)); + long lines = 0; + while (reader.readLine() != null) { + lines++; + } + reader.close(); + return lines; + } + + private static List getFileNames(String jsonIndexFile) throws IOException, JSONException { + List fileNames = new ArrayList<>(); + BufferedReader reader = new BufferedReader(new FileReader(jsonIndexFile)); + while (true) { + String line = reader.readLine(); + if (line!=null){ + JSONObject jsonObject = new JSONObject(line); + String filePath = (String) jsonObject.get("filePath"); + fileNames.add(filePath); + } + else{ + break; + } + } + reader.close(); + return fileNames; + } }