golang - nats

nats

轻量级的MQ.

demo的场景

企业钉钉有个每分钟1500调用的限制,目前公司都是各个项目各自去触发, 但大家相对独立,所以没法进行全局的控制, 造成拥堵的时候超过限制导致发送失败.

这个demo 主要是想做一个全局的消息频率控制.

思路

独立一个服务,进行钉钉通知管理.

放出一个接口接收大家的推送任务

服务内部 需要一个持久化容器存储消息

按一定频率消费这些消息,如果失败需要重新放入队列

  1. 可以直接用数据库做队列
  2. 自建消息队列

如果是数据库的话,单独为这个服务搞一个,感觉没啥必要,也就一个表.
如果自建消息队列,以后其他消息还能使用,选个轻量的就好了.

这里最终选了nats.
为了支持持久化用的是nats-streaming

demo代码

package main

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"time"

nats "github.com/nats-io/go-nats"
)

type person struct {
Name string
Address string
Age int
}

func main() {
// 创建MQ连接
nc, _ := nats.Connect(nats.DefaultURL)
// 是否需要序列化json
c, _ := nats.NewEncodedConn(nc, "json")
defer nc.Close()

// 假设为接受channel
ch := make(chan *person)
c.BindRecvChan("hello", ch)
// 独立一个go routine模拟发送消息
go send(c)
// 一直接受消息进行处理
receive(ch, c)

}

func receive(ch chan *person, c *nats.EncodedConn) {
for r := range ch {
// 频率限制
time.Sleep(10 * time.Second)
fmt.Printf("%v says hello!\n", r)
// 发送钉钉消息
url := "https://oapi.dingtalk.com/robot/send?access_token=69656ec1a327a4e8505816ab855af00d7d84ead5591dd57380bcb881b684a2e3"
// 这里写死,模拟接受到的信息
var jsonStr = []byte(`{
"msgtype": "link",
"link": {
"text": "这个即将发布的新版本,创始人陈航(花名“无招”)称它为“红树林”。
而在此之前,每当面临重大升级,产品经理们都会取一个应景的代号,这一次,为什么是“红树林”?",
"title": "时代的火车向前开",
"picUrl": "",
"messageUrl": "https://mp.weixin.qq.com/s?__biz=MzA4NjMwMTA2Ng==&mid=2650316842&idx=1&sn=60da3ea2b29f1dcc43a7c8e4a7c97a16&scene=2&srcid=09189AnRJEdIiWVaKltFzNTw&from=timeline&isappinstalled=0&key=&ascene=2&uin=&devicetype=android-23&version=26031933&nettype=WIFI"
}
}`)
// 发送消息
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
// 如果失败,重新进入队列排队
c.Publish("hello", jsonStr)
}
fmt.Println("response Status Code:", resp.StatusCode)
fmt.Println("response Status:", resp.Status)
fmt.Println("response Headers:", resp.Header)
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println("response Body:", string(body))
}
}
func send(c *nats.EncodedConn) {
// 模拟连续发送次数
total := 4
me := &person{Name: "derek", Age: 22, Address: "85 Second St"}
for i := 0; i < total; i++ {
fmt.Println(i)
c.Publish("hello", me)
}
}