Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Karthik/5614/generic offers parsing #5531

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
108 changes: 108 additions & 0 deletions ingest/offers/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package offers

import (
"github.com/guregu/null"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/utils"
"github.com/stellar/go/xdr"
)

// Constants for event types
const (
EventTypeOfferCreated = "OfferCreated"
EventTypeOfferFill = "OfferUpdated"
EventTypeOfferClosed = "OfferClosed"
)

// Base struct with common fields for all offer events.
type OfferEventData struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to just use xdr.OfferEntry instead of this because it contains the same data so there is no need to introduce another struct

SellerId string
OfferID int64
BuyingAsset xdr.Asset
SellingAsset xdr.Asset
RemainingAmount int64 // Remaining amount that still needs to be filled for this offer
PriceN int32
PriceD int32
Flags int32
IsPassive bool
LastModifiedLedger uint32
Sponsor null.String
}

type OfferEvent interface {
OfferEventType() string
}

type OfferCreatedEvent struct {
OfferEventData
}

// Method to get common event data
func (e OfferEventData) GetOfferData() OfferEventData {
return e
}

func (e OfferCreatedEvent) OfferEventType() string { return EventTypeOfferCreated }

type OfferFillEvent struct {
OfferEventData
FillAmount int64 // How much amount of the order was filled from last time
}

func (e OfferFillEvent) OfferEventType() string { return EventTypeOfferFill }

type OfferClosedEvent struct {
OfferEventData
CloseReason string
}

func (e OfferClosedEvent) OfferEventType() string { return EventTypeOfferClosed }

func populateOfferData(e *xdr.LedgerEntry) OfferEventData {
offer := e.Data.MustOffer()
return OfferEventData{
SellerId: offer.SellerId.Address(),
OfferID: int64(offer.OfferId),

BuyingAsset: offer.Buying,
SellingAsset: offer.Selling,
RemainingAmount: int64(offer.Amount),
PriceN: int32(offer.Price.N),
PriceD: int32(offer.Price.D),
Flags: int32(offer.Flags),
IsPassive: int32(offer.Flags) == int32(xdr.OfferEntryFlagsPassiveFlag),
LastModifiedLedger: uint32(e.LastModifiedLedgerSeq),
Sponsor: utils.LedgerEntrySponsorToNullString(*e),
}
}

func ProcessOffer(change ingest.Change) OfferEvent {
Copy link
Contributor Author

@karthikiyer56 karthikiyer56 Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ProcessOffer has been pulled out into the top level /ingest package.
now horizon and stellar-etl can leverage this ProcessOffer function and thus eliminate the code duplication.

Further down in this PR in offer_processor.go, I have updated the horizon code to use this common function
next step: make stellar-etl use this ProcesOffer function here Cc: @chowbao

if change.Type != xdr.LedgerEntryTypeOffer {
return nil
}
var o OfferEventData
var event OfferEvent

switch {
case change.Pre == nil && change.Post != nil:
// New offer
o = populateOfferData(change.Post)
event = OfferCreatedEvent{OfferEventData: o}

case change.Pre != nil && change.Post != nil:
// Order Fill
o = populateOfferData(change.Post)
fillAmt := int64(change.Pre.Data.MustOffer().Amount - change.Post.Data.MustOffer().Amount)
event = OfferFillEvent{OfferEventData: o, FillAmount: fillAmt}
//TODO: populate MatchingOrders field in OfferFillEvent

// Offer Fill
case change.Pre != nil && change.Post == nil:
// Offer Removed
o = populateOfferData(change.Pre)
event = OfferClosedEvent{OfferEventData: o}
//TODO: populate CloseReason field in OfferClosedEvent
}

return event
}
17 changes: 17 additions & 0 deletions ingest/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package utils

import (
"github.com/guregu/null"
"github.com/stellar/go/xdr"
)

func LedgerEntrySponsorToNullString(entry xdr.LedgerEntry) null.String {
sponsoringID := entry.SponsoringID()

var sponsor null.String
if sponsoringID != nil {
sponsor.SetValid((*sponsoringID).Address())
}

return sponsor
}
54 changes: 26 additions & 28 deletions services/horizon/internal/ingest/processors/offers_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package processors

import (
"context"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/offers"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

// The offers processor can be configured to trim the offers table
Expand Down Expand Up @@ -37,54 +36,53 @@ func (p *OffersProcessor) reset() {
}

func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
if change.Type != xdr.LedgerEntryTypeOffer {
event := offers.ProcessOffer(change)
if event == nil {
return nil
}

switch {
case change.Pre == nil && change.Post != nil:
// Created
err := p.insertBatchBuilder.Add(p.ledgerEntryToRow(change.Post))
switch ev := event.(type) {
case offers.OfferCreatedEvent:
row := p.offerEventToRow(ev.OfferEventData)
err := p.insertBatchBuilder.Add(row)
if err != nil {
return errors.New("Error adding to OffersBatchInsertBuilder")
}
case change.Pre != nil && change.Post != nil:
// Updated
row := p.ledgerEntryToRow(change.Post)
case offers.OfferFillEvent:
row := p.offerEventToRow(ev.OfferEventData)
p.batchUpdateOffers = append(p.batchUpdateOffers, row)
case change.Pre != nil && change.Post == nil:
// Removed
row := p.ledgerEntryToRow(change.Pre)
case offers.OfferClosedEvent:
row := p.offerEventToRow(ev.OfferEventData)
row.Deleted = true
row.LastModifiedLedger = p.sequence
p.batchUpdateOffers = append(p.batchUpdateOffers, row)
default:
return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil")
}
return errors.New("Unknown offer event")

}
if p.insertBatchBuilder.Len()+len(p.batchUpdateOffers) > maxBatchSize {
if err := p.flushCache(ctx); err != nil {
return errors.Wrap(err, "error in Commit")
}
}

return nil

}

func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer {
offer := entry.Data.MustOffer()
func (p *OffersProcessor) offerEventToRow(e offers.OfferEventData) history.Offer {
return history.Offer{
SellerID: offer.SellerId.Address(),
OfferID: int64(offer.OfferId),
SellingAsset: offer.Selling,
BuyingAsset: offer.Buying,
Amount: int64(offer.Amount),
Pricen: int32(offer.Price.N),
Priced: int32(offer.Price.D),
Price: float64(offer.Price.N) / float64(offer.Price.D),
Flags: int32(offer.Flags),
LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq),
Sponsor: ledgerEntrySponsorToNullString(*entry),
SellerID: e.SellerId,
OfferID: e.OfferID,
SellingAsset: e.SellingAsset,
BuyingAsset: e.BuyingAsset,
Amount: e.RemainingAmount,
Pricen: e.PriceN,
Priced: e.PriceD,
Price: float64(e.PriceN) / float64(e.PriceD),
Flags: e.Flags,
LastModifiedLedger: e.LastModifiedLedger,
Sponsor: e.Sponsor,
}
}

Expand Down
Loading