Skip to content

Commit

Permalink
Clear Id (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
dantudor authored Oct 22, 2020
1 parent 803b1e2 commit e424cb0
Show file tree
Hide file tree
Showing 34 changed files with 314 additions and 192 deletions.
29 changes: 24 additions & 5 deletions internal/elastic_cache/elastic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ func (i *Index) AddRequest(index string, entity explorer.Entity, reqType Request
"index": index,
"type": reqType,
"slug": entity.Slug(),
"id": entity.Id(),
}).Debugf("AddRequest")

if reqType == UpdateRequest && entity.Id() == "" {
reqType = IndexRequest
}

request := Request{
Index: index,
Entity: entity,
Expand All @@ -120,12 +125,16 @@ func (i *Index) AddRequest(index string, entity explorer.Entity, reqType Request

cached, found := i.cache.Get(entity.Slug())
if found == true {
logrus.WithField("persisted", cached.(Request).Persisted).Debugf("Found in cache %s: %s", cached.(Request).Index, cached.(Request).Entity.Slug())
if cached.(Request).Persisted == false && reqType == UpdateRequest {
logrus.WithFields(logrus.Fields{
"persisted": cached.(Request).Persisted,
"slug": cached.(Request).Entity.Slug(),
"id": cached.(Request).Entity.Id(),
}).Debug("Found in cache ", cached.(Request).Index)

if cached.(Request).Persisted == false && cached.(Request).Entity.Id() == "" && reqType == UpdateRequest {
logrus.Debugf("Switch update to index as not previously persisted %s", entity.Slug())
request.Type = IndexRequest
}
request.Persisted = false
}
i.cache.Set(entity.Slug(), request, cache.DefaultExpiration)
}
Expand Down Expand Up @@ -168,16 +177,24 @@ func (i *Index) BatchPersist(height uint64) bool {

logrus.Infof("Persisting data at height %d", height)
i.Persist()
if config.Get().Reindex == true && height == config.Get().BulkIndexSize {
logrus.Error("Stopping reindex at height:", height)
for {
switch {
}
}
}

return true
}

func (i *Index) Persist() int {
bulk := i.Client.Bulk()
for _, r := range i.GetPendingRequests() {
if r.Type == IndexRequest {
bulk.Add(elastic.NewBulkIndexRequest().Index(r.Index).Id(r.Entity.Slug()).Doc(r.Entity))
bulk.Add(elastic.NewBulkIndexRequest().Index(r.Index).Doc(r.Entity))
} else if r.Type == UpdateRequest {
bulk.Add(elastic.NewBulkUpdateRequest().Index(r.Index).Id(r.Entity.Slug()).Doc(r.Entity))
bulk.Add(elastic.NewBulkUpdateRequest().Index(r.Index).Id(r.Entity.Id()).Doc(r.Entity))
}
r.Persisted = true
i.cache.Set(r.Entity.Slug(), r, cache.DefaultExpiration)
Expand All @@ -192,6 +209,7 @@ func (i *Index) Persist() int {
}

func (i *Index) persist(bulk *elastic.BulkService) {
actions := bulk.NumberOfActions()
response, err := bulk.Do(context.Background())
if err != nil {
logrus.WithError(err).Fatal("Failed to persist requests")
Expand All @@ -206,6 +224,7 @@ func (i *Index) persist(bulk *elastic.BulkService) {
}
}
}
logrus.Infof("Persisted %d actions", actions)
}

func (i *Index) DeleteHeightGT(height uint64, indices ...string) error {
Expand Down
12 changes: 5 additions & 7 deletions internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func NewIndexer(
}

func (i *Indexer) BulkIndex() {
log.Debug("Subscribe to 0MQ")

if err := i.Index(IndexOption.BatchIndex); err != nil {
if err.Error() == "-8: Block height out of range" {
i.elastic.Persist()
Expand Down Expand Up @@ -103,31 +101,31 @@ func (i *Indexer) index(height uint64, option IndexOption.IndexOption) error {
go func() {
defer wg.Done()
start := time.Now()
i.addressIndexer.Index(txs, b)
i.addressIndexer.Index(b, txs)
elapsed := time.Since(start)
log.WithField("time", elapsed).Debugf("Indexed addresses at height %d", height)
log.WithField("time", elapsed).Infof("Indexed addresses at height %d", height)
}()

go func() {
defer wg.Done()
start := time.Now()
i.softForkIndexer.Index(b)
elapsed := time.Since(start)
log.WithField("time", elapsed).Debugf("Indexed softforks at height %d", height)
log.WithField("time", elapsed).Infof("Index softforks: %d", height)
}()

go func() {
defer wg.Done()
start := time.Now()
i.daoIndexer.Index(b, txs, header)
elapsed := time.Since(start)
log.WithField("time", elapsed).Debugf("Indexed dao at height %d", height)
log.WithField("time", elapsed).Infof("Index dao: %d", height)
}()

wg.Wait()

elapsed := time.Since(start)
log.WithField("time", elapsed).Debugf("Indexed block at height %d", height)
log.WithField("time", elapsed).Infof("Index block: %d", height)
log.Debugf("")

LastBlockIndexed = height
Expand Down
2 changes: 1 addition & 1 deletion internal/service/address/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func CreateAddress(hash string, height uint64, time time.Time) *explorer.Address
return &explorer.Address{Hash: hash, CreatedBlock: height, CreatedTime: time}
}

func CreateAddressHistory(history *navcoind.AddressHistory, tx *explorer.BlockTransaction, block *explorer.Block) *explorer.AddressHistory {
func CreateAddressHistory(history *navcoind.AddressHistory, tx *explorer.BlockTransaction) *explorer.AddressHistory {
h := &explorer.AddressHistory{
Height: history.Block,
TxIndex: history.TxIndex,
Expand Down
18 changes: 9 additions & 9 deletions internal/service/address/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func NewIndexer(navcoin *navcoind.Navcoind, elastic *elastic_cache.Index, repo *
return &Indexer{navcoin, elastic, repo}
}

func (i *Indexer) Index(txs []*explorer.BlockTransaction, block *explorer.Block) {
func (i *Indexer) Index(block *explorer.Block, txs []*explorer.BlockTransaction) {
if len(txs) == 0 {
return
}

for _, addressHistory := range i.generateAddressHistory(block, txs) {
for _, addressHistory := range i.generateAddressHistory(&block.Height, &block.Height, txs) {
i.elastic.AddIndexRequest(elastic_cache.AddressHistoryIndex.Get(), addressHistory)

err := i.updateAddress(addressHistory, block)
Expand All @@ -32,18 +32,18 @@ func (i *Indexer) Index(txs []*explorer.BlockTransaction, block *explorer.Block)
}
}

func (i *Indexer) generateAddressHistory(block *explorer.Block, txs []*explorer.BlockTransaction) []*explorer.AddressHistory {
func (i *Indexer) generateAddressHistory(start, end *uint64, txs []*explorer.BlockTransaction) []*explorer.AddressHistory {
addressHistory := make([]*explorer.AddressHistory, 0)

addresses := getAddressesForTxs(txs)
history, err := i.navcoin.GetAddressHistory(&block.Height, &block.Height, addresses...)
history, err := i.navcoin.GetAddressHistory(start, end, addresses...)
if err != nil {
log.WithError(err).Errorf("Could not get address history for height: %d", block.Height)
log.WithError(err).Errorf("Could not get address history for height: %d-%d", start, end)
return addressHistory
}

for _, h := range history {
addressHistory = append(addressHistory, CreateAddressHistory(h, getTxsById(h.TxId, txs), block))
addressHistory = append(addressHistory, CreateAddressHistory(h, getTxById(h.TxId, txs)))
}

return addressHistory
Expand Down Expand Up @@ -81,12 +81,12 @@ func getAddressesForTxs(txs []*explorer.BlockTransaction) []string {
return addresses
}

func getTxsById(txid string, txs []*explorer.BlockTransaction) *explorer.BlockTransaction {
func getTxById(id string, txs []*explorer.BlockTransaction) *explorer.BlockTransaction {
for _, tx := range txs {
if tx.Txid == txid {
if tx.Txid == id {
return tx
}
}
log.Fatal("Could not match tx")
log.Fatal("Could not match tx: ", id)
return nil
}
123 changes: 65 additions & 58 deletions internal/service/address/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"github.com/NavExplorer/navexplorer-indexer-go/v2/internal/elastic_cache"
"github.com/NavExplorer/navexplorer-indexer-go/v2/pkg/explorer"
"github.com/getsentry/raven-go"
"github.com/olivere/elastic/v7"
log "github.com/sirupsen/logrus"
"strings"
Expand All @@ -24,110 +23,84 @@ func NewRepo(Client *elastic.Client) *Repository {
return &Repository{Client}
}

func (r *Repository) GetAddress(hash string) (*explorer.Address, error) {
log.Debugf("GetAddress(hash:%s)", hash)

results, err := r.Client.Search(elastic_cache.AddressIndex.Get()).
Query(elastic.NewTermQuery("hash.keyword", hash)).
func (r *Repository) GetBestHeight() (uint64, error) {
result, err := r.Client.Search(elastic_cache.AddressIndex.Get()).
Sort("height", false).
Size(1).
Do(context.Background())
if err != nil {
raven.CaptureError(err, nil)
return nil, err
return 0, err
}

if results == nil || len(results.Hits.Hits) != 1 {
return nil, errors.New("Invalid result found")
address, err := r.findOneAddress(result)
if err != nil {
return 0, err
}

var address *explorer.Address
if err = json.Unmarshal(results.Hits.Hits[0].Source, &address); err != nil {
return address.Height, nil
}

func (r *Repository) GetAddress(hash string) (*explorer.Address, error) {
result, err := r.Client.Search(elastic_cache.AddressIndex.Get()).
Query(elastic.NewTermQuery("hash.keyword", hash)).
Size(1).
Do(context.Background())
if err != nil {
return nil, err
}

return address, nil
return r.findOneAddress(result)
}

func (r *Repository) GetAddresses(hashes []string) ([]*explorer.Address, error) {
log.Debugf("GetAddresses([%s])", strings.Join(hashes, ","))

results, err := r.Client.Search(elastic_cache.AddressIndex.Get()).
result, err := r.Client.Search(elastic_cache.AddressIndex.Get()).
Query(elastic.NewMatchQuery("hash", strings.Join(hashes, " "))).
Size(len(hashes)).
Do(context.Background())
if err != nil || results == nil {
if err != nil {
return nil, err
}

addresses := make([]*explorer.Address, 0)
for _, hit := range results.Hits.Hits {
var address *explorer.Address
if err = json.Unmarshal(hit.Source, &address); err != nil {
return nil, err
}
addresses = append(addresses, address)
}

return addresses, nil
return r.findManyAddress(result)
}

func (r *Repository) GetAddressesHeightGt(height uint64) ([]*explorer.Address, error) {
log.Debugf("GetAddressesHeightGt(height:%d)", height)

results, err := r.Client.
result, err := r.Client.
Search(elastic_cache.AddressIndex.Get()).
Query(elastic.NewRangeQuery("height").Gt(height)).
Size(50000).
Do(context.Background())
if err != nil || results == nil {
if err != nil {
return nil, err
}

addresses := make([]*explorer.Address, 0)
for _, hit := range results.Hits.Hits {
var address *explorer.Address
if err = json.Unmarshal(hit.Source, &address); err != nil {
return nil, err
}
addresses = append(addresses, address)
}

return addresses, nil
return r.findManyAddress(result)
}

func (r *Repository) GetOrCreateAddress(hash string, block *explorer.Block) (*explorer.Address, error) {
log.WithField("address", hash).Debug("GetOrCreateAddress")

results, err := r.Client.
result, err := r.Client.
Search(elastic_cache.AddressIndex.Get()).
Query(elastic.NewMatchQuery("hash", hash)).
Size(1).
Query(elastic.NewTermQuery("hash.keyword", hash)).
Do(context.Background())
if err != nil || results == nil {
if err != nil {
return nil, err
}

var address *explorer.Address
if results.TotalHits() == 0 {
address = CreateAddress(hash, block.Height, block.MedianTime)
_, err := r.Client.
Index().
if result.TotalHits() == 0 {
address := CreateAddress(hash, block.Height, block.MedianTime)
result, err := r.Client.Index().
Index(elastic_cache.AddressIndex.Get()).
Id(address.Slug()).
BodyJson(address).
Do(context.Background())
if err != nil {
return nil, err
}
address.SetId(result.Id)

return address, nil
}

if err = json.Unmarshal(results.Hits.Hits[0].Source, &address); err != nil {
return nil, err
}

return address, nil
return r.findOneAddress(result)
}

func (r *Repository) GetLatestHistoryByHash(hash string) (*explorer.AddressHistory, error) {
Expand All @@ -151,6 +124,40 @@ func (r *Repository) GetLatestHistoryByHash(hash string) (*explorer.AddressHisto
if err != nil {
return nil, err
}
history.SetId(hit.Id)

return history, err
}

func (r *Repository) findOneAddress(result *elastic.SearchResult) (*explorer.Address, error) {
if result == nil || len(result.Hits.Hits) != 1 {
return nil, errors.New("Invalid result")
}

var address *explorer.Address
hit := result.Hits.Hits[0]
if err := json.Unmarshal(hit.Source, &address); err != nil {
return nil, err
}
address.SetId(hit.Id)

return address, nil
}

func (r *Repository) findManyAddress(result *elastic.SearchResult) ([]*explorer.Address, error) {
if result == nil {
return nil, errors.New("Invalid result")
}

addresses := make([]*explorer.Address, 0)
for _, hit := range result.Hits.Hits {
var address *explorer.Address
if err := json.Unmarshal(hit.Source, &address); err != nil {
return nil, err
}
address.SetId(hit.Id)
addresses = append(addresses, address)
}

return addresses, nil
}
4 changes: 2 additions & 2 deletions internal/service/address/rewinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (r *Rewinder) ResetAddress(address *explorer.Address) error {

_, err = r.elastic.Client.Index().
Index(elastic_cache.AddressIndex.Get()).
Id(address.Id()).
BodyJson(address).
Id(address.Slug()).
Do(context.Background())

return nil
return err
}
Loading

0 comments on commit e424cb0

Please sign in to comment.