From bfe561f2b661840be5c2cdbca67ce346a35e460f Mon Sep 17 00:00:00 2001 From: Yevgeni Tsodikov Date: Sun, 14 Jan 2024 17:11:36 +0200 Subject: [PATCH] Batch policy --- README.md | 5 +++ .../clojure/aerospike_clj/batch_client.clj | 10 ++++-- .../clojure/aerospike_clj/batch_policy.clj | 36 +++++++++++++++++++ src/main/clojure/aerospike_clj/listeners.clj | 19 ++++------ src/main/clojure/aerospike_clj/policy.clj | 31 +--------------- .../integration/integration_test.clj | 16 ++++----- 6 files changed, 64 insertions(+), 53 deletions(-) create mode 100644 src/main/clojure/aerospike_clj/batch_policy.clj diff --git a/README.md b/README.md index 383169b..7eb5a99 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,11 @@ the `com.aerospike/aerospike-client` dependency to be used. Please note that the `aerospike-clj.batch-client` namespace requires the `com.aerospike/aerospike-client` dependency to be version `6.0.0` or higher. +### Setting up a client policy for `com.aerospike/aerospike-client` with version 6.0.0 and above + +The `com.aerospike/aerospike-client` dependency version `6.0.0` is a breaking change. +To set batch operation policies, please use the `aerospike-clj.batch-policy` namespace. + ## Testing ### Unit tests Executed via running `lein test`. diff --git a/src/main/clojure/aerospike_clj/batch_client.clj b/src/main/clojure/aerospike_clj/batch_client.clj index a7241b8..ccd4dbd 100644 --- a/src/main/clojure/aerospike_clj/batch_client.clj +++ b/src/main/clojure/aerospike_clj/batch_client.clj @@ -5,13 +5,19 @@ [aerospike-clj.protocols :as pt] [promesa.core :as p]) (:import (aerospike_clj.client SimpleAerospikeClient) - (aerospike_clj.listeners AsyncBatchOperateListListener) - (com.aerospike.client AerospikeClient BatchRecord) + (com.aerospike.client AerospikeClient AerospikeException BatchRecord) (com.aerospike.client.async EventLoop EventLoops) (com.aerospike.client.listener BatchOperateListListener) (com.aerospike.client.policy BatchPolicy) (java.util List))) +(deftype ^:private AsyncBatchOperateListListener [op-future] + BatchOperateListListener + (^void onSuccess [_this ^List records ^boolean _status] + (p/resolve! op-future records)) + (^void onFailure [_this ^AerospikeException ex] + (p/reject! op-future ex))) + (defn- batch-record->map [^BatchRecord batch-record] (let [k (.key batch-record)] (-> (record/record->map (.record batch-record)) diff --git a/src/main/clojure/aerospike_clj/batch_policy.clj b/src/main/clojure/aerospike_clj/batch_policy.clj new file mode 100644 index 0000000..6c3effe --- /dev/null +++ b/src/main/clojure/aerospike_clj/batch_policy.clj @@ -0,0 +1,36 @@ +(ns aerospike-clj.batch-policy + (:require [aerospike-clj.policy :as policy]) + (:import (com.aerospike.client.policy BatchPolicy BatchWritePolicy ClientPolicy))) + +(defn map->batch-policy + "Create a `BatchPolicy` from a map. + This function is slow due to possible reflection." + ^BatchPolicy [conf] + (let [bp (BatchPolicy. (policy/map->policy conf)) + conf (merge {"timeoutDelay" 3000} conf)] + (policy/set-java bp conf "allowInline") + (policy/set-java bp conf "respondAllKeys") + (policy/set-java bp conf "maxConcurrentThreads") + (policy/set-java bp conf "sendSetName") + bp)) + +(defn map->batch-write-policy + "Create a `BatchWritePolicy` from a map. Enumeration names should start with capitalized letter. + This function is slow due to possible reflection." + ^BatchWritePolicy [conf] + (let [p (BatchWritePolicy.)] + (policy/set-java-enum p conf "RecordExistsAction") + (policy/set-java-enum p conf "CommitLevel") + (policy/set-java-enum p conf "GenerationPolicy") + (policy/set-java p conf "filterExp") + (policy/set-java p conf "generation") + (policy/set-java p conf "expiration") + (policy/set-java p conf "durableDelete") + (policy/set-java p conf "sendKey") + p)) + +(defn add-batch-write-policy + "Set the [[batchWritePolicyDefault]] or the [[batchParentPolicyWriteDefault]] in a [[ClientPolicy]]." + [^ClientPolicy client-policy conf] + (set! (.batchParentPolicyWriteDefault client-policy) (get conf "batchParentPolicyWriteDefault" (map->batch-policy conf))) + (set! (.batchWritePolicyDefault client-policy) (get conf "batchWritePolicyDefault" (map->batch-write-policy conf)))) diff --git a/src/main/clojure/aerospike_clj/listeners.clj b/src/main/clojure/aerospike_clj/listeners.clj index 87171c9..081118c 100644 --- a/src/main/clojure/aerospike_clj/listeners.clj +++ b/src/main/clojure/aerospike_clj/listeners.clj @@ -1,10 +1,10 @@ (ns aerospike-clj.listeners - (:require [promesa.core :as p] - [aerospike-clj.aerospike-record :as record]) - (:import (java.util List Map) - (com.aerospike.client Key Record AerospikeException AerospikeException$QueryTerminated) - (com.aerospike.client.listener RecordListener WriteListener DeleteListener - ExistsListener BatchListListener RecordSequenceListener InfoListener ExistsArrayListener BatchOperateListListener))) + (:require [aerospike-clj.aerospike-record :as record] + [promesa.core :as p]) + (:import (com.aerospike.client AerospikeException AerospikeException$QueryTerminated Key Record) + (com.aerospike.client.listener BatchListListener DeleteListener + ExistsArrayListener ExistsListener InfoListener RecordListener RecordSequenceListener WriteListener) + (java.util List Map))) (deftype AsyncExistsListener [op-future] ExistsListener @@ -66,10 +66,3 @@ (p/reject! op-future ex)) (^void onSuccess [_this ^"[Lcom.aerospike.client.Key;" _keys ^"[Z" exists] (p/resolve! op-future exists))) - -(deftype AsyncBatchOperateListListener [op-future] - BatchOperateListListener - (^void onSuccess [_this ^List records ^boolean _status] - (p/resolve! op-future records)) - (^void onFailure [_this ^AerospikeException ex] - (p/reject! op-future ex))) diff --git a/src/main/clojure/aerospike_clj/policy.clj b/src/main/clojure/aerospike_clj/policy.clj index 2600e51..e0ef6b0 100644 --- a/src/main/clojure/aerospike_clj/policy.clj +++ b/src/main/clojure/aerospike_clj/policy.clj @@ -4,7 +4,7 @@ #_{:clj-kondo/ignore [:unused-import]} [com.aerospike.client.policy Policy ClientPolicy WritePolicy RecordExistsAction GenerationPolicy BatchPolicy CommitLevel - AuthMode ReadModeAP ReadModeSC Replica BatchWritePolicy])) + AuthMode ReadModeAP ReadModeSC Replica])) (defmacro set-java [obj conf obj-name] `(when (some? (get ~conf ~obj-name)) @@ -38,21 +38,6 @@ (set-java p conf "totalTimeout") p)) -(defn map->batch-write-policy - "Create a `BatchWritePolicy` from a map. Enumeration names should start with capitalized letter. - This function is slow due to possible reflection." - ^BatchWritePolicy [conf] - (let [p (BatchWritePolicy.)] - (set-java-enum p conf "RecordExistsAction") - (set-java-enum p conf "CommitLevel") - (set-java-enum p conf "GenerationPolicy") - (set-java p conf "filterExp") - (set-java p conf "generation") - (set-java p conf "expiration") - (set-java p conf "durableDelete") - (set-java p conf "sendKey") - p)) - (defn map->batch-policy "Create a `BatchPolicy` from a map. This function is slow due to possible reflection." @@ -92,18 +77,6 @@ (set! (.recordExistsAction wp) record-exists-action) wp))) -(defn batch-write-policy - "Create a write policy to be passed to put methods via `{:policy wp}`. - Also used in `update` and `create`. - The default policy in case the record exists is `RecordExistsAction/UPDATE`." - (^BatchWritePolicy [client expiration] - (batch-write-policy client expiration (RecordExistsAction/UPDATE))) - (^BatchWritePolicy [client expiration record-exists-action] - (let [wp (BatchWritePolicy. (.getBatchWritePolicyDefault ^AerospikeClient client))] - (set! (.expiration wp) expiration) - (set! (.recordExistsAction wp) record-exists-action) - wp))) - (defn set-policy "Create a write policy with UPDATE record exists action. in case of new entry, create it @@ -187,8 +160,6 @@ (set! (.readPolicyDefault cp) (get conf "readPolicyDefault" (map->policy conf))) (set! (.writePolicyDefault cp) (get conf "writePolicyDefault" (map->write-policy conf))) (set! (.batchPolicyDefault cp) (get conf "batchPolicyDefault" (map->batch-policy conf))) - (set! (.batchParentPolicyWriteDefault cp) (get conf "batchParentPolicyWriteDefault" (map->batch-policy conf))) - (set! (.batchWritePolicyDefault cp) (get conf "batchWritePolicyDefault" (map->batch-write-policy conf))) (set-java-enum cp conf "AuthMode") (set-java cp conf "clusterName") diff --git a/test/aerospike_clj/integration/integration_test.clj b/test/aerospike_clj/integration/integration_test.clj index 87c8f19..f39277d 100644 --- a/test/aerospike_clj/integration/integration_test.clj +++ b/test/aerospike_clj/integration/integration_test.clj @@ -5,12 +5,12 @@ [clojure.test :refer [deftest testing is use-fixtures]] [cheshire.core :as json] [aerospike-clj.integration.aerospike-setup :as as-setup] + [aerospike-clj.batch-policy :as batch-policy] [aerospike-clj.client :as client] [aerospike-clj.protocols :as pt] [aerospike-clj.policy :as policy] [aerospike-clj.key :as as-key] [aerospike-clj.utils :as utils] - [clojure.tools.logging :as log] [spy.core :as spy]) (:import (com.aerospike.client Value AerospikeClient BatchWrite Operation Bin) (com.aerospike.client.cdt ListOperation ListPolicy ListOrder ListWriteFlags ListReturnType @@ -652,13 +652,13 @@ (let [expression (Exp/build (Exp/ge (Exp/intBin "a") (Exp/intBin "b"))) c (client/init-simple-aerospike-client *as-hosts* as-namespace - {"batchWritePolicyDefault" (policy/map->batch-write-policy {"CommitLevel" "COMMIT_MASTER" - "durableDelete" true - "expiration" 1000 - "generation" 7 - "GenerationPolicy" "EXPECT_GEN_GT" - "RecordExistsAction" "REPLACE_ONLY" - "filterExp" expression}) + {"batchWritePolicyDefault" (batch-policy/map->batch-write-policy {"CommitLevel" "COMMIT_MASTER" + "durableDelete" true + "expiration" 1000 + "generation" 7 + "GenerationPolicy" "EXPECT_GEN_GT" + "RecordExistsAction" "REPLACE_ONLY" + "filterExp" expression}) "batchParentPolicyWriteDefault" (policy/map->batch-policy {"allowInline" false "maxConcurrentThreads" 2 "sendSetName" true})})