最近一直在想一个问题,如何实现股票预警服务?股票价格是实时在变化的,而需要预警的价格也不确定,会根据用户的自身需求随时增加、修改、删除,产生变化。并且对于用户来说,对实时性的要求非常高。

最开始我想的是,采用轮询的办法,每当价格变动的时候去把所有的预警条件遍历一遍,看是否有符合条件的,如果有,就发送通知。但是根据经验来看,最容易想到的办法,其实是最不靠谱的,股票数量本来就非常多,当预警的条件数量成倍增长时,会造成遍历的次数成几何级增长。

于是我就想有没有一种办法,可以将预警条件和股票价格绑定的方法,当股票价格变动的时候,自动触发预警条件的判断逻辑。经过咨询后端开发的朋友,原来这就是典型的 生产者 - 消费者 模型。可以使用消息队列中间件来解决。

NSQ 介绍

在 Go 中,比较简洁,功能强大,而且轻量的中间件有 NSQ

NSQ 主要分为四个部分:

  • nsqd:用来接收、缓存、投递消息给客户端。服务启动后,有两个端口,一个提供给客户端,另一个是 HTTP API。
  • nsqlookupd:守护进程,用来负责管理拓扑信息并提供最终一致性的发现服务。客户端可以通过 nsqlookupd 来发现生产者。
  • nsqadmin:提供一套 Web UI 来管理 NSQ 集群,查看集群的实时统计情况。
  • utilities:基础工具集。

部署 NSQ

NSQ 支持 Docker 部署,一个 docker-compose 就能搞定:

docker-compose 文件:docker-compose.yml

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160"
      - "4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150"
      - "4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"

使用 docker-compose up -d 即可启动 NSQ。

测试 NSQ

使用 Docker 启动后,可以试试是否正常工作:

curl -d 'hello world 1' 'http://127.0.0.1:32770/pub?topic=test'

如果成功,会提示 OK%,而且你可以在 Web UI 中看到这个 topic。

实现一个简单的生产者 - 消费者

由于我这里只有一个 NSQ 节点,所以我采用直接连接节点的方式来使用 NSQ:

package main

import (
	"flag"
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

var url string
var url1 string

func init() {
	flag.StringVar(&url, "url", "127.0.0.1:32771", "nsqd")
	flag.Parse()
}

func startProducer() {
	config := nsq.NewConfig()
	producer, err := nsq.NewProducer(url, config)

	if err != nil {
		log.Fatal(err)
	}

	for {
		if err := producer.Publish("test", []byte("test message")); err != nil {
			log.Fatal("publish error:" + err.Error())
		}
		time.Sleep(1 * time.Second)
	}
}

func startConsumer() {
	config := nsq.NewConfig()
	consumer, err := nsq.NewConsumer("test", "sensor01", config)
	if err != nil {
		log.Fatal(err)
	}

	consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Println("consumer:" + string(message.Body))
		return nil
	}))

	if err := consumer.ConnectToNSQD(url); err != nil {
		log.Fatal(err)
	}

	<-consumer.StopChan
}

func main() {
	go startConsumer()
	startProducer()
}

每秒生产者都会 publish 一个消息到消息队列,然后消费者会不断的从消息队列中取出消息使用。