Skip to content

Commit

Permalink
adding initial copies of the code
Browse files Browse the repository at this point in the history
  • Loading branch information
epsniff committed Mar 18, 2018
1 parent c4303ca commit 63293b3
Show file tree
Hide file tree
Showing 9 changed files with 894 additions and 0 deletions.
110 changes: 110 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Inflight - primitives for coordinating interdependent operations in distrubuted systems

The package inflight provides primitives(data strutures) for managing inflight operations that
are being processed in a distrubuted system.

## CallGroup

CallGroup spawns off a group of operations for each call to `Add()` and
calls the `CallGroupCompletion` func when the last operation have
completed. The CallGroupCompletion func can be thought of as a finalizer where
one can gather errors and/or results from the function calls.

Example Usage:
```go
package main

import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/lytics/inflight"
)

func main() {
data := []string{"1:2", "5:6:7", "1:2", "5:6:7"}
total := int64(0)
wg := sync.WaitGroup{}
wg.Add(1)
cg := inflight.NewCallGroup(func(results map[inflight.ID]*inflight.Response) {
for _, res := range results {
subtotal := res.Result.(int)
atomic.AddInt64(&(total), int64(subtotal))
}
wg.Done()
})

startingLine := sync.WaitGroup{}
startingLine.Add(1) // block all go routines until the loop has finished spinning them up. Otherwise we have a race.
//Spawn off the workers.
for id, entry := range data {
op := cg.Add(uint64(id), entry)
go func(op *inflight.Op) {
startingLine.Wait() //wait here until signaled to start.
str := op.Msg.(string)
subtotal := 0
for _, val := range strings.Split(str, ":") {
i, _ := strconv.ParseInt(val, 10, 64)
subtotal += int(i)
}
op.Finish(nil, subtotal)
}(op)
}
startingLine.Done() // drop the checkered flag and signal all the workers to begin.

//wait for the completion function to finish.
wg.Wait()
totalVal := atomic.LoadInt64(&(total))
if totalVal != 42 {
// total == (1 + 2) + (5 + 6 + 7) + (1 + 2) + (5 + 6 + 7) == 42
fmt.Printf("total not equal 42, got:%v \n", totalVal)
}
//total == (1 + 2) + (5 + 6 + 7) + (1 + 2) + (5 + 6 + 7) == 42
fmt.Printf("got the expected amount of %v\n", total)
}
```


## Opqueue

OpQueue is a thread-safe duplicate operation suppression queue, that combines
duplicate operations (queue entires) into sets that will be dequeued togather.

For example, If you enqueue an item with a key that already exists, then that
item will be appended to that key's set of items. Otherwise the item is
inserted into the head of the list as a new item.

On Dequeue a SET is returned of all items that share a key in the queue.
It blocks on dequeue if the queue is empty, but returns an error if the
queue is full during enqueue.

```
+------------Width------------>
+ +-----+
| |ID |
| |923 |
| +-----+
| |
| |
| v
Height | +-----+ +-----+ +-----+
| |ID +---->ID +---->ID |
| |424 | |424 | |424 |
| +-----+ +-----+ +-----+
| |
| |
| v
| +-----+
| |ID |
| |99 |
| +-----+
v
```





109 changes: 109 additions & 0 deletions callgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package inflight

import (
"sync"
)

type ID uint64

// CallGroup spawns off a group of operations for each call to Add() and
// calls the CallGroupCompletion func when the last operation have
// completed. The CallGroupCompletion func can be thought of as a finalizer where
// one can gather errors and/or results from the function calls.
//
// Call Add for all our inflight tasks before calling the first
// call to Finish. Once the last task finishes and the CallGroupCompletion
// is triggered, all future calls to Add will be ignored and orphaned.
//
type CallGroup struct {
mu sync.Mutex

cgcOnce sync.Once
callGroupCompletion CallGroupCompletion

outstandingOps map[ID]*Op
finalState map[ID]*Response
}

// NewCallGroup return a new CallGroup.
// Takes a CallGroupCompletion func as an argument, which will be called when the last Op in
// the CallGroup has called Finish.
//
// In a way a CallGroup is like a Mapper-Reducer in other framworks, with
// the Ops being mapped out to workers and the CallGroupCompletion being the reducer step.
func NewCallGroup(cgc CallGroupCompletion) *CallGroup {
return &CallGroup{
outstandingOps: map[ID]*Op{},
finalState: map[ID]*Response{},
callGroupCompletion: cgc,
cgcOnce: sync.Once{},
}
}

func (cg *CallGroup) Add(k uint64, msg interface{}) *Op {
key := ID(k)

op := &Op{
cg: cg,
Key: key,
Msg: msg,
}

cg.mu.Lock()
defer cg.mu.Unlock()

cg.outstandingOps[key] = op

return op
}

//Used to by the package to extract the active ops for this callgroup.
func (cg *CallGroup) ops() map[ID]*Op {
return cg.outstandingOps
}

func (cg *CallGroup) done() {
if len(cg.outstandingOps) > 0 {
return
}

cg.cgcOnce.Do(func() {
//callGroupCompletion should never be nil, so let it panic if it is.
cg.callGroupCompletion(cg.finalState)
})
}

//CallGroupCompletion is the reducer function for a callgroup, its called once all
// Ops in the callgroup have called Finished and the final state is passed to this
// function.
type CallGroupCompletion func(finalState map[ID]*Response)

//Op represents one inflight operaton or message. When this Op's Finish func is called
// the results for this Op will be added to the finalState. When all Ops in the
// callgroup have called Finish, then the CallGroup's CallGroupCompletion func will be
// called with the final state for all Ops.
type Op struct {
cg *CallGroup
Key ID
Msg interface{}
}

func (o *Op) Finish(err error, resp interface{}) {
o.cg.mu.Lock()
defer o.cg.mu.Unlock()

if err != nil {
o.cg.finalState[o.Key] = &Response{Op: o, Err: err}
} else {
o.cg.finalState[o.Key] = &Response{Op: o, Result: resp}
}
delete(o.cg.outstandingOps, o.Key)

o.cg.done()
}

type Response struct {
Op *Op
Err error
Result interface{}
}
72 changes: 72 additions & 0 deletions callgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package inflight

import (
"runtime"
"sync"
"testing"

"github.com/bmizerany/assert"
)

func TestCompletion(t *testing.T) {
t.Parallel()
completed := 0
reslen := 0
cg := NewCallGroup(func(finalState map[ID]*Response) {
completed++
reslen += len(finalState)
})

op1 := cg.Add(1, &tsMsg{123, 5, "user", 1234567})
op2 := cg.Add(2, &tsMsg{123, 5, "user", 2222222})

assert.T(t, completed == 0)
assert.T(t, reslen == 0)
op1.Finish(nil, nil)
assert.T(t, completed == 0)
assert.T(t, reslen == 0)
op2.Finish(nil, nil)
assert.T(t, completed == 1)
assert.T(t, reslen == 2)
}

func TestConcurrentDone(t *testing.T) {
runtime.GOMAXPROCS(16)
t.Parallel()
completed := 0
reslen := 0
cg := NewCallGroup(func(finalState map[ID]*Response) {
completed++
reslen += len(finalState)
})

ops := []*Op{}
for i := 0; i < 1000; i++ {
ops = append(ops, cg.Add(uint64(i), &tsMsg{123, 5, "user", uint64(i)}))
}

wgend := sync.WaitGroup{}
wgstart := sync.WaitGroup{}
wgstart.Add(1)

for i := 0; i < 1000; i++ {
wgend.Add(1)
go func(id int) {
defer wgend.Done()
wgstart.Wait() //block until the testcase signals all go routines to fire at once.
ops[id].Finish(nil, nil)
}(i)
}
wgstart.Done() //start all go routines at the same time.
wgend.Wait()

assert.T(t, completed == 1)
assert.T(t, reslen == 1000)
}

type tsMsg struct {
Aid int
Gen int
Table string
RefsID uint64
}
15 changes: 15 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
The package inflight provides primitives(data strutures) for managing inflight operations that
are being processed in a distrubuted system.
CallGroup spawns off a group of operations for each call to Add() and
calls the CallGroupCompletion func when the last operation have
completed. The CallGroupCompletion func can be thought of as a finalizer where
one can gather errors and/or results from the function calls.
OpQueue is a thread-safe duplicate operation suppression queue, that combines
duplicate operations (queue entires) into sets that will be dequeued togather.
*/
package inflight
53 changes: 53 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/lytics/inflight"
)

func main() {
data := []string{"1:2", "5:6:7", "1:2", "5:6:7"}
total := int64(0)
wg := sync.WaitGroup{}
wg.Add(1)
cg := inflight.NewCallGroup(func(results map[inflight.ID]*inflight.Response) {
for _, res := range results {
subtotal := res.Result.(int)
atomic.AddInt64(&(total), int64(subtotal))
}
wg.Done()
})

startingLine := sync.WaitGroup{}
startingLine.Add(1) // block all go routines until the loop has finished spinning them up. Otherwise we have a race.
//Spawn off the workers.
for id, entry := range data {
op := cg.Add(uint64(id), entry)
go func(op *inflight.Op) {
startingLine.Wait() //wait here until signaled to start.
str := op.Msg.(string)
subtotal := 0
for _, val := range strings.Split(str, ":") {
i, _ := strconv.ParseInt(val, 10, 64)
subtotal += int(i)
}
op.Finish(nil, subtotal)
}(op)
}
startingLine.Done() // drop the checkered flag and signal all the workers to begin.

//wait for the completion function to finish.
wg.Wait()
totalVal := atomic.LoadInt64(&(total))
if totalVal != 42 {
// total == (1 + 2) + (5 + 6 + 7) + (1 + 2) + (5 + 6 + 7) == 42
fmt.Printf("total not equal 42, got:%v \n", totalVal)
}
//total == (1 + 2) + (5 + 6 + 7) + (1 + 2) + (5 + 6 + 7) == 42
fmt.Printf("got the expected amount of %v\n", total)
}
Loading

0 comments on commit 63293b3

Please sign in to comment.