added timeRange to HBaseStorage #7

Open · wants to merge 1 commit into trunk
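For context, a minimal usage sketch of the new options from a Pig script; the table name 'webcrawl', the column 'content:html', and the millisecond timestamps are hypothetical, and per HBase's TimeRange semantics the lower bound is inclusive while the upper bound is exclusive:

-- keep only cells whose timestamp t satisfies minTimeRange <= t < maxTimeRange
raw = LOAD 'hbase://webcrawl'
      USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
          'content:html', '-loadKey -minTimeRange 1325376000000 -maxTimeRange 1356998400000')
      AS (rowkey:chararray, html:chararray);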
30 changes: 28 additions & 2 deletions src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -151,6 +151,8 @@ public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPu
private final long limit_;
private final int caching_;
private final boolean noWAL_;
private final long minTimeRange;
private final long maxTimeRange;

protected transient byte[] gt_;
protected transient byte[] gte_;
@@ -176,6 +178,8 @@ private static void populateValidOptions() {
validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
"HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
validOptions_.addOption("minTimeRange", true, "Timestamp most be greater then this value");
validOptions_.addOption("maxTimeRange", true, "Timestamp must be less then this value");
}

/**
@@ -214,6 +218,8 @@ public HBaseStorage(String columnList) throws ParseException, IOException {
* <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column names (default true)
* <li>-caching=numRows number of rows to cache (faster scans, more memory).
* <li>-noWAL=(true|false) Sets the write ahead to false for faster loading.
* <li>-minTimeRange= min Scan TimeRagne
* <li>-maxTimeRange= max Scan TimeRange
* To be used with extreme caution, since this could result in data loss
* (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
* </ul>
@@ -227,7 +233,7 @@ public HBaseStorage(String columnList, String optString) throws ParseException,
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace]", validOptions_ );
formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-columnPrefix] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimeRange] [-maxTimeRange]", validOptions_ );
throw e;
}

@@ -245,6 +251,8 @@ public HBaseStorage(String columnList, String optString) throws ParseException,
ignoreWhitespace_ = false;
}
}



columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);

@@ -271,6 +279,21 @@ public HBaseStorage(String columnList, String optString) throws ParseException,
caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
noWAL_ = configuredOptions_.hasOption("noWAL");

if (configuredOptions_.hasOption("minTimeRange")){
minTimeRange = Long.parseLong(configuredOptions_.getOptionValue("minTimeRange"));
}else
{
minTimeRange = Long.MIN_VALUE;
}

if (configuredOptions_.hasOption("maxTimeRange")){
maxTimeRange = Long.parseLong(configuredOptions_.getOptionValue("maxTimeRange"));
}else
{
maxTimeRange = Long.MIN_VALUE;
}

initScan();
}

@@ -327,7 +350,7 @@ private List<ColumnInfo> parseColumnList(String columnList,
return columnInfo;
}

private void initScan() {
private void initScan() throws IOException {
scan = new Scan();

// Map-reduce jobs should not run with cacheBlocks
@@ -350,6 +373,9 @@ private void initScan() {
lte_ = Bytes.toBytesBinary(Utils.slashisize(configuredOptions_.getOptionValue("lte")));
addRowFilter(CompareOp.LESS_OR_EQUAL, lte_);
}
if (configuredOptions_.hasOption("minTimeRange") || configuredOptions_.hasOption("maxTimeRange")){
scan.setTimeRange(minTimeRange, maxTimeRange);
}

// apply any column filters
FilterList allColumnFilters = null;
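The new time range also composes with the existing row-key options applied in initScan. A second sketch (again with hypothetical table, keys, and timestamps) that bounds both the row keys and the cell timestamps, relying on the Long.MAX_VALUE upper-bound default sketched above when -maxTimeRange is omitted:

-- rows with key >= 'row_0100' and < 'row_0200', keeping only cells written at or after minTimeRange
sliced = LOAD 'hbase://webcrawl'
         USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
             'content:html', '-loadKey -gte row_0100 -lt row_0200 -minTimeRange 1325376000000')
         AS (rowkey:chararray, html:chararray);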