Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timebucketbucketizer #66

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ You can define bucketizers as follows:
js:outputStreamId <MyEpicStream>.
```


#### Example of a time-based fragmentation

```turtle
Expand Down Expand Up @@ -110,6 +111,30 @@ The members need to arrive in order of their timestamps.
When a member arrives, all buckets that hold members with a timestamp older than the new member's timestamp will be made immutable and no new members can be added to them.


#### Example of a time-bucket-based fragmentation

```turtle
<timebucket-fragmentation> a tree:TimeBucketFragmentation;
tree:timestampPath <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.resultTime>;
tree:buffer 5000; # members may arrive up to 5 seconds out of order
tree:level ( [ # Create 5 levels, resulting uri's <year>/<month>/<day>/<hour>/<minute>
tree:range "year", "month";
tree:maxSize 0; # place no members at this level
] [
tree:range "day-of-month";
tree:maxSize 1000; # place at most 1000 members at this level
] [
tree:range "hour";
tree:maxSize 1000; # place at most 1000 members at this level
] [
tree:range "minute";
tree:maxSize 10000; # place at most 10000 members at this level, this is the last level thus excess members are also put in this level
ajuvercr marked this conversation as resolved.
Show resolved Hide resolved
] ).
```

After ingesting 2001 members within the same hour (filling the day and hour levels), this fragmentation produces bucket paths of the form `${year}-${month}/${day}/${hour}/${minute}`.


### [`js:Ldesify`](https://github.com/rdf-connect/sds-processors/blob/master/configs/ldesify.ttl#L10)

This processor takes a stream of raw entities (e.g., out from a RML transformation process) and creates versioned entities appending the current timestamp to the entity IRI to make it unique. It is capable of keeping a state so that unmodified entities are filtered.
Expand Down
42 changes: 42 additions & 0 deletions configs/bucketizer_configs.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,45 @@
sh:minCount 1;
].

[ ] a sh:NodeShape;
sh:targetClass tree:TimeBucketLevel;
sh:property [
sh:name "ranges";
sh:path tree:range;
sh:minCount 1;
ajuvercr marked this conversation as resolved.
Show resolved Hide resolved
sh:datatype xsd:string;
sh:in ( "year" "month" "day-of-month" "hour" "minute" "second" "millisecond" );
], [
sh:name "amount";
sh:path tree:maxSize;
sh:maxCount 1;
sh:minCount 1;
sh:datatype xsd:integer;
].

[ ] a sh:NodeShape;
sh:targetClass tree:TimeBucketFragmentation;
sh:property [
sh:name "path";
sh:path tree:timestampPath;
sh:class rdfl:PathLens;
sh:maxCount 1;
sh:minCount 1;
], [
sh:name "pathQuads";
sh:path tree:timestampPath;
sh:class <RdfThing>;
sh:maxCount 1;
sh:minCount 1;
], [
sh:name "levels";
sh:path tree:level;
sh:class tree:TimeBucketLevel;
sh:minCount 1;
], [
sh:name "timeBufferMs";
sh:path tree:buffer;
sh:datatype xsd:integer;
sh:maxCount 1;
].

6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@rdfc/sds-processors-ts",
"version": "1.0.0-alpha.3",
"version": "1.0.0-alpha.5",
"type": "module",
"scripts": {
"build": "tspc && tsc-alias",
Expand Down Expand Up @@ -39,7 +39,7 @@
"devDependencies": {
"@ajuvercr/ts-transformer-inline-file": "^0.2.0",
"@rdfc/js-runner": "^1.0.0-alpha.0",
"@rdfjs/types": "^1.1.0",
"@rdfjs/types": "^1.1.2",
"@types/jsonld": "^1.5.15",
"@types/n3": "^1.21.0",
"@types/node": "^22.7.4",
Expand Down
3 changes: 2 additions & 1 deletion src/bucketizers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ export async function bucketize(
savePath: string | undefined,
sourceStream: Term | undefined,
resultingStream: Term,
prefix = "",
) {
set_metadata(channels, resultingStream, sourceStream, config);
const save = read_save(savePath);
Expand Down Expand Up @@ -318,7 +319,7 @@ export async function bucketize(
requestedBuckets,
newMembers,
newRelations,
sourceStream?.value || "root",
prefix,
);

record_buckets.forEach((x) => requestedBuckets.add(x));
Expand Down
42 changes: 33 additions & 9 deletions src/bucketizers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { readFileSync } from "fs";
import * as path from "path";
import { Term } from "@rdfjs/types";
import { BasicLensM, Cont } from "rdf-lens";
import {
Expand All @@ -16,13 +14,20 @@ import SubjectBucketizer from "./subjectBucketizer";
import TimebasedBucketizer from "./timebasedBucketizer";

import { $INLINE_FILE } from "@ajuvercr/ts-transformer-inline-file";
import TimeBucketBucketizer, { TimeBucketTreeConfig } from "./timeBucketTree";

export { TimeBucketTreeConfig } from "./timeBucketTree";

const df = new DataFactory();
export const SHAPES_TEXT = $INLINE_FILE("../../configs/bucketizer_configs.ttl");

export type BucketizerConfig = {
type: Term;
config: SubjectFragmentation | PageFragmentation | TimebasedFragmentation;
config:
| SubjectFragmentation
| PageFragmentation
| TimebasedFragmentation
| TimeBucketTreeConfig;
};

export type SubjectFragmentation = {
Expand Down Expand Up @@ -78,10 +83,23 @@ function createBucketizer(config: BucketizerConfig, save?: string): Bucketizer {
<TimebasedFragmentation>config.config,
save,
);
case TREE.custom("TimeBucketFragmentation"):
return new TimeBucketBucketizer(
<TimeBucketTreeConfig>config.config,
save,
);
}
throw "Unknown bucketizer " + config.type.value;
}

/**
 * Joins two bucket-id path segments with exactly one "/" separator.
 *
 * Rules:
 *  - if id1 ends with "/" AND id2 starts with "/", drop the duplicate slash;
 *  - if id1 is empty, or either side already supplies a slash, concatenate as-is
 *    (an empty id1 must not produce a leading "/");
 *  - otherwise insert a "/" between them.
 */
function combineIds(id1: string, id2: string) {
    const id1Slash = id1.endsWith("/");
    // BUG FIX: the original checked id1.startsWith("/") here, so a leading
    // slash on id2 went undetected and "a/" + "/b" produced "a//b".
    const id2Slash = id2.startsWith("/");
    if (id1Slash && id2Slash) return id1 + id2.slice(1);
    if (id1 === "" || id1Slash || id2Slash) return id1 + id2;
    return id1 + "/" + id2;
}

export class BucketizerOrchestrator {
private readonly configs: BucketizerConfig[];

Expand All @@ -104,7 +122,7 @@ export class BucketizerOrchestrator {
origin: Bucket;
relation: BucketRelation;
}[],
prefix = "",
prefix: string = "",
): string[] {
let queue = [prefix];

Expand Down Expand Up @@ -139,12 +157,18 @@ export class BucketizerOrchestrator {
const bucketizer = this.getBucketizer(i, prefix);

const getBucket = (value: string, root?: boolean) => {
const terms = value.split("/");
const key = encodeURIComponent(
decodeURIComponent(terms[terms.length - 1]),
);
const encodedValue = value
.split("/")
ajuvercr marked this conversation as resolved.
Show resolved Hide resolved
.map((x) => encodeURIComponent(decodeURIComponent(x)))
.join("/");
const key = value.endsWith("/")
? encodedValue
: encodedValue + "/";
// If the requested bucket is the root, it actually is the previous bucket
const id = root ? prefix : prefix + "/" + key;

// avoid double slashes and leading slashes
const next = combineIds(prefix, key);
const id = root ? prefix : next;
if (!buckets[id]) {
buckets[id] = new Bucket(df.namedNode(id), [], false);

Expand Down
3 changes: 2 additions & 1 deletion src/bucketizers/subjectBucketizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Term } from "@rdfjs/types";
import { Bucket, RdfThing, Record } from "../utils";
import { TREE } from "@treecg/types";
import { getLoggerFor } from "../utils/logUtil";
import { Writer } from "n3";

export default class SubjectBucketizer implements Bucketizer {
protected readonly logger = getLoggerFor(this);
Expand Down Expand Up @@ -42,7 +43,7 @@ export default class SubjectBucketizer implements Bucketizer {

const out: Bucket[] = [];

const root = getBucket("root", true);
const root = getBucket("", true);

if (values.length === 0 && this.defaultName) {
values.push({ value: this.defaultName });
Expand Down
Loading
Loading