Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAT-860, NAVY-1030: VenicePushJob with s3 #2

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def jacksonVersion = '2.13.3'
def pulsarGroup = 'org.apache.pulsar'
def pulsarVersion = '2.10.3'
def alpnAgentVersion = '2.0.10'
def hadoopVersion = '2.7.2'
def hadoopVersion = '2.10.2'

ext.libraries = [
alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public class DefaultInputDataInfoProvider implements InputDataInfoProvider {
@Override
public InputDataInfo validateInputAndGetInfo(String inputUri) throws Exception {
long inputModificationTime = getInputLastModificationTime(inputUri);
FileSystem fs = FileSystem.get(new Configuration());
Path srcPath = new Path(inputUri);
FileSystem fs = srcPath.getFileSystem(new Configuration());
FileStatus[] fileStatuses = fs.listStatus(srcPath, PATH_FILTER);

if (fileStatuses == null || fileStatuses.length == 0) {
Expand Down Expand Up @@ -326,8 +326,8 @@ public Schema extractAvroSubSchema(Schema origin, String fieldName) {

@Override
public long getInputLastModificationTime(String inputUri) throws IOException {
FileSystem fs = FileSystem.get(new Configuration());
Path srcPath = new Path(inputUri);
FileSystem fs = srcPath.getFileSystem(new Configuration());
try {
return fs.getFileStatus(srcPath).getModificationTime();
} catch (FileNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ protected void initInputData(JobConf job, VeniceProperties props) throws Excepti
}

try {
fileSystem = FileSystem.get(job);
Path srcPath = new Path(inputDirectory);
fileSystem = srcPath.getFileSystem(job);
fileStatuses = fileSystem.listStatus(srcPath, PATH_FILTER);
} catch (IOException e) {
/** Should not happen as this is already done in driver, unless there has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ public ValidateSchemaAndBuildDictMapperOutputReader(String outputDir, String fil
ValidateSchemaAndBuildDictMapper.class.getSimpleName() + " output fileName should not be empty");

this.outputDir = outputDir;
String filePath = outputDir + "/" + fileName;
Path filePath = new Path(String.format("%s/%s", outputDir, fileName));

LOGGER.info(
"Reading file {} to retrieve info persisted by {}",
filePath,
ValidateSchemaAndBuildDictMapper.class.getSimpleName());
Configuration conf = new Configuration();
fs = FileSystem.get(conf);
fs = filePath.getFileSystem(conf);

try {
inputStream = fs.open(new Path(filePath));
inputStream = fs.open(filePath);
avroDataFileStream =
new DataFileStream(inputStream, new SpecificDatumReader(ValidateSchemaAndBuildDictMapperOutput.class));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ private static void createDirectoryWithPermission(FileSystem fs, Path path, Stri
protected static void setValidateSchemaAndBuildDictionaryOutputDirPath(JobConf job) throws IOException {
// parent directory: Common directory under which all the different push jobs
// create their job specific directories.
FileSystem fs = FileSystem.get(job);
String parentOutputDir = job.get(MAPPER_OUTPUT_DIRECTORY);
Path outputPath = new Path(parentOutputDir);
FileSystem fs = outputPath.getFileSystem(job);
createDirectoryWithPermission(fs, outputPath, "777");

// store+job specific unique directory under parent directory: already derived in VPJ driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public VeniceFileInputRecordReader(InputSplit split, JobConf job) throws IOExcep

protected int getTotalNumberOfFiles(String inputDirectory, JobConf job) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path srcPath = new Path(inputDirectory);
FileSystem fs = srcPath.getFileSystem(conf);
FileStatus[] fileStatuses = fs.listStatus(srcPath, PATH_FILTER);
// The path and the file-listing length were already validated before execution reaches this point, so they are not re-checked here
return fileStatuses.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ public void run() {
pushJobSetting.repushTTLInSeconds = storeSetting.storeRewindTimeInSeconds;
// make the base directory TEMP_DIR_PREFIX with 777 permissions
Path baseSchemaDir = new Path(TEMP_DIR_PREFIX);
FileSystem fs = FileSystem.get(new Configuration());
FileSystem fs = baseSchemaDir.getFileSystem(new Configuration());
if (!fs.exists(baseSchemaDir)) {
fs.mkdirs(baseSchemaDir);
fs.setPermission(baseSchemaDir, new FsPermission("777"));
Expand Down Expand Up @@ -2000,8 +2000,9 @@ protected String getInputURI(VeniceProperties props) throws Exception {
return "";
}
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
String uri = props.getString(INPUT_PATH_PROP);
Path uriPath = new Path(uri);
FileSystem fs = uriPath.getFileSystem(conf);
Path sourcePath = getLatestPathOfInputDirectory(uri, fs);
return sourcePath.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class HDFSRmdSchemaSource implements RmdSchemaSource, AutoCloseable {

public HDFSRmdSchemaSource(final String schemaDir, final String storeName) throws IOException {
Configuration conf = new Configuration();
this.fs = FileSystem.get(conf);
this.schemaDir = new Path(schemaDir);
this.fs = this.schemaDir.getFileSystem(conf);
if (!fs.exists(this.schemaDir)) {
fs.mkdirs(this.schemaDir);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public static boolean shouldPathBeIgnored(org.apache.hadoop.fs.Path path) throws
public static void cleanUpHDFSPath(String path, boolean recursive) {
Configuration conf = new Configuration();
try {
FileSystem fs = FileSystem.get(conf);
Path p = new Path(path);
FileSystem fs = p.getFileSystem(conf);
fs.delete(p, recursive);
fs.close();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public void testShouldPathBeIgnored() throws IOException {
public void testCleanUpHDFSPath() throws IOException {
String path = "/tmp/venice-test/";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// create the path
Path p = new Path(path);
FileSystem fs = p.getFileSystem(conf);
if (!fs.exists(p)) {
fs.mkdirs(p);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public class TestZstdLibrary {

private void runTest(int numOfFiles, int numOfRecordsPerFile, int dictSizeLimitInKB, int dictSampleSizeLimitInMB)
throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
File inputDir = Utils.getTempDataDirectory();
Path srcPath = new Path(inputDir.getAbsolutePath());
FileSystem fs = srcPath.getFileSystem(new Configuration());
try {
for (int i = 0; i < numOfFiles; i++) {
writeSimpleAvroFileWithStringToStringSchema(inputDir, numOfRecordsPerFile, "testInput" + i + ".avro");
Expand All @@ -46,7 +47,6 @@ private void runTest(int numOfFiles, int numOfRecordsPerFile, int dictSizeLimitI

PushJobZstdConfig pushJobZstdConfig = new PushJobZstdConfig(vProps, numOfFiles);

Path srcPath = new Path(inputDir.getAbsolutePath());
FileStatus[] fileStatuses = fs.listStatus(srcPath, PATH_FILTER);
LOGGER.info("Collect maximum of {} Bytes from {} files", pushJobZstdConfig.getMaxBytesPerFile(), numOfFiles);
for (FileStatus fileStatus: fileStatuses) {
Expand Down
6 changes: 3 additions & 3 deletions docker/venice-client/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ RUN apt-get update
RUN apt-get install netcat tree wget python3 -y
RUN mkdir -p "${VENICE_DIR}/bin"
RUN wget -O ${VENICE_DIR}/bin/avro-tools.jar https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.11.2/avro-tools-1.11.2.jar
RUN wget -O ${VENICE_DIR}/bin/hadoop-mapreduce-client-core.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/2.3.0/hadoop-mapreduce-client-core-2.3.0.jar
RUN wget -O ${VENICE_DIR}/bin/hadoop-mapreduce-client-common.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-common/2.3.0/hadoop-mapreduce-client-common-2.3.0.jar
RUN wget -O ${VENICE_DIR}/bin/hadoop-common.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/2.3.0/hadoop-common-2.3.0.jar
RUN wget -O ${VENICE_DIR}/bin/hadoop-mapreduce-client-core.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/2.10.2/hadoop-mapreduce-client-core-2.10.2.jar
RUN wget -O ${VENICE_DIR}/bin/hadoop-mapreduce-client-common.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-common/2.10.2/hadoop-mapreduce-client-common-2.10.2.jar
RUN wget -O ${VENICE_DIR}/bin/hadoop-common.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/2.10.2/hadoop-common-2.10.2.jar
WORKDIR ${VENICE_DIR}

COPY venice-push-job-all.jar bin/venice-push-job-all.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -286,26 +287,26 @@ public void testRunJobByPickingUpLatestFolder() throws Exception {
File inputDir_v2_file = new File(inputDir_v2, "v3.avro"); // Added to ensure lexically greater files do not get
// resolved
inputDir_v2_file.createNewFile();

FileSystem fs = FileSystem.get(new Configuration());
String basePath = "file:" + inputDir.getAbsolutePath();
FileSystem fs = new Path(basePath).getFileSystem(new Configuration());

Assert.assertEquals(
getLatestPathOfInputDirectory("file:" + inputDir.getAbsolutePath() + "/#LATEST", fs).toString(),
getLatestPathOfInputDirectory(basePath + "/#LATEST", fs).toString(),
"file:" + inputDir_v2.getAbsolutePath(),
"VenicePushJob should parse #LATEST to latest directory when it is in the last level in the input path");

Assert.assertEquals(
getLatestPathOfInputDirectory("file:" + inputDir.getAbsolutePath() + "/#LATEST/v1", fs).toString(),
getLatestPathOfInputDirectory(basePath + "/#LATEST/v1", fs).toString(),
"file:" + inputDir_v2_v1.getAbsolutePath(),
"VenicePushJob should parse #LATEST to latest directory when it is only in an intermediate level in the input path");

Assert.assertEquals(
getLatestPathOfInputDirectory("file:" + inputDir.getAbsolutePath() + "/#LATEST/#LATEST", fs).toString(),
getLatestPathOfInputDirectory(basePath + "/#LATEST/#LATEST", fs).toString(),
"file:" + inputDir_v2_v2.getAbsolutePath(),
"VenicePushJob should parse all occurrences of #LATEST to respective latest directories");

Assert.assertEquals(
getLatestPathOfInputDirectory("file:" + inputDir.getAbsolutePath() + "/#LATEST/#LATEST/", fs).toString(),
getLatestPathOfInputDirectory(basePath + "/#LATEST/#LATEST/", fs).toString(),
"file:" + inputDir_v2_v2.getAbsolutePath(),
"VenicePushJob should parse #LATEST to latest directory to respective latest directories");
}
Expand Down
Loading