Skip to content

Commit

Permalink
More work. Not exactly sure if everything works right or not.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Darlington committed Mar 3, 2013
1 parent 606385f commit a77a673
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 114 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
GOlang version of https://github.com/seomoz/qless. A redis job queue.
Go (golang) version of https://github.com/seomoz/qless. A redis job queue.

Goqless now works but not every feature is implemented or tested.
Any reported issues or pull requests are welcome.
Expand Down
54 changes: 54 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"github.com/garyburd/redigo/redis"
)

type TaggedReply struct {
Total int
Jobs StringSlice
}

type Client struct {
conn redis.Conn
host string
Expand All @@ -28,6 +33,7 @@ func Dial(host, port string) (*Client, error) {
}

c.lua = NewLua(conn)
// _, filename, _, _ := runtime.Caller(0)
err = c.lua.LoadScripts("qless-core") // make get from lib path
if err != nil {
conn.Close()
Expand Down Expand Up @@ -144,7 +150,55 @@ func (c *Client) GetRecurringJob(jid string) (*RecurringJob, error) {
return job, err
}

func (c *Client) Completed(start, count int) ([]string, error) {
reply, err := redis.Values(c.Do("jobs", 0, "complete"))
if err != nil {
return nil, err
}

// fmt.Println(reply)

ret := []string{}
for _, val := range reply {
s, _ := redis.String(val, err)
ret = append(ret, s)
}
return ret, err
}

func (c *Client) Tagged(tag string, start, count int) (*TaggedReply, error) {
byts, err := redis.Bytes(c.Do("tag", 0, "get", tag, start, count))
if err != nil {
return nil, err
}

t := &TaggedReply{}
err = json.Unmarshal(byts, t)
return t, err
}

// // returns all the failed jobs
// func (c *Client) Failed(group string, start, limit int) ([]*Job, error) {
// c.Do("failed", 0,
// }

// config(0, 'get', [option])
func (c *Client) GetConfig(option string) string {
args := []interface{}{0}
if option != "" {
args = append(args, option)
}

str, _ := redis.String(c.Do("get", args...))
return str
}

// config(0, 'set', option, value)
func (c *Client) SetConfig(option string, value interface{}) {
c.Do("set", option, value)
}

// config(0, 'unset', [option])
func (c *Client) UnsetConfig(option string) {
c.Do("unset", option)
}
106 changes: 53 additions & 53 deletions examples_test.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,67 @@
package goqless

import (
"fmt"
// "github.com/garyburd/redigo/redis"
// "time"
)
// import (
// "fmt"
// // "github.com/garyburd/redigo/redis"
// // "time"
// )

func ExampleGoqless_1() {
c, err := Dial("", "6379")
if err != nil {
panic(err)
}
defer c.Close()
// func ExampleGoqless_1() {
// c, err := Dial("", "6379")
// if err != nil {
// panic(err)
// }
// defer c.Close()

jid := "f5b66400bf027c191ddb80a85785b03eb9765456"
c.Track(jid)
q := c.Queue("goqless_testing_queue")
// jid := "f5b66400bf027c191ddb80a85785b03eb9765456"
// c.Track(jid)
// q := c.Queue("goqless_testing_queue")

putreply, err := q.Put(jid, "dunno", `{"hey": "there"}`, -1, -1, []string{}, -1, []string{})
fmt.Println("Put:", putreply, err)
//fmt.Println(q.Recur(jid, "dunno", `{"hey": "there"}`, 5, 0, 0, []string{}, 1))
// putreply, err := q.Put(jid, "dunno", `{"hey": "there"}`, -1, -1, []string{}, -1, []string{})
// fmt.Println("Put:", putreply, err)
// //fmt.Println(q.Recur(jid, "dunno", `{"hey": "there"}`, 5, 0, 0, []string{}, 1))

jobs, err := q.Pop(1)
fmt.Printf("Pop: %s %s %v\n", jobs[0].Queue, jobs[0].Data, err)
fmt.Print("Complete: ")
fmt.Println(jobs[0].Complete())
// jobs, err := q.Pop(1)
// fmt.Printf("Pop: %s %s %v\n", jobs[0].Queue, jobs[0].Data, err)
// fmt.Print("Complete: ")
// fmt.Println(jobs[0].Complete())

//wg.Wait()
// //wg.Wait()

// Output:
// Put: f5b66400bf027c191ddb80a85785b03eb9765456 <nil>
// Pop: goqless_testing_queue map[hey:there] <nil>
// Complete: complete <nil>
}
// // Output:
// // Put: f5b66400bf027c191ddb80a85785b03eb9765456 <nil>
// // Pop: goqless_testing_queue map[hey:there] <nil>
// // Complete: complete <nil>
// }

func ExampleGoqless_2() {
c, err := Dial("", "6379")
if err != nil {
panic(err)
}
defer c.Close()
// func ExampleGoqless_2() {
// c, err := Dial("", "6379")
// if err != nil {
// panic(err)
// }
// defer c.Close()

jid := "f5b66400bf027c191ddb80a85785b03eb9765457"
c.Track(jid)
q := c.Queue("goqless_testing_queue")
// jid := "f5b66400bf027c191ddb80a85785b03eb9765457"
// c.Track(jid)
// q := c.Queue("goqless_testing_queue")

data := struct {
Str string
}{
"a string",
}
// data := struct {
// Str string
// }{
// "a string",
// }

putreply, err := q.Put(jid, "dunno", data, -1, -1, []string{}, -1, []string{})
fmt.Println("Put:", putreply, err)
//fmt.Println(q.Recur(jid, "dunno", `{"hey": "there"}`, 5, 0, 0, []string{}, 1))
// putreply, err := q.Put(jid, "dunno", data, -1, -1, []string{}, -1, []string{})
// fmt.Println("Put:", putreply, err)
// //fmt.Println(q.Recur(jid, "dunno", `{"hey": "there"}`, 5, 0, 0, []string{}, 1))

jobs, err := q.Pop(1)
fmt.Printf("Pop: %s %v %v\n", jobs[0].Queue, jobs[0].Data, err)
fmt.Print("Fail: ")
fmt.Println(jobs[0].Fail("justbecause", "i said so"))
// jobs, err := q.Pop(1)
// fmt.Printf("Pop: %s %v %v\n", jobs[0].Queue, jobs[0].Data, err)
// fmt.Print("Fail: ")
// fmt.Println(jobs[0].Fail("justbecause", "i said so"))

// Output:
// Put: f5b66400bf027c191ddb80a85785b03eb9765457 <nil>
// Pop: goqless_testing_queue map[Str:a string] <nil>
// Fail: true <nil>
}
// // Output:
// // Put: f5b66400bf027c191ddb80a85785b03eb9765457 <nil>
// // Pop: goqless_testing_queue map[Str:a string] <nil>
// // Fail: true <nil>
// }
9 changes: 8 additions & 1 deletion goqless.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,14 @@ func (s *StringSlice) UnmarshalJSON(data []byte) error {
return nil
}

return json.Unmarshal(data, s)
var str []string
err := json.Unmarshal(data, &str)
if err != nil {
return err
}

*s = str
return nil
}

// Generates a jid
Expand Down
141 changes: 86 additions & 55 deletions goqless_test.go
Original file line number Diff line number Diff line change
@@ -1,63 +1,94 @@
package goqless

import (
"fmt"
"github.com/garyburd/redigo/redis"
"sync"
"testing"
"time"
)

// func TestRandom(t *testing.T) {
// c, err := Dial("", "6379")
// if err != nil {
// panic(err)
// }
// defer c.Close()

// // fmt.Println(c.Completed(0, 9999))

// tagged, err := c.Tagged("__callback", 0, 0)
// if err != nil {
// fmt.Println(err)
// }

// if err == nil {
// job := NewJob(c)
// for n, j := range tagged.Jobs {
// fmt.Println(n, j)
// job.Jid = j
// fmt.Println(job.Untag("__callback"))
// }
// }
// }

func TestGoqless(t *testing.T) {
// var wg sync.WaitGroup

// c, err := Dial("", "6379")
// if err != nil {
// panic(err)
// }
// defer c.Close()

// e := c.Events()
// ch, err := e.Listen()
// if err != nil {
// panic(err)
// }

// go func() {
// wg.Add(1)
// defer wg.Done()

// for {
// if val, ok := <-ch; ok {
// switch v := (val).(type) {
// case redis.Message:
// fmt.Printf("WATCH: %s: message: %s\n", v.Channel, v.Data)
// case redis.Subscription:
// fmt.Printf("WATCH: %s: %s %d\n", v.Channel, v.Kind, v.Count)
// case error:
// return
// }
// } else {
// return
// }
// }
// }()

// jid := generateJID()
// c.Track(jid)
// q := c.Queue("goqless_testing_queue")

// putreply, err := q.Put(jid, "dunno", `{"hey": "there"}`, -1, -1, []string{}, -1, []string{})
// fmt.Println("Put:", putreply, err)
// //fmt.Println(q.Recur(jid, "dunno", `{"hey": "there"}`, 5, 0, 0, []string{}, 1))

// for {
// jobs, err := q.Pop(1)
// if err != nil {
// panic(err)
// }

// if len(jobs) > 0 {
// fmt.Println(jobs[0].Complete())
// }
// time.Sleep(3 * time.Second)
// }

// wg.Wait()
var wg sync.WaitGroup

c, err := Dial("", "6379")
if err != nil {
panic(err)
}
defer c.Close()

e := c.Events()
ch, err := e.Listen()
if err != nil {
panic(err)
}

go func() {
wg.Add(1)
defer wg.Done()

for {
if val, ok := <-ch; ok {
switch v := (val).(type) {
case redis.Message:
fmt.Printf("WATCH: %s: message: %s\n", v.Channel, v.Data)
case redis.Subscription:
fmt.Printf("WATCH: %s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
return
}
} else {
return
}
}
}()

jid := generateJID()
c.Track(jid)
q := c.Queue("goqless_testing_queue")

putreply, err := q.Put(jid, "dunno", `{"hey": "there"}`, -1, -1, []string{"__callback"}, -1, []string{})
fmt.Println("Put:", putreply, err)
//fmt.Println(q.Recur(jid, "dunno", `{"hey": "there"}`, 5, 0, 0, []string{}, 1))

for {
jobs, err := q.Pop(1)
if err != nil {
panic(err)
}

if len(jobs) > 0 {
jobs[0].Data = map[string]interface{}{"idid": "id"}
fmt.Println(jobs[0].Complete())
}
time.Sleep(3 * time.Second)

fmt.Println(c.GetJob(jid))
}

wg.Wait()
}
Loading

0 comments on commit a77a673

Please sign in to comment.