Skip to content

Commit

Permalink
RS-550: Document support for conditional query on replication engine (#…
Browse files Browse the repository at this point in the history
…138)

* add `when` field to the replication api reference

* update rust documentation

* update python example

* update js example

* fix rust example

* update cpp example

* fix when condition

* fix broken links

* fix canonical links in py sdk

* fix webconsole replication guide format

* update reduct-cli example for manual replication

* bold links

* clarify what's replicated

* fix tabs
  • Loading branch information
atimin authored Jan 6, 2025
1 parent 03f685c commit 2855d89
Show file tree
Hide file tree
Showing 29 changed files with 174 additions and 139 deletions.
5 changes: 4 additions & 1 deletion docs/examples/cli/data_replication_manual.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
reduct-cli alias add src-instance -L http://localhost:8383 -t my-token
reduct-cli alias add dst-instance -L https://play.reduct.store -t reductstore
reduct-cli cp src-instance/example-bucket dst-instance/demo --include "anomaly=true" --exclude "status=ok"
reduct-cli cp src-instance/example-bucket dst-instance/demo --when '{
"&status": {"$ne": "ok"},
"&anomaly": {"$eq": true}
}'
4 changes: 2 additions & 2 deletions docs/examples/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 20)

find_package(ZLIB)
find_package(OpenSSL)
find_package(ReductCpp 1.10.0)
find_package(ReductCpp 1.14.0)

## Buckets

Expand Down Expand Up @@ -127,4 +127,4 @@ target_link_libraries(data_management_delete_records.cxx ${REDUCT_CPP_LIBRARIES}
add_executable(data_management_delete_query.cxx
src/data_management_delete_query.cc
)
target_link_libraries(data_management_delete_query.cxx ${REDUCT_CPP_LIBRARIES} ${ZLIB_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto)
target_link_libraries(data_management_delete_query.cxx ${REDUCT_CPP_LIBRARIES} ${ZLIB_LIBRARIES} OpenSSL::SSL OpenSSL::Crypto)
4 changes: 2 additions & 2 deletions docs/examples/cpp/src/data_replication_create.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ int main() {
.dst_host = "https://play.reduct.store",
.dst_token = "reductstore",
.entries = {"cpp-example"},
.include = {{"anomaly", "1"}},
.when = R"({"&anomaly":{"$eq": 1}})"
});
assert(repl_err == Error::kOk);
}
}
2 changes: 1 addition & 1 deletion docs/examples/js/src/data_replication_create.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ const settings = {
dstHost: "https://play.reduct.store",
dstToken: "reductstore",
entries: ["js-entry"],
include: { anomaly: "1" },
when: { "&anomaly": { $eq: 1 } },
};
await client.createReplication("my-replication", settings);
21 changes: 12 additions & 9 deletions docs/examples/provisioning/data_replication_create.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ services:
volumes:
- ./data:/data
environment:
- RS_API_TOKEN=my-api-token
- RS_BUCKET_1_NAME=src_bucket
- RS_REPLICATION_1_NAME=my_replication
- RS_REPLICATION_1_SRC_BUCKET=src_bucket
- RS_REPLICATION_1_DST_BUCKET=demo
- RS_REPLICATION_1_DST_HOST=https://play.reduct.store
- RS_REPLICATION_1_DST_TOKEN=reductstore
- RS_REPLICATION_1_ENTRIES=exampl-*
- RS_REPLICATION_1_INCLUDE_anomaly=1
RS_API_TOKEN: my-api-token
RS_BUCKET_1_NAME: src_bucket
RS_REPLICATION_1_NAME: my_replication
RS_REPLICATION_1_SRC_BUCKET: src_bucket
RS_REPLICATION_1_DST_BUCKET: demo
RS_REPLICATION_1_DST_HOST: https://play.reduct.store
RS_REPLICATION_1_DST_TOKEN: reductstore
RS_REPLICATION_1_ENTRIES: exampl-*
RS_REPLICATION_1_WHEN: |
{
"&anomaly": { "$$eq": true }
}
4 changes: 2 additions & 2 deletions docs/examples/py/src/data_replication_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ async def main():
src_bucket="my-bucket",
dst_bucket="demo",
dst_host="https://play.reduct.store",
dst_api_token="reductstore",
dst_token="reductstore",
entries=["py-example"],
include={"anomaly": "1"},
when= {"&anomaly": {"$eq": 1}},
)
await client.create_replication("my-replication", replication_settings)

Expand Down
1 change: 0 additions & 1 deletion docs/examples/rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ edition = "2021"

[dependencies]
reduct-rs= { git = "https://github.com/reductstore/reduct-rs.git"}
serde_json = "1.0.133"
tokio = { version = "1", features = ["rt-multi-thread"]}
futures = "0.3.30"
futures-util = "0.3.30"
Expand Down
5 changes: 2 additions & 3 deletions docs/examples/rs/examples/data_management_delete_query.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::time::{Duration, SystemTime};

use bytes::Bytes;
use reduct_rs::{ReductClient, ReductError};
use serde_json::json;
use reduct_rs::{condition, ReductClient, ReductError};
use tokio;

#[tokio::main]
Expand Down Expand Up @@ -42,7 +41,7 @@ async fn main() -> Result<(), ReductError> {
// You can also delete all records with a specific label
bucket
.remove_query("rs-example")
.when(json!({"&key1": {"$eq": "value1"}}))
.when(condition!({"&key1": {"$eq": "value1"}}))
.send()
.await?;

Expand Down
5 changes: 2 additions & 3 deletions docs/examples/rs/examples/data_querying_filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use futures::StreamExt;
use reduct_rs::{ReductClient, ReductError};
use serde_json::json;
use reduct_rs::{condition, ReductClient, ReductError};
use tokio;

#[tokio::main]
Expand All @@ -19,7 +18,7 @@ async fn main() -> Result<(), ReductError> {
// Query 10 photos from the "imdb" entry which were taken after 2006 with a face score greater than 4
let query = bucket
.query("imdb")
.when(json!({
.when(condition!({
"&photo_taken": {"$gt": 2006},
"&face_score": {"$gt": 4}
}))
Expand Down
11 changes: 6 additions & 5 deletions docs/examples/rs/examples/data_replication_create.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use reduct_rs::{Labels, ReductClient, ReductError};
use reduct_rs::{condition, Labels, ReductClient, ReductError};
use tokio;

#[tokio::main]
Expand All @@ -23,10 +23,11 @@ async fn main() -> Result<(), ReductError> {
.dst_host("https://play.reduct.store")
.dst_token("reductstore")
.entries(vec!["rs-example".to_string()])
.include(Labels::from_iter(vec![(
"anomaly".to_string(),
"1".to_string(),
)]))
.when(condition!({
"&anomaly": {
"$eq": 1
}
}))
.send()
.await?;
Ok(())
Expand Down
5 changes: 2 additions & 3 deletions docs/examples/rs/examples/quick_start.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use futures_util::stream::StreamExt;
use reduct_rs::{QuotaType, ReductClient, ReductError};
use serde_json::json;
use reduct_rs::{condition, QuotaType, ReductClient, ReductError};
use std::pin::pin;
use std::time::{Duration, SystemTime};
use tokio;
Expand Down Expand Up @@ -45,7 +44,7 @@ async fn main() -> Result<(), ReductError> {
.query("sensor-1")
.start(start)
.stop(start + Duration::from_secs(2))
.when(json!({"&score": {"$gt": 15}}))
.when(condition!({"&score": {"$gt": 15}}))
.send()
.await?;

Expand Down
1 change: 0 additions & 1 deletion docs/getting-started/with-rust.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import NextStep from "./common/next_steps.mdx";
```toml
[dependencies]
reduct-rs = "1"
serde_json = "1"
tokio = "1"
futures = "0.3"
futures-util = "0.3"
Expand Down
86 changes: 43 additions & 43 deletions docs/guides/data-replication.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Data replication is a process of copying data from one database to another. Redu

## Concepts

The data replication in ReductStore is based on the concept of a **Replication Task**. A replication task is a configurable thread that filters and copies records from a source bucket to a target bucket. The buckets can belong to the same or different ReductStore instances. For more information on buckets, see the **[Buckets](/docs/guides/buckets.mdx)** guide.
The data replication in ReductStore is based on the concept of a **Replication Task**. A replication task is a configurable thread that filters and copies records from a source bucket to a target bucket. The target bucket can belong to the same or a different ReductStore instance. For more information on buckets, see the **[Buckets](/docs/guides/buckets.mdx)** guide.

![Data Replication with ReductStore](./img/data_replication_diagram.png)

Expand All @@ -30,17 +30,21 @@ For efficiency, the replication task replicates multiple records in a single bat
Once the record has been successfully replicated, the replication task deletes the record from the transaction log.
This approach ensures that data is replicated in real time and that the replication process is fault-tolerant and can recover from failures.

:::info
The replication engine only replicates records that are written to or updated in the source bucket after the replication task is created.
It doesn't replicate deletions or existing records in the source bucket.
:::

### Conditional Replication

A replication task can filter records before replicating them to the target bucket. You can specify the following filters:

| Parameter | Description | Type |
| --------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `entries` | A list of entries that the replication task will use to filter records. Only records with these entries will be replicated. If the list is empty, all records will be replicated. You can use the `*` wildcard to match any entry. | List of strings |
| `include` | A list of key-value pairs that the replication task will use to filter records. Only records with these labels will be replicated. | List of key-value pairs |
| `exclude` | A list of key-value pairs that the replication task will use to filter records. Records with these labels will not be replicated. | List of key-value pairs |
| `each_s` | Replicate a record every S seconds | Float |
| `each_n` | Replicate only every N record | Integer |
| Parameter | Description | Type |
| --------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------- |
| `entries` | A list of entries that the replication task will use to filter records. Only records with these entries will be replicated. If the list is empty, all records will be replicated. You can use the `*` wildcard to match any entry. | List of strings |
| `when` | A set of conditions that the record must meet to be replicated. The conditions are based on the record labels. For more information on conditional queries, see the **[Conditional Query Reference](/docs/conditional-query/index.mdx)** | JSON-like object |
| `each_s` | Replicate a record every S seconds | Float |
| `each_n` | Replicate only every N-th record | Integer |

### Usage Example

Expand All @@ -54,10 +58,7 @@ In this case, the conditional replication settings will be:

```yaml
entries: ["sensor-1"]
include:
engine_status: "working"
exclude:
sensor_status: "corrupted"
when: { "&rms": { "$gt": 2.0 }, "&quality": { "$eq": "ok" } }
```
See the next section for more information on how to create a replication task with conditional replication settings.
Expand Down Expand Up @@ -101,22 +102,24 @@ import CreateReplicaProvision from "!!raw-loader!../examples/provisioning/data_r
<CodeBlock language="bash">{CreateReplicaCli}</CodeBlock>
</TabItem>
<TabItem value="Web Console">
Steps to create a replication task using the Web Console: 1. Open the Web
Console at **[http://127.0.0.1:8383](http://127.0.0.1:8383)** in your
browser. 2. Enter the API token if the authorization is enabled. 3. Click on
the **"Replication"** tab in the left sidebar. 4. Click on the plus icon in
Steps to create a replication task using the Web Console:
1. Open the Web Console at **[http://127.0.0.1:8383](http://127.0.0.1:8383)** in your browser.
2. Enter the API token if the authorization is enabled.
3. Click on
the **"Replication"** tab in the left sidebar.
4. Click on the plus icon in
the top right corner to create a new replication task: ![Create Replication
Task](./img/data_replication_create.webp) 5. In the **"Create a new
replication"** dialog, entre the name of the replication name and settings:{" "}
<img
Task](./img/data_replication_create.webp)
5. In the **"Create a new
replication"** dialog, enter the replication name and settings:{" "}<img
src={
require("@site/docs/guides/img/data_replication_settings.webp").default
}
alt="Create Replication Task Dialog"
width="300"
/>
6. Click on the **"Create Replication"** button to create the replication
task.
6. Click on the **"Create Replication"** button to create the replication task.

</TabItem>
<TabItem value="Python">
<CodeBlock language="python">{CreateReplicaPy}</CodeBlock>
Expand Down Expand Up @@ -166,16 +169,15 @@ import BrowseReplicaCurl from "!!raw-loader!../examples/curl/data_replication_br
<CodeBlock language="bash">{BrowseReplicaCLI}</CodeBlock>
</TabItem>
<TabItem value="Web Console">
Steps to browse a replication task using the Web Console: 1. Open the Web
Console at **[http://127.0.0.1:8383](http://127.0.0.1:8383)** in your
browser. 2. Enter the API token if the authorization is enabled. 3. Click on
the **"Replication"** tab in the left sidebar. 4. You will see a list of all
replication tasks with their status 5. Click on a specific replication task
in the list: ![Browse Replication Task](./img/data_replication_list.webp) 6.
You will see the details of the replication task: ![Browse Replication Task
Details](./img/data_replication_details.webp) 7. You can also see or update
the replication task settings by clicking he **cog icon(⚙️)** in the
replication task panel.
Steps to browse a replication task using the Web Console:
1. Open the Web Console at **[http://127.0.0.1:8383](http://127.0.0.1:8383)** in your browser.
2. Enter the API token if the authorization is enabled.
3. Click on the **"Replication"** tab in the left sidebar.
4. You will see a list of all replication tasks with their status.
5. Click on a specific replication task in the list: ![Browse Replication Task](./img/data_replication_list.webp)
6. You will see the details of the replication task: ![Browse Replication Task Details](./img/data_replication_details.webp)
7. You can also see or update the replication task settings by clicking the **cog icon (⚙️)** in the replication task panel.

</TabItem>
<TabItem value="Python">
<CodeBlock language="python">{BrowseReplicaPy}</CodeBlock>
Expand All @@ -199,7 +201,7 @@ import BrowseReplicaCurl from "!!raw-loader!../examples/curl/data_replication_br
You can remove a replication task by using the ReductStore SDKs, REST API, CLI and Web Console. Once you remove a replication task, the replication process stops immediately, and the transaction log is deleted from the database.

:::info
You can't remove a provisioned replication task. Before removing a provisioned replication task, you need to
You can't remove a provisioned replication task. Before removing it, you need to
unset the corresponding environment variables and restart the ReductStore instance.
:::

Expand All @@ -215,17 +217,15 @@ import RemoveReplicaCurl from "!!raw-loader!../examples/curl/data_replication_re
<CodeBlock language="bash">{RemoveReplicaCli}</CodeBlock>
</TabItem>
<TabItem value="Web Console">
Steps to remove a replication task using the Web Console: 1. Open the Web
Console at **[http://127.0.0.1:8383](http://127.0.0.1:8383)** in your
browser. 2. Enter the API token if the authorization is enabled. 3. Click on
the **"Replication"** tab in the left sidebar. 4. You will see a list of all
replication tasks with their status 5. Click on a specific replication task
in the list: ![Browse Replication Task](./img/data_replication_list.webp) 6.
Click on the **"Remove"** button in the replication task panel. ![Remove
Replication Task](./img/data_replication_remove.webp) 7. Confirm the
deletion by typing the replication task name and clicking on the **Remove**
button: ![Remove Replication Task
Confirmation](./img/data_replication_remove_confirm.webp)
Steps to remove a replication task using the Web Console:
1. Open the Web Console at **[http://127.0.0.1:8383](http://127.0.0.1:8383)** in your browser.
2. Enter the API token if the authorization is enabled.
3. Click on the **"Replication"** tab in the left sidebar.
4. You will see a list of all replication tasks with their status.
5. Click on a specific replication task in the list: ![Browse Replication Task](./img/data_replication_list.webp)
6. Click on the **"Remove"** button in the replication task panel. ![Remove Replication Task](./img/data_replication_remove.webp)
7. Confirm the deletion by typing the replication task name and clicking on the **Remove** button: ![Remove Replication Task Confirmation](./img/data_replication_remove_confirm.webp)

</TabItem>
<TabItem value="Python">
<CodeBlock language="python">{RemoveReplicaPy}</CodeBlock>
Expand Down
2 changes: 1 addition & 1 deletion docs/http-api/entry-api/read_data.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ import SwaggerComponent from "@site/src/components/SwaggerComponent";
The method retrieves a batch of records for the given query ID and sends their contents in the HTTP response
body.

The method works in conjunction with the `GET /b/:bucket/:entry/q` method and uses [the Batch Protocol](https://reduct.store/docs/http-api/entry-api/batch-protocol) to read records in a batch.
The method works in conjunction with the `GET /b/:bucket/:entry/q` method and uses **[the Batch Protocol](/docs/http-api/entry-api/index.mdx#batch-protocol)** to read records in a batch.

If authentication is enabled, the method needs a valid API token with read access to the entry's bucket.

Expand Down
Loading

0 comments on commit 2855d89

Please sign in to comment.