Skip to content

Commit

Permalink
Bump rocksdb to 7.5.3
Browse files Browse the repository at this point in the history
  • Loading branch information
kingster committed Sep 6, 2022
2 parents 19ed5ac + b56e4fe commit bb6d17d
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 113 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ RUN apk add --update --no-cache linux-headers git make cmake gcc g++ musl musl-d
RUN apk add --update --no-cache zlib zlib-dev zlib-static bzip2 bzip2-dev bzip2-static snappy snappy-dev snappy-static lz4 lz4-dev lz4-static gflags-dev zstd zstd-dev zstd-static

# Install RocksDB
RUN git clone --branch v6.22.1 --depth 1 https://github.com/facebook/rocksdb.git && \
RUN git clone --branch v7.5.3 --depth 1 https://github.com/facebook/rocksdb.git && \
cd rocksdb && \
# Fix 'install -c' flag
sed -i 's/install -C/install -c/g' Makefile && \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ require (
github.com/dgraph-io/badger/v3 v3.2103.1
github.com/dgraph-io/ristretto v0.1.0
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/flipkart-incubator/gorocksdb v0.0.0-20210920082714-1f7dcbb7b2e4
github.com/flipkart-incubator/nexus v0.0.0-20220725092354-3772bb325062
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/kpango/fastime v1.0.16
github.com/linxGnu/grocksdb v1.7.7
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,9 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws=
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0=
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64=
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A=
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg=
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk=
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flipkart-incubator/gorocksdb v0.0.0-20210920082714-1f7dcbb7b2e4 h1:9zPLm5QKcBp5xUOBMWhRZPU+iwfl6xJtbSmbazmgy2g=
github.com/flipkart-incubator/gorocksdb v0.0.0-20210920082714-1f7dcbb7b2e4/go.mod h1:kvJSXc90Ifw0rxuTxEHKq6UH/7hQ/gd9RKCyD94ctJ0=
github.com/flipkart-incubator/nexus v0.0.0-20220725092354-3772bb325062 h1:n5cwpgmvZk+YOTBGIgTaK1OQeFVdM550ZDdKOd778hw=
github.com/flipkart-incubator/nexus v0.0.0-20220725092354-3772bb325062/go.mod h1:i5sm5XII1dOXLLjrwLmsHpZ5oHfDI328uCvtm6fhyAg=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down Expand Up @@ -232,6 +224,8 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/linxGnu/grocksdb v1.7.7 h1:b6o8gagb4FL+P55qUzPchBR/C0u1lWjJOWQSWbhvTWg=
github.com/linxGnu/grocksdb v1.7.7/go.mod h1:0hTf+iA+GOr0jDX4CgIYyJZxqOH9XlBh6KVj8+zmF34=
github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
Expand Down Expand Up @@ -335,13 +329,16 @@ github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk=
github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
Expand Down Expand Up @@ -439,5 +436,6 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
6 changes: 3 additions & 3 deletions internal/storage/rocksdb/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package rocksdb

import (
"github.com/flipkart-incubator/gorocksdb"
"github.com/linxGnu/grocksdb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand All @@ -11,7 +11,7 @@ type rocksDBCollector struct {
memTableUnflushedGauge *prometheus.Desc
memTableReadersTotalGauge *prometheus.Desc
cacheTotalGauge *prometheus.Desc
db *gorocksdb.DB
db *grocksdb.DB
lgr *zap.Logger
}

Expand Down Expand Up @@ -51,7 +51,7 @@ func (collector *rocksDBCollector) Describe(ch chan<- *prometheus.Desc) {

//Collect implements required collect function for all promehteus collectors
func (collector *rocksDBCollector) Collect(ch chan<- prometheus.Metric) {
memoryUsage, err := gorocksdb.GetApproximateMemoryUsageByType([]*gorocksdb.DB{collector.db}, nil)
memoryUsage, err := grocksdb.GetApproximateMemoryUsageByType([]*grocksdb.DB{collector.db}, nil)
if err != nil {
collector.lgr.Error("Failed to get rocksgb memory usage", zap.Error(err))
} else {
Expand Down
104 changes: 56 additions & 48 deletions internal/storage/rocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/flipkart-incubator/dkv/internal/stats"
"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/flipkart-incubator/dkv/pkg/serverpb"
"github.com/flipkart-incubator/gorocksdb"
"github.com/linxGnu/grocksdb"
"go.uber.org/zap"
"gopkg.in/ini.v1"
)
Expand All @@ -37,10 +37,10 @@ type DB interface {
}

type rocksDB struct {
db *gorocksdb.DB
normalCF *gorocksdb.ColumnFamilyHandle
ttlCF *gorocksdb.ColumnFamilyHandle
optimTrxnDB *gorocksdb.OptimisticTransactionDB
db *grocksdb.DB
normalCF *grocksdb.ColumnFamilyHandle
ttlCF *grocksdb.ColumnFamilyHandle
optimTrxnDB *grocksdb.OptimisticTransactionDB
opts *rocksDBOpts
stat *storage.Stat

Expand All @@ -50,11 +50,11 @@ type rocksDB struct {
}

type rocksDBOpts struct {
readOpts *gorocksdb.ReadOptions
writeOpts *gorocksdb.WriteOptions
blockTableOpts *gorocksdb.BlockBasedTableOptions
rocksDBOpts *gorocksdb.Options
restoreOpts *gorocksdb.RestoreOptions
readOpts *grocksdb.ReadOptions
writeOpts *grocksdb.WriteOptions
blockTableOpts *grocksdb.BlockBasedTableOptions
rocksDBOpts *grocksdb.Options
restoreOpts *grocksdb.RestoreOptions
folderName string
sstDirectory string
lgr *zap.Logger
Expand Down Expand Up @@ -120,7 +120,7 @@ func WithSSTDir(sstDir string) DBOption {
func WithCacheSize(size uint64) DBOption {
return func(opts *rocksDBOpts) {
if size > 0 {
opts.blockTableOpts.SetBlockCache(gorocksdb.NewLRUCache(size))
opts.blockTableOpts.SetBlockCache(grocksdb.NewLRUCache(size))
} else {
opts.blockTableOpts.SetNoBlockCache(true)
}
Expand All @@ -141,7 +141,7 @@ func WithRocksDBConfig(iniFile string) DBOption {
for key, val := range sectConf {
fmt.Fprintf(&buff, "%s=%s;", key, val)
}
if rdbOpts, err := gorocksdb.GetOptionsFromString(opts.rocksDBOpts, buff.String()); err != nil {
if rdbOpts, err := grocksdb.GetOptionsFromString(opts.rocksDBOpts, buff.String()); err != nil {
panic(fmt.Errorf("unable to parge RocksDB configuration from given file: %s, error: %v", iniFile, err))
} else {
opts.rocksDBOpts = rdbOpts
Expand Down Expand Up @@ -184,16 +184,24 @@ func (m *ttlCompactionFilter) Filter(level int, key, val []byte) (remove bool, n
return false, nil
}

func (m *ttlCompactionFilter) SetIgnoreSnapshots(value bool) {

}

func (m *ttlCompactionFilter) Destroy() {

}

func newOptions(dbFolder string) *rocksDBOpts {
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
opts := gorocksdb.NewDefaultOptions()
bbto := grocksdb.NewDefaultBlockBasedTableOptions()
opts := grocksdb.NewDefaultOptions()
opts.SetCreateIfMissing(true)
opts.SetCreateIfMissingColumnFamilies(true)
opts.SetWALTtlSeconds(uint64(600))
opts.SetBlockBasedTableFactory(bbto)
rstOpts := gorocksdb.NewRestoreOptions()
wrOpts := gorocksdb.NewDefaultWriteOptions()
rdOpts := gorocksdb.NewDefaultReadOptions()
rstOpts := grocksdb.NewRestoreOptions()
wrOpts := grocksdb.NewDefaultWriteOptions()
rdOpts := grocksdb.NewDefaultReadOptions()
cfNames := []string{"default", "ttl"}
return &rocksDBOpts{
folderName: dbFolder,
Expand All @@ -219,19 +227,19 @@ func (rdbOpts *rocksDBOpts) destroy() {

func openStore(opts *rocksDBOpts) (*rocksDB, error) {
normalOpts := opts.rocksDBOpts
ttlOpts, err := gorocksdb.GetOptionsFromString(normalOpts, "")
ttlOpts, err := grocksdb.GetOptionsFromString(normalOpts, "")
if err != nil {
return nil, err
}
ttlOpts.SetCompactionFilter(&ttlCompactionFilter{opts.lgr})
optimTrxnDB, cfh, err := gorocksdb.OpenOptimisticTransactionDbColumnFamilies(opts.rocksDBOpts,
opts.folderName, opts.cfNames, []*gorocksdb.Options{normalOpts, ttlOpts})
optimTrxnDB, cfh, err := grocksdb.OpenOptimisticTransactionDbColumnFamilies(opts.rocksDBOpts,
opts.folderName, opts.cfNames, []*grocksdb.Options{normalOpts, ttlOpts})
if err != nil {
return nil, err
}

rocksdb := rocksDB{
db: optimTrxnDB.GetBaseDb(),
db: optimTrxnDB.GetBaseDB(),
normalCF: cfh[0],
ttlCF: cfh[1],
optimTrxnDB: optimTrxnDB,
Expand Down Expand Up @@ -259,7 +267,7 @@ func (rdb *rocksDB) Compaction() error {
case <-tick:
// trigger a compaction
rdb.opts.lgr.Info("Triggering RocksDB Compaction")
rdb.db.CompactRangeCF(rdb.ttlCF, gorocksdb.Range{nil, nil})
rdb.db.CompactRangeCF(rdb.ttlCF, grocksdb.Range{nil, nil})
}
}
return nil
Expand Down Expand Up @@ -314,7 +322,7 @@ func (rdb *rocksDB) Put(pairs ...*serverpb.KVPair) error {
defer rdb.opts.statsCli.Timing(metricsPrefix+".latency.ms", time.Now())
defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(metricsLabel), time.Now())

wb := gorocksdb.NewWriteBatch()
wb := grocksdb.NewWriteBatch()
defer wb.Destroy()
for _, kv := range pairs {
if kv == nil {
Expand Down Expand Up @@ -348,7 +356,7 @@ func (rdb *rocksDB) Delete(key []byte) error {
defer rdb.opts.statsCli.Timing("rocksdb.delete.latency.ms", time.Now())
defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Delete), time.Now())

wb := gorocksdb.NewWriteBatch()
wb := grocksdb.NewWriteBatch()
defer wb.Destroy()
wb.DeleteCF(rdb.ttlCF, key)
wb.Delete(key)
Expand Down Expand Up @@ -376,7 +384,7 @@ func (rdb *rocksDB) CompareAndSet(key, expect, update []byte) (bool, error) {

ro := rdb.opts.readOpts
wo := rdb.opts.writeOpts
to := gorocksdb.NewDefaultOptimisticTransactionOptions()
to := grocksdb.NewDefaultOptimisticTransactionOptions()
txn := rdb.optimTrxnDB.TransactionBegin(wo, to, nil)
defer txn.Destroy()

Expand Down Expand Up @@ -416,11 +424,11 @@ const (
snapshotLogSizeForFlush = 0
)

func (rdb *rocksDB) generateSST(snap *gorocksdb.Snapshot, cf *gorocksdb.ColumnFamilyHandle, sstDir string) (*os.File, error) {
func (rdb *rocksDB) generateSST(snap *grocksdb.Snapshot, cf *grocksdb.ColumnFamilyHandle, sstDir string) (*os.File, error) {
var fileName string
envOpts := gorocksdb.NewDefaultEnvOptions()
opts := gorocksdb.NewDefaultOptions()
sstWrtr := gorocksdb.NewSSTFileWriter(envOpts, opts)
envOpts := grocksdb.NewDefaultEnvOptions()
opts := grocksdb.NewDefaultOptions()
sstWrtr := grocksdb.NewSSTFileWriter(envOpts, opts)
defer sstWrtr.Destroy()

if fileName = sstDir + sstDefaultCF; cf == rdb.ttlCF {
Expand All @@ -433,7 +441,7 @@ func (rdb *rocksDB) generateSST(snap *gorocksdb.Snapshot, cf *gorocksdb.ColumnFa
}

// TODO: Any options need to be set
readOpts := gorocksdb.NewDefaultReadOptions()
readOpts := grocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
readOpts.SetSnapshot(snap)

Expand Down Expand Up @@ -586,7 +594,7 @@ func (rdb *rocksDB) BackupTo(folder string) error {

// Retain only the latest backup in the given folder
defer be.PurgeOldBackups(1)
return be.CreateNewBackupFlush(rdb.db, true)
return be.CreateNewBackupFlush(true)
}

const tempDirPrefix = "rocksdb-restore-"
Expand Down Expand Up @@ -675,7 +683,7 @@ func (rdb *rocksDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error

appldChngNum := uint64(0)
for _, chng := range changes {
wb := gorocksdb.WriteBatchFrom(chng.SerialisedForm)
wb := grocksdb.WriteBatchFrom(chng.SerialisedForm)
defer wb.Destroy()
err := rdb.db.Write(rdb.opts.writeOpts, wb)
if err != nil {
Expand All @@ -689,11 +697,11 @@ func (rdb *rocksDB) SaveChanges(changes []*serverpb.ChangeRecord) (uint64, error

type iter struct {
iterOpts storage.IterationOptions
rdbIter *gorocksdb.Iterator
rdbIter *grocksdb.Iterator
ttlCF bool
}

func (rdb *rocksDB) newIterCF(readOpts *gorocksdb.ReadOptions, iterOpts storage.IterationOptions, cf *gorocksdb.ColumnFamilyHandle) *iter {
func (rdb *rocksDB) newIterCF(readOpts *grocksdb.ReadOptions, iterOpts storage.IterationOptions, cf *grocksdb.ColumnFamilyHandle) *iter {
it := rdb.db.NewIteratorCF(readOpts, cf)
if sk, present := iterOpts.StartKey(); present {
it.Seek(sk)
Expand Down Expand Up @@ -769,7 +777,7 @@ func (rdb *rocksDB) Iterate(iterOpts storage.IterationOptions) storage.Iterator
return iterators.Concat(baseIter, ttlIter)
}

func (rdb *rocksDB) toChangeRecord(writeBatch *gorocksdb.WriteBatch, changeNum uint64) *serverpb.ChangeRecord {
func (rdb *rocksDB) toChangeRecord(writeBatch *grocksdb.WriteBatch, changeNum uint64) *serverpb.ChangeRecord {
chngRec := &serverpb.ChangeRecord{}
chngRec.ChangeNumber = changeNum
dataBts := writeBatch.Data()
Expand All @@ -786,21 +794,21 @@ func (rdb *rocksDB) toChangeRecord(writeBatch *gorocksdb.WriteBatch, changeNum u
return chngRec
}

func (rdb *rocksDB) openBackupEngine(folder string) (*gorocksdb.BackupEngine, error) {
func (rdb *rocksDB) openBackupEngine(folder string) (*grocksdb.BackupEngine, error) {
opts := rdb.opts.rocksDBOpts
return gorocksdb.OpenBackupEngine(opts, folder)
return grocksdb.OpenBackupEngine(opts, folder)
}

func (rdb *rocksDB) toTrxnRecord(wbr *gorocksdb.WriteBatchRecord) *serverpb.TrxnRecord {
func (rdb *rocksDB) toTrxnRecord(wbr *grocksdb.WriteBatchRecord) *serverpb.TrxnRecord {
trxnRec := &serverpb.TrxnRecord{}
switch wbr.Type {
case gorocksdb.WriteBatchCFDeletionRecord:
case grocksdb.WriteBatchCFDeletionRecord:
trxnRec.Type = serverpb.TrxnRecord_Delete
case gorocksdb.WriteBatchDeletionRecord:
case grocksdb.WriteBatchDeletionRecord:
trxnRec.Type = serverpb.TrxnRecord_Delete
case gorocksdb.WriteBatchValueRecord:
case grocksdb.WriteBatchValueRecord:
trxnRec.Type = serverpb.TrxnRecord_Put
case gorocksdb.WriteBatchCFValueRecord:
case grocksdb.WriteBatchCFValueRecord:
trxnRec.Type = serverpb.TrxnRecord_Put
default:
trxnRec.Type = serverpb.TrxnRecord_Unknown
Expand All @@ -824,7 +832,7 @@ func byteArrayCopy(src []byte, dstLen int) []byte {
return dst
}

func toByteArray(value *gorocksdb.Slice) []byte {
func toByteArray(value *grocksdb.Slice) []byte {
src := value.Data()
res := byteArrayCopy(src, value.Size())
return res
Expand All @@ -839,11 +847,11 @@ func parseTTLMsgPackData(valueWithTTL []byte) (*ttlDataFormat, error) {
return &row, err
}

func (rdb *rocksDB) getSingleKey(ro *gorocksdb.ReadOptions, key []byte) ([]*serverpb.KVPair, error) {
func (rdb *rocksDB) getSingleKey(ro *grocksdb.ReadOptions, key []byte) ([]*serverpb.KVPair, error) {
defer rdb.opts.statsCli.Timing("rocksdb.single.get.latency.ms", time.Now())
defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.Get), time.Now())

values, err := rdb.db.MultiGetCFMultiCF(ro, []*gorocksdb.ColumnFamilyHandle{rdb.normalCF, rdb.ttlCF}, [][]byte{key, key})
values, err := rdb.db.MultiGetCFMultiCF(ro, []*grocksdb.ColumnFamilyHandle{rdb.normalCF, rdb.ttlCF}, [][]byte{key, key})
if err != nil {
rdb.opts.statsCli.Incr("rocksdb.single.get.errors", 1)
rdb.stat.ResponseError.WithLabelValues(stats.Get).Inc()
Expand All @@ -859,7 +867,7 @@ func (rdb *rocksDB) getSingleKey(ro *gorocksdb.ReadOptions, key []byte) ([]*serv
return nil, nil
}

func (rdb *rocksDB) extractResult(value1 *gorocksdb.Slice, value2 *gorocksdb.Slice, key []byte) *serverpb.KVPair {
func (rdb *rocksDB) extractResult(value1 *grocksdb.Slice, value2 *grocksdb.Slice, key []byte) *serverpb.KVPair {
if value1.Size() > 0 {
//non ttl use-case
val := toByteArray(value1)
Expand Down Expand Up @@ -887,12 +895,12 @@ func (rdb *rocksDB) extractResult(value1 *gorocksdb.Slice, value2 *gorocksdb.Sli
return nil
}

func (rdb *rocksDB) getMultipleKeys(ro *gorocksdb.ReadOptions, keys [][]byte) ([]*serverpb.KVPair, error) {
func (rdb *rocksDB) getMultipleKeys(ro *grocksdb.ReadOptions, keys [][]byte) ([]*serverpb.KVPair, error) {
defer rdb.opts.statsCli.Timing("rocksdb.multi.get.latency.ms", time.Now())
defer stats.MeasureLatency(rdb.stat.RequestLatency.WithLabelValues(stats.MultiGet), time.Now())

kl := len(keys)
reqCFs := make([]*gorocksdb.ColumnFamilyHandle, kl<<1)
reqCFs := make([]*grocksdb.ColumnFamilyHandle, kl<<1)
for i := 0; i < kl; i++ {
reqCFs[i] = rdb.normalCF
reqCFs[i+kl] = rdb.ttlCF
Expand Down
Loading

0 comments on commit bb6d17d

Please sign in to comment.