Go Open Source: Gores - a Redis-based message queue system
<h2>Gores</h2>
<p>An asynchronous job execution system based on Redis</p>
<h2>Installation</h2>
<p>Get the package</p>
<pre> <code class="language-go">$ go get github.com/wang502/gores/gores</code></pre>
<p>Import the package</p>
<pre> <code class="language-go">import "github.com/wang502/gores/gores"</code></pre>
<h2>Usage</h2>
<h3>Configuration</h3>
<p>Add a config.json file to your project folder</p>
<pre> <code class="language-go">{
    "REDISURL": "127.0.0.1:6379",
    "REDIS_PW": "mypassword",
    "BLPOP_MAX_BLOCK_TIME": 1,
    "MAX_WORKERS": 2,
    "Queues": ["queue1", "queue2"],
    "DispatcherTimeout": 5,
    "WorkerTimeout": 5
}</code></pre>
<ul>
<li><strong><em>REDISURL</em> </strong> : Redis server address. If you run a local Redis server, the default address is 127.0.0.1:6379.</li>
<li><strong><em>REDIS_PW</em> </strong> : Redis password. If Redis has no password set, this can be any string.</li>
<li><strong><em>BLPOP_MAX_BLOCK_TIME</em> </strong> : Maximum time the Redis BLPOP command blocks while waiting for an item.</li>
<li><strong><em>MAX_WORKERS</em> </strong> : Maximum number of concurrent workers. Each worker is a separate goroutine that executes a specific task on the fetched item.</li>
<li><strong><em>Queues</em> </strong> : Array of queue names on the Redis message broker.</li>
<li><strong><em>DispatcherTimeout</em> </strong> : How long the dispatcher waits for a new job to dispatch before quitting.</li>
<li><strong><em>WorkerTimeout</em> </strong> : How long a worker waits for a new job to process before quitting.</li>
</ul>
<p>Initialize config</p>
<pre> <code class="language-go">configPath := flag.String("c", "config.json", "path to configuration file")
flag.Parse()

config, err := gores.InitConfig(*configPath)
if err != nil {
    log.Fatalf("Cannot read config file: %s", err)
}</code></pre>
<h3>Enqueue an item to a Redis queue</h3>
<p>An item is a Go map.
It must have the following keys:</p>
<ul>
<li><strong><em>Name</em> </strong> : name of the item to enqueue. Items with different names are mapped to different tasks.</li>
<li><strong><em>Queue</em> </strong> : name of the queue you want to put the item in.</li>
<li><strong><em>Args</em> </strong> : the arguments the workers need in order to execute the task.</li>
<li><strong><em>Enqueue_timestamp</em> </strong> : the Unix timestamp of when the item is enqueued.</li>
</ul>
<pre> <code class="language-go">resq := gores.NewResQ(config)
item := map[string]interface{}{
    "Name":  "Rectangle",
    "Queue": "TestJob",
    "Args": map[string]interface{}{
        "Length": 10,
        "Width":  10,
    },
    "Enqueue_timestamp": time.Now().Unix(),
}

err = resq.Enqueue(item)
if err != nil {
    log.Fatalf("ERROR Enqueue item to ResQ")
}</code></pre>
<pre> <code class="language-go">$ go run main.go -c ./config.json -o produce</code></pre>
<h3>Define tasks</h3>
<pre> <code class="language-go">package tasks

import (
    "errors"
    "fmt"
)

// CalculateArea is the task for items with 'Name' = 'Rectangle'.
// It calculates the area of a rectangle by multiplying Length by Width.
func CalculateArea(args map[string]interface{}) error {
    length := args["Length"]
    width := args["Width"]
    if length == nil || width == nil {
        return errors.New("Map has no required attributes")
    }
    fmt.Printf("The area is %d\n", int(length.(float64))*int(width.(float64)))
    return nil
}</code></pre>
<h3>Launch workers to consume items and execute tasks</h3>
<pre> <code class="language-go">tasks := map[string]interface{}{
    "Item":      tasks.PrintItem,
    "Rectangle": tasks.CalculateArea,
}
gores.Launch(config, &tasks)</code></pre>
<pre> <code class="language-go">$ go run main.go -c ./config.json -o consume</code></pre>
<p>The output will be:</p>
<pre> <code class="language-go">The area is 100</code></pre>
<h3>Info about processed/failed jobs</h3>
<pre> <code class="language-go">resq := gores.NewResQ(config)
if resq == nil {
    log.Fatalf("resq is nil")
}
info := resq.Info()
for k, v := range info {
    switch v.(type) {
    case string:
        fmt.Printf("%s : %s\n", k, v)
    case int:
        fmt.Printf("%s : %d\n", k, v)
    case int64:
        fmt.Printf("%s : %d\n", k, v)
    }
}</code></pre>
<p>The output will be:</p>
<pre> <code class="language-go">Gores Info:
queues : 2
workers : 0
failed : 0
host : 127.0.0.1:6379
pending : 0
processed : 1</code></pre>
<h2>Contribution</h2>
<p>Please feel free to suggest new features. Pull requests are also welcome!</p>
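<h2>Appendix: item round trip and task lookup</h2>
<p>The CalculateArea task in the usage section asserts its arguments as float64 even though the item is enqueued with integer values. One plausible explanation, assuming items are serialized as JSON on their way through Redis (this README does not confirm it), is that Go's encoding/json decodes every JSON number into float64 when the target is interface{}. The sketch below simulates that round trip without Redis or Gores, then looks up a task by the item's Name. The helpers roundTrip, rectangleArea and dispatch are hypothetical names for illustration and are not part of the Gores API.</p>

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// roundTrip encodes an item to JSON and decodes it again. It stands in for a
// trip through a Redis queue (assumption: items are stored as JSON).
func roundTrip(item map[string]interface{}) (map[string]interface{}, error) {
	data, err := json.Marshal(item)
	if err != nil {
		return nil, err
	}
	var decoded map[string]interface{}
	if err := json.Unmarshal(data, &decoded); err != nil {
		return nil, err
	}
	return decoded, nil
}

// rectangleArea reads Length and Width with checked type assertions, so a
// missing or non-float64 value yields an error instead of a panic.
func rectangleArea(args map[string]interface{}) (int, error) {
	length, okLength := args["Length"].(float64)
	width, okWidth := args["Width"].(float64)
	if !okLength || !okWidth {
		return 0, errors.New("Length and Width must be float64 values")
	}
	return int(length) * int(width), nil
}

// dispatch looks up the task registered under the item's Name and runs it
// on the item's Args.
func dispatch(tasks map[string]func(map[string]interface{}) error, item map[string]interface{}) error {
	name, _ := item["Name"].(string)
	task, found := tasks[name]
	if !found {
		return fmt.Errorf("no task registered for %q", name)
	}
	args, ok := item["Args"].(map[string]interface{})
	if !ok {
		return errors.New("item has no Args map")
	}
	return task(args)
}

func main() {
	tasks := map[string]func(map[string]interface{}) error{
		"Rectangle": func(args map[string]interface{}) error {
			area, err := rectangleArea(args)
			if err != nil {
				return err
			}
			fmt.Printf("The area is %d\n", area)
			return nil
		},
	}
	item := map[string]interface{}{
		"Name":  "Rectangle",
		"Queue": "TestJob",
		"Args":  map[string]interface{}{"Length": 10, "Width": 10},
	}

	// After the JSON round trip, the integer arguments arrive as float64.
	decoded, err := roundTrip(item)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	if err := dispatch(tasks, decoded); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}
```

<p>Calling rectangleArea on the original, not yet encoded Args map returns an error, because the values there are still int. Checked assertions of this kind are one way to keep a malformed item from crashing a worker goroutine.</p>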
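This article was uploaded and shared by user k5135 for learning and exchange only. Ownership belongs to the original author; if your rights have been infringed, please contact the administrator.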