add extra bytes for kafka payload (#26)
* add extra bytes for kafka payload

* minor updates for hornbill
Commelina authored Jul 15, 2024
1 parent 09beff0 commit 03445c7
Showing 6 changed files with 73 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docker/docker-compose-hornbill.yml
@@ -101,8 +101,8 @@ services:
jepsen-hornbill-network:
ipv4_address: 172.20.0.8
volumes:
- "../:/home/Work"
- "jepsen-hornbill-shared:/var/jepsen/shared"
- "/home/commelina/.m2/:/root/.m2"

hserver-1:
<< : *hserver-node
1 change: 0 additions & 1 deletion docker/docker-compose-kafka.yml
@@ -134,7 +134,6 @@ services:
volumes:
- "../:/home/Work"
- "jepsen-kafka-shared:/var/jepsen/shared"
- "/home/commelina/.m2/:/root/.m2"

hstore-1:
<< : *logdevice-node
3 changes: 1 addition & 2 deletions scripts/build_hornbill.sh
@@ -6,5 +6,4 @@ docker compose --file ./docker/docker-compose-hornbill.yml \
--build-arg USE_CHINA_MIRROR=false \
--build-arg arg_http_proxy="" \
--build-arg arg_https_proxy="" \
--build-arg BASE_IMAGE=jepsen-hornbill:base \
--build-arg HORNBILL_IMAGE=hstreamdb/hornbill:latest
--build-arg BASE_IMAGE=jepsen-hornbill:base
2 changes: 1 addition & 1 deletion scripts/build_hornbill_base.sh
@@ -1,5 +1,5 @@
#!/usr/bin/bash
docker build -t jepsen-hornbill:base \
--build-arg "USE_CHINA_MIRROR=false" \
--build-arg "HORNBILL_IMAGE=hornbill:dev" \
--build-arg "HORNBILL_IMAGE=ghcr.io/hstreamdb/hornbill:v1.0.0-m0" \
./docker/base-hornbill
72 changes: 68 additions & 4 deletions src/jepsen/hstream/kafka/client.clj
@@ -6,7 +6,8 @@
map-vals
pprint-str]]
[slingshot.slingshot :refer [try+ throw+]])
(:import (java.time Duration)
(:import (java.nio ByteBuffer)
(java.time Duration)
(java.util Properties)
(java.util.concurrent ExecutionException)
(org.apache.kafka.clients.admin Admin
@@ -67,7 +68,8 @@
"org.apache.kafka.common.serialization.LongDeserializer"

ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG
"org.apache.kafka.common.serialization.LongDeserializer"
;"org.apache.kafka.common.serialization.LongDeserializer"
"org.apache.kafka.common.serialization.ByteArrayDeserializer"

ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG
(str node ":" port)
@@ -128,7 +130,8 @@

ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG
;"org.apache.kafka.common.serialization.StringSerializer"
"org.apache.kafka.common.serialization.LongSerializer"
;"org.apache.kafka.common.serialization.LongSerializer"
"org.apache.kafka.common.serialization.ByteArraySerializer"

ProducerConfig/DELIVERY_TIMEOUT_MS_CONFIG 10000
; We choose this lower than DELIVERY_TIMEOUT_MS so that we have a
@@ -145,6 +148,8 @@
ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG 500
ProducerConfig/SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG 1000

;; Let the producer wait up to 1s for additional records to batch before sending.
ProducerConfig/LINGER_MS_CONFIG 1000
}
(not= nil (:acks opts))
(assoc ProducerConfig/ACKS_CONFIG (:acks opts))
@@ -253,10 +258,68 @@
[topic partition]
(TopicPartition. topic partition))

;;;;;; Helpers for adding/removing extra bytes ;;;;;;
(defn long-to-bytes [n]
(let [buffer (ByteBuffer/allocate (Long/BYTES))]
(.putLong buffer n)
(.array buffer)))

(defn bytes-to-long [byte-array]
(let [buffer (ByteBuffer/wrap byte-array)]
(.getLong buffer)))
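;; A minimal REPL sanity check for the two helpers above, for illustration:
;;   (bytes-to-long (long-to-bytes 42)) ;=> 42
;; ByteBuffer defaults to big-endian byte order, so the round trip is exact.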

;; FIXME: rewrite this imperative copy of a Java list more idiomatically
(defn modify-list-elements
  "Returns a new java.util.List containing (f element) for every element of
  java-list; returns an empty list when java-list is nil."
  [java-list f]
(if (nil? java-list)
(java.util.ArrayList.)
(if (= 0 (.size java-list))
java-list
(let [new-list (java.util.ArrayList. (.size java-list))]
(dotimes [i (.size java-list)]
(let [element (.get java-list i)
new-element (f element)]
(.add new-list i new-element)))
new-list))))
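;; Illustrative use, assuming a java.util.List of longs:
;;   (modify-list-elements (java.util.ArrayList. [1 2 3]) inc) ;=> [2 3 4]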

;; Strip the extra padding bytes from a 'ConsumerRecord' and restore its long value.
(defn extract-value-from-consumer-record
[consumer-record]
(let [topic (.topic consumer-record)
partition (.partition consumer-record)
offset (.offset consumer-record)
timestamp (.timestamp consumer-record)
timestampType (.timestampType consumer-record)
serializedKeySize (.serializedKeySize consumer-record)
serializedValueSize (.serializedValueSize consumer-record)
key (.key consumer-record)
value (.value consumer-record)
headers (.headers consumer-record)
leaderEpoch (.leaderEpoch consumer-record)
actual-value (bytes-to-long (byte-array (take 8 value)))]
(ConsumerRecord. topic partition offset timestamp timestampType serializedKeySize serializedValueSize key actual-value headers leaderEpoch)))
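;; Note that the metadata fields (offset, timestamp, serialized sizes, headers,
;; leader epoch) are copied through unchanged; only the value is truncated to
;; its first 8 bytes and decoded back into a long.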

;; Strip the extra padding bytes from every record in a 'ConsumerRecords'.
(defn recover-consumer-records
[consumer-records]
(let [new-map (java.util.HashMap.)
tp-set (.partitions consumer-records)
_ (doseq [tp (seq tp-set)]
(let [lst (.records consumer-records tp)
lst (modify-list-elements lst
extract-value-from-consumer-record)]
(.put new-map tp lst)))]
(ConsumerRecords. new-map)))
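;; For illustration, the intended consume-side flow is roughly
;;   (-> (.poll consumer duration)
;;       recover-consumer-records)
;; which yields a ConsumerRecords whose values are plain longs again, so the
;; rest of the workload code can keep treating values as longs.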
;;;;;; End of helpers ;;;;;;

;; FIXME: make the number of extra padding bytes configurable at runtime
(defn ^ProducerRecord producer-record
"Constructs a ProducerRecord from a topic, partition, key, and value."
[topic partition key value]
(ProducerRecord. topic (int partition) key value))
(let [extra-bytes-len (* 1 1024)
value-bytes (long-to-bytes value)
extra-bytes (byte-array extra-bytes-len (byte \A))
value-to-write (byte-array (+ extra-bytes-len 8) (concat value-bytes extra-bytes))]
(ProducerRecord. topic (int partition) key value-to-write)))
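;; The value written above is laid out as follows:
;;   bytes 0-7    the original long value, big-endian
;;   bytes 8-1031 1024 padding bytes, each (byte \A)
;; extract-value-from-consumer-record later reads back only the first 8 bytes.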

(defn ^OffsetAndMetadata offset+metadata
"Constructs an OffsetAndMetadata."
@@ -295,6 +358,7 @@
; immediately.
(when (pos? offset)
(let [records (.poll consumer duration)
records (recover-consumer-records records)
records (vec records)
last-record ^ConsumerRecord (peek records)]
;(info :poll-through-records offset records)
3 changes: 2 additions & 1 deletion src/jepsen/hstream/kafka/workload/queue.clj
@@ -220,7 +220,8 @@
(case (first mop)
:poll (try
(rc/unwrap-errors
(let [records (.poll consumer (rc/ms->duration poll-ms))]
(let [records (.poll consumer (rc/ms->duration poll-ms))
records (rc/recover-consumer-records records)]
(->> (.partitions records)
(map (fn per-topic-partition [topic-partition]
; Return a key, messages pair
