Skip to content

Commit

Permalink
add compression method gz
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickbr committed Nov 7, 2024
1 parent a695ed4 commit 2b1b2a2
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 25 deletions.
8 changes: 7 additions & 1 deletion include/osm2rdf/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ enum GeoTriplesMode {
full = 1
};

// Compression method used for output files. The enumerators keep their
// numeric values: NONE is 0, BZ2 is 1 and GZ is 2.
enum CompressFormat { NONE, BZ2, GZ };

enum SourceDataset {
OSM = 0,
OHM = 1
Expand Down Expand Up @@ -94,7 +100,7 @@ struct Config {
std::string outputFormat = "qlever";
osm2rdf::util::OutputMergeMode mergeOutput =
osm2rdf::util::OutputMergeMode::CONCATENATE;
bool outputCompress = true;
CompressFormat outputCompress = BZ2;
bool outputKeepFiles = false;

// osmium location cache
Expand Down
11 changes: 6 additions & 5 deletions include/osm2rdf/config/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
namespace osm2rdf::config::constants {

const static inline std::string BZIP2_EXTENSION = ".bz2";
const static inline std::string GZ_EXTENSION = ".gz";
const static inline std::string STATS_EXTENSION = ".stats";
const static inline std::string CONTAINS_STATS_EXTENSION = ".contains-stats";
const static inline std::string JSON_EXTENSION = ".json";
Expand Down Expand Up @@ -75,11 +76,11 @@ const static inline std::string OUTPUT_KEEP_FILES_OPTION_HELP =
const static inline std::string OUTPUT_KEEP_FILES_OPTION_INFO =
"Keeping temporary output files";

const static inline std::string OUTPUT_NO_COMPRESS_OPTION_SHORT = "";
const static inline std::string OUTPUT_NO_COMPRESS_OPTION_LONG =
"output-no-compress";
const static inline std::string OUTPUT_NO_COMPRESS_OPTION_HELP =
"Do not compress output";
// Command-line option selecting the compression of the output file. There is
// no short form; the long form takes the compression method as its value.
const static inline std::string OUTPUT_COMPRESS_OPTION_SHORT = "";
const static inline std::string OUTPUT_COMPRESS_OPTION_LONG =
    "output-compression";
// The value names listed here must match the strings the argument parser
// compares against ("none" and "gz"; anything else selects bz2).
const static inline std::string OUTPUT_COMPRESS_OPTION_HELP =
    "Output file compression, valid values: none, bz2, gz";

const static inline std::string STORE_LOCATIONS_INFO =
"Storing locations osmium locations:";
Expand Down
2 changes: 2 additions & 0 deletions include/osm2rdf/util/Output.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define OSM2RDF_UTIL_OUTPUT_H

#include <bzlib.h>
#include <zlib.h>
#include <fstream>
#include <vector>

Expand Down Expand Up @@ -79,6 +80,7 @@ class Output {

std::vector<FILE*> _rawFiles;
std::vector<BZFILE*> _files;
std::vector<gzFile> _gzFiles;
std::vector<size_t> _outBufPos;

std::vector<size_t> _lines;
Expand Down
20 changes: 13 additions & 7 deletions src/config/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,11 @@ void osm2rdf::config::Config::fromArgs(int argc, char** argv) {
osm2rdf::config::constants::OUTPUT_KEEP_FILES_OPTION_SHORT,
osm2rdf::config::constants::OUTPUT_KEEP_FILES_OPTION_LONG,
osm2rdf::config::constants::OUTPUT_KEEP_FILES_OPTION_HELP);
auto outputNoCompressOp = parser.add<popl::Switch, popl::Attribute::advanced>(
osm2rdf::config::constants::OUTPUT_NO_COMPRESS_OPTION_SHORT,
osm2rdf::config::constants::OUTPUT_NO_COMPRESS_OPTION_LONG,
osm2rdf::config::constants::OUTPUT_NO_COMPRESS_OPTION_HELP);
auto outputCompressOp =
parser.add<popl::Value<std::string>, popl::Attribute::advanced>(
osm2rdf::config::constants::OUTPUT_COMPRESS_OPTION_SHORT,
osm2rdf::config::constants::OUTPUT_COMPRESS_OPTION_LONG,
osm2rdf::config::constants::OUTPUT_COMPRESS_OPTION_HELP, outputFormat);
auto cacheOp = parser.add<popl::Value<std::string>>(
osm2rdf::config::constants::CACHE_OPTION_SHORT,
osm2rdf::config::constants::CACHE_OPTION_LONG,
Expand Down Expand Up @@ -473,10 +474,10 @@ void osm2rdf::config::Config::fromArgs(int argc, char** argv) {
// Output
output = outputOp->value();
outputFormat = outputFormatOp->value();
outputCompress = !outputNoCompressOp->is_set();
outputCompress = outputCompressOp->value() == "none" ? NONE : (outputCompressOp->value() == "gz" ? GZ : BZ2);
outputKeepFiles = outputKeepFilesOp->is_set();
if (output.empty()) {
outputCompress = false;
outputCompress = NONE;
mergeOutput = util::OutputMergeMode::NONE;
}

Expand All @@ -486,11 +487,16 @@ void osm2rdf::config::Config::fromArgs(int argc, char** argv) {
rdfStatisticsPath += osm2rdf::config::constants::JSON_EXTENSION;

// Mark compressed output
if (outputCompress && !output.empty() &&
if (outputCompress == BZ2 && !output.empty() &&
output.extension() != osm2rdf::config::constants::BZIP2_EXTENSION) {
output += osm2rdf::config::constants::BZIP2_EXTENSION;
}

if (outputCompress == GZ && !output.empty() &&
output.extension() != osm2rdf::config::constants::GZ_EXTENSION) {
output += osm2rdf::config::constants::GZ_EXTENSION;
}

// osmium location cache
cache = std::filesystem::absolute(cacheOp->value()).string();

Expand Down
93 changes: 81 additions & 12 deletions src/util/Output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// along with osm2rdf. If not, see <https://www.gnu.org/licenses/>.

#include <bzlib.h>
#include <zlib.h>

#include <cassert>
#include <cmath>
Expand All @@ -31,6 +32,10 @@
#include "osm2rdf/config/Config.h"
#include "osm2rdf/util/Output.h"

using osm2rdf::config::BZ2;
using osm2rdf::config::GZ;
using osm2rdf::config::NONE;

// ____________________________________________________________________________
osm2rdf::util::Output::Output(const osm2rdf::config::Config& config,
const std::string& prefix)
Expand All @@ -56,21 +61,24 @@ bool osm2rdf::util::Output::open() {
assert(_partCount > 0);

_rawFiles.resize(_partCount);
_gzFiles.resize(_partCount);
_files.resize(_partCount);
_outBufPos.resize(_partCount);

for (size_t i = 0; i < _partCount; i++) {
_rawFiles[i] = fopen(partFilename(i).c_str(), "w");
if (_config.outputCompress == BZ2 || _config.outputCompress == NONE) {
_rawFiles[i] = fopen(partFilename(i).c_str(), "w");

if (_rawFiles[i] == NULL) {
std::stringstream ss;
ss << "Could not open file '" << partFilename(i)
<< "' for writing:\n";
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
if (_rawFiles[i] == NULL) {
std::stringstream ss;
ss << "Could not open file '" << partFilename(i)
<< "' for writing:\n";
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
}
}

if (_config.outputCompress) {
if (_config.outputCompress == BZ2) {
int err = 0;
_files[i] = BZ2_bzWriteOpen(&err, _rawFiles[i], 3, 0, 30);
if (err != BZ_OK) {
Expand All @@ -81,6 +89,17 @@ bool osm2rdf::util::Output::open() {
throw std::runtime_error(ss.str());
}
}

if (_config.outputCompress == GZ) {
_gzFiles[i] = gzopen(partFilename(i).c_str(), "w");
if (_gzFiles[i] == Z_NULL) {
std::stringstream ss;
ss << "Could not open gz file '" << partFilename(i)
<< "' for writing:\n";
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
}
}
_outBuffers[i] = new unsigned char[BUFFER_S];
}

Expand Down Expand Up @@ -109,7 +128,7 @@ void osm2rdf::util::Output::close() {
_outBuffers[i][_outBufPos[i]] = '\0';
std::cout << reinterpret_cast<const char*>(_outBuffers[i]);
}
} else if (_config.outputCompress) {
} else if (_config.outputCompress == BZ2) {
#pragma omp parallel for
for (size_t i = 0; i < _partCount; ++i) {
int err = 0;
Expand All @@ -125,6 +144,20 @@ void osm2rdf::util::Output::close() {
BZ2_bzWriteClose(&err, _files[i], 0, 0, 0);
fclose(_rawFiles[i]);
}
} else if (_config.outputCompress == GZ) {
#pragma omp parallel for
for (size_t i = 0; i < _partCount; ++i) {
int r = gzwrite(_gzFiles[i], _outBuffers[i], _outBufPos[i]);
if (r != (int)_outBufPos[i]) {
gzclose(_gzFiles[i]);
std::stringstream ss;
ss << "Could not write to gz file '"
<< partFilename(i) << "':\n";
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
}
gzclose(_gzFiles[i]);
}
} else {
#pragma omp parallel for
for (size_t i = 0; i < _partCount; ++i) {
Expand Down Expand Up @@ -215,7 +248,7 @@ void osm2rdf::util::Output::write(std::string_view strv, size_t t) {
assert(t < _partCount);
if (_toStdOut) {
// on output to stdout, we only flush on newlines
} else if (_config.outputCompress) {
} else if (_config.outputCompress == BZ2) {
if (_outBufPos[t] + strv.size() + 1 >= BUFFER_S) {
int err = 0;
BZ2_bzWrite(&err, _files[t], _outBuffers[t], _outBufPos[t]);
Expand All @@ -229,6 +262,19 @@ void osm2rdf::util::Output::write(std::string_view strv, size_t t) {
}
_outBufPos[t] = 0;
}
} else if (_config.outputCompress == GZ) {
if (_outBufPos[t] + strv.size() + 1 >= BUFFER_S) {
int r = gzwrite(_gzFiles[t], _outBuffers[t], _outBufPos[t]);
if (r != (int)_outBufPos[t]) {
gzclose(_gzFiles[t]);
std::stringstream ss;
ss << "Could not write to gz file '"
<< partFilename(t) << "':\n";
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
}
_outBufPos[t] = 0;
}
} else {
if (_outBufPos[t] + strv.size() + 1 >= BUFFER_S) {
size_t r =
Expand Down Expand Up @@ -266,7 +312,7 @@ void osm2rdf::util::Output::write(const char c, size_t t) {
assert(t < _partCount);
if (_toStdOut) {
// on output to stdout, we only flush on newlines
} else if (_config.outputCompress) {
} else if (_config.outputCompress == BZ2) {
if (_outBufPos[t] + 2 >= BUFFER_S) {
int err = 0;
BZ2_bzWrite(&err, _files[t], _outBuffers[t], _outBufPos[t]);
Expand All @@ -280,6 +326,19 @@ void osm2rdf::util::Output::write(const char c, size_t t) {
}
_outBufPos[t] = 0;
}
} else if (_config.outputCompress == GZ) {
if (_outBufPos[t] + 2 >= BUFFER_S) {
int r = gzwrite(_gzFiles[t], _outBuffers[t], _outBufPos[t]);
if (r != (int)_outBufPos[t]) {
gzclose(_gzFiles[t]);
std::stringstream ss;
ss << "Could not write to gz file '"
<< partFilename(t) << "':\n";
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
}
_outBufPos[t] = 0;
}
} else {
if (_outBufPos[t] + 2 >= BUFFER_S) {
size_t r =
Expand Down Expand Up @@ -320,7 +379,7 @@ void osm2rdf::util::Output::flush(size_t i) {
_lines[i] = 0;
_outBuffers[i][_outBufPos[i]] = '\0';
std::cout << reinterpret_cast<const char*>(_outBuffers[i]);
} else if (_config.outputCompress) {
} else if (_config.outputCompress == BZ2) {
int err = 0;
BZ2_bzWrite(&err, _files[i], _outBuffers[i], _outBufPos[i]);
if (err == BZ_IO_ERROR) {
Expand All @@ -331,6 +390,16 @@ void osm2rdf::util::Output::flush(size_t i) {
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
}
} else if (_config.outputCompress == GZ) {
int r = gzwrite(_gzFiles[i], _outBuffers[i], _outBufPos[i]);
if (r != (int)_outBufPos[i]) {
gzclose(_gzFiles[i]);
std::stringstream ss;
ss << "Could not write to gz file '"
<< partFilename(i) << "':\n";
ss << strerror(errno) << std::endl;
throw std::runtime_error(ss.str());
}
} else {
size_t r =
fwrite(_outBuffers[i], sizeof(char), _outBufPos[i], _rawFiles[i]);
Expand Down

0 comments on commit 2b1b2a2

Please sign in to comment.