ipbl/src/zmqrouter/main.go
Paul Lecuq e92d332269
All checks were successful
continuous-integration/drone/push Build is passing
updated ipbl to handle init of ipblc
2022-09-11 23:34:51 +02:00

82 lines
1.6 KiB
Go

package zmqrouter
import (
"encoding/json"
"fmt"
"log"
"git.paulbsd.com/paulbsd/ipbl/src/config"
"git.paulbsd.com/paulbsd/ipbl/src/models"
"gopkg.in/zeromq/goczmq.v4"
)
// Init creates the two ZMQ sockets (via InitRep and InitPub) and then runs
// Handle on them, using cfg.Options.ZMQChannel as the publish channel.
//
// It returns the error from socket creation, or whatever Handle returns.
// Handle currently loops forever, so in practice Init only returns when one
// of the sockets could not be created.
func Init(cfg *config.Config) (err error) {
	log.Println("Initiating ZMQ sockets")

	reqsock, err := InitRep()
	if err != nil {
		return err
	}
	// Release the socket on every exit path, including a later InitPub
	// failure, which previously leaked it.
	defer reqsock.Destroy()

	pubsock, err := InitPub()
	if err != nil {
		return err
	}
	defer pubsock.Destroy()

	// Propagate Handle's error instead of silently discarding it.
	return Handle(cfg, reqsock, pubsock, cfg.Options.ZMQChannel)
}
// Handle runs the receive/publish/reply loop and never returns.
//
// For every message received on reqsock, each frame is decoded as a JSON
// models.APIIP. Frames that fail to decode are logged and skipped. Unless the
// decoded Mode is "init", the entry is converted with APIConvert and stored
// with InsertOrUpdate. Every successfully decoded frame is then re-published
// on pubsock as "<channel> <original JSON>".
//
// A single-frame reply is always sent back on reqsock: "ok" when the publish
// succeeded, "err" otherwise.
//
// NOTE(review): reqsock is presumably a REP socket (it is created by InitRep
// in Init). If so, every successful receive must be followed by a send before
// the next receive is allowed, which is why the reply is no longer skipped
// when publishing fails — confirm against InitRep.
func Handle(cfg *config.Config, reqsock *goczmq.Sock, pubsock *goczmq.Sock, channel string) (err error) {
	log.Println("Start handling zmq sockets")
	for {
		// Pessimistic default: only switched to "ok" once the publish succeeded.
		msg := "err"

		req, err := reqsock.RecvMessage()
		if err != nil {
			// Nothing was received, so there is nothing to reply to.
			log.Println("unable to receive message from req socket", err)
			continue
		}

		topub := make([][]byte, 0, len(req))
		for _, val := range req {
			var apiip models.APIIP
			if err := json.Unmarshal(val, &apiip); err != nil {
				log.Println("unable to parse ip address", err)
				continue
			}

			// "init" messages are only relayed to subscribers, not stored.
			if apiip.Mode != "init" {
				ip := *apiip.APIConvert()
				numinsert, numupdate, err := ip.InsertOrUpdate(cfg)
				if err != nil {
					log.Println(err)
				}
				log.Printf("zmq: Inserted %d IP, Updated %d IP\n", numinsert, numupdate)
			}

			// Prefix the raw JSON with the channel name for the pub socket.
			topub = append(topub, []byte(fmt.Sprintf("%s %s", channel, string(val))))
		}

		if err = pubsock.SendMessage(topub); err != nil {
			// Fall through so the requester is still answered (with "err").
			log.Println("error sending message to pub socket", err)
		} else {
			msg = "ok"
		}

		if err = reqsock.SendMessage([][]byte{[]byte(msg)}); err != nil {
			log.Println("error replying message to req socket", err)
		}
	}
}