Skip to content

Commit

Permalink
ingest: batch endpoint: extract writeKey from body
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 20, 2023
1 parent 06e8d27 commit 9b471f2
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions ingest/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ func (r *Router) CorsMiddleware(c *gin.Context) {
}

type BatchPayload struct {
Batch []AnalyticsServerEvent `json:"batch"`
Context map[string]any `json:"context"`
Batch []AnalyticsServerEvent `json:"batch"`
Context map[string]any `json:"context"`
WriteKey string `json:"writeKey"`
}

func (r *Router) SettingsHandler(c *gin.Context) {
Expand Down Expand Up @@ -225,7 +226,13 @@ func (r *Router) BatchHandler(c *gin.Context) {
rError = r.ResponseError(c, http.StatusBadRequest, "invalid content type", false, fmt.Errorf("%s. Expected: application/json", c.ContentType()), true)
return
}
loc, err := r.getDataLocator(c, nil, IngestTypeWriteKeyDefined)
err := json.NewDecoder(c.Request.Body).Decode(&payload)
if err != nil {
err = fmt.Errorf("Client Ip: %s: %v", utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), err)
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, err, true)
return
}
loc, err := r.getDataLocator(c, IngestTypeWriteKeyDefined, func() string { return payload.WriteKey })
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error processing message", false, err, true)
return
Expand All @@ -238,12 +245,6 @@ func (r *Router) BatchHandler(c *gin.Context) {
rError = r.ResponseError(c, http.StatusOK, "stream not found", false, fmt.Errorf("for: %+v", loc), true)
return
}
err = json.NewDecoder(c.Request.Body).Decode(&payload)
if err != nil {
err = fmt.Errorf("Client Ip: %s: %v", utils.NvlString(c.GetHeader("X-Real-Ip"), c.GetHeader("X-Forwarded-For"), c.ClientIP()), err)
rError = r.ResponseError(c, http.StatusOK, "error parsing message", false, err, true)
return
}
eventsLogId := stream.Stream.Id
okEvents := 0
errors := make([]string, 0)
Expand Down Expand Up @@ -380,7 +381,8 @@ func (r *Router) IngestHandler(c *gin.Context) {
messageId, _ := message["messageId"].(string)
messageId = utils.DefaultStringFunc(messageId, uuid.New)
c.Set(appbase.ContextMessageId, messageId)
loc, err := r.getDataLocator(c, &message, ingestType)
//func() string { wk, _ := message["writeKey"].(string); return wk }
loc, err := r.getDataLocator(c, ingestType, nil)
if err != nil {
rError = r.ResponseError(c, http.StatusOK, "error processing message", false, fmt.Errorf("%v: %s", err, string(body)), true)
return
Expand Down Expand Up @@ -505,7 +507,7 @@ func patchEvent(c *gin.Context, messageId string, event *AnalyticsServerEvent, t
return nil
}

func (r *Router) getDataLocator(c *gin.Context, event *AnalyticsServerEvent, ingestType IngestType) (cred StreamCredentials, err error) {
func (r *Router) getDataLocator(c *gin.Context, ingestType IngestType, writeKeyExtractor func() string) (cred StreamCredentials, err error) {
dataHosts := strings.Split(r.config.DataDomain, ",")
cred.IngestType = ingestType
if c.GetHeader("Authorization") != "" {
Expand All @@ -518,6 +520,8 @@ func (r *Router) getDataLocator(c *gin.Context, event *AnalyticsServerEvent, ing
cred.WriteKey = string(wkDecoded)
} else if c.GetHeader("X-Write-Key") != "" {
cred.WriteKey = c.GetHeader("X-Write-Key")
} else if writeKeyExtractor != nil {
cred.WriteKey = writeKeyExtractor()
}
host := strings.Split(c.Request.Host, ":")[0]
for _, dataHost := range dataHosts {
Expand Down

0 comments on commit 9b471f2

Please sign in to comment.