ipbl/src/zmqrouter/main.go
2023-01-19 08:51:03 +01:00

103 lines
2.0 KiB
Go

package zmqrouter
import (
"encoding/json"
"fmt"
"log"
"time"
"git.paulbsd.com/paulbsd/ipbl/src/config"
"git.paulbsd.com/paulbsd/ipbl/src/models"
"gopkg.in/zeromq/goczmq.v4"
)
func Init(cfg *config.Config) (err error) {
log.Println("Initiating ZMQ sockets")
reqsock, err := InitRep()
if err != nil {
return
}
pubsock, err := InitPub()
if err != nil {
return
}
Handle(cfg, reqsock, pubsock, cfg.Options.ZMQChannel)
return
}
func Handle(cfg *config.Config, reqsock *goczmq.Sock, pubsock *goczmq.Sock, channel string) (err error) {
log.Println("Start handling zmq sockets")
var lastip string
for {
var msg = "err"
var req, err = reqsock.RecvMessage()
if err != nil {
log.Println("unable to receive message from req socket", err)
continue
}
var topub [][]byte
for _, val := range req {
var apievent = models.APIEvent{}
var event = models.Event{}
err = json.Unmarshal(val, &apievent)
if err != nil {
log.Println("unable to parse ip address", err)
time.Sleep(time.Second)
continue
}
if apievent.IPData.IP != "" && apievent.IPData.IP == lastip {
continue
}
if apievent.MsgType == "add" {
session := cfg.Db.NewSession()
event.APIParse(session, apievent)
session.Close()
err := event.Insert(cfg)
if err != nil {
log.Println(err)
}
log.Printf("zmq: Inserted event")
}
if apievent.MsgType == "file" {
apievent.MsgType = "zmq"
}
val, err = json.Marshal(apievent)
if err != nil {
fmt.Println(err)
}
tmpval := fmt.Sprintf("%s %s", channel, string(val))
ipjson := []byte(tmpval)
topub = append(topub, ipjson)
lastip = apievent.IPData.IP
}
err = pubsock.SendMessage(topub)
if err != nil {
log.Println("error sending message to pub socket")
continue
}
msg = "ok"
var resp [][]byte = [][]byte{[]byte(msg)}
err = reqsock.SendMessage(resp)
if err != nil {
log.Println("error replying message to req socket")
continue
}
}
}