ipbl/src/zmqrouter/main.go
Paul Lecuq 9d70341f41
All checks were successful
continuous-integration/drone/push Build is passing
added transactions, cleaned code
2022-05-10 13:00:22 +02:00

80 lines
1.6 KiB
Go

package zmqrouter
import (
"encoding/json"
"fmt"
"log"
"git.paulbsd.com/paulbsd/ipbl/src/config"
"git.paulbsd.com/paulbsd/ipbl/src/models"
"gopkg.in/zeromq/goczmq.v4"
)
func Init(cfg *config.Config) (err error) {
log.Println("Initiating ZMQ sockets")
reqsock, err := InitRep()
if err != nil {
return
}
pubsock, err := InitPub()
if err != nil {
return
}
Handle(cfg, reqsock, pubsock, cfg.Options.ZMQChannel)
return
}
func Handle(cfg *config.Config, reqsock *goczmq.Sock, pubsock *goczmq.Sock, channel string) (err error) {
log.Println("Start handling zmq sockets")
for {
var msg = "err"
var req, err = reqsock.RecvMessage()
if err != nil {
log.Println("unable to receive message from req socket")
continue
}
var topub [][]byte
for _, val := range req {
var apiip = models.APIIP{}
var ip = models.IP{}
err = json.Unmarshal(val, &apiip)
if err != nil {
log.Println("unable to parse ip address", err)
continue
}
ip = *apiip.APIConvert()
numinsert, numupdate, err := ip.InsertOrUpdate(cfg)
if err != nil {
log.Println(err)
}
log.Printf("zmq: Inserted %d IP, Updated %d IP\n", numinsert, numupdate)
tmpval := fmt.Sprintf("%s %s", channel, string(val))
val = []byte(tmpval)
topub = append(topub, val)
}
err = pubsock.SendMessage(topub)
if err != nil {
log.Println("error sending message to pub socket")
continue
}
msg = "ok"
var resp [][]byte = [][]byte{[]byte(msg)}
err = reqsock.SendMessage(resp)
if err != nil {
log.Println("error replying message to req socket")
continue
}
}
}