ipbl/vendor/gopkg.in/zeromq/goczmq.v4/channeler.go

310 lines
7.3 KiB
Go
Raw Normal View History

2022-03-11 23:34:09 +01:00
package goczmq
/*
#include "czmq.h"
void Sock_init() {zsys_init();}
*/
import "C"
import (
"fmt"
"math/rand"
"strings"
)
// Channeler serializes all access to a socket through a send
// and receive channel. It starts two threads, on is used for receiving
// from the zeromq socket. The other is used to listen to the receive
// channel, and send everything back to the socket thrad for sending
// using an additional inproc socket.
type Channeler struct {
id int64
sockType int
endpoints string
subscribe *string
commandAddr string
proxyAddr string
commandChan chan<- string
SendChan chan<- [][]byte
RecvChan <-chan [][]byte
}
// Destroy sends a message to the Channeler to shut it down
// and clean it up.
func (c *Channeler) Destroy() {
c.commandChan <- "destroy"
}
// Subscribe to a Topic
func (c *Channeler) Subscribe(topic string) {
c.commandChan <- fmt.Sprintf("subscribe %s", topic)
}
// Unsubscribe from a Topic
func (c *Channeler) Unsubscribe(topic string) {
c.commandChan <- fmt.Sprintf("unsubscribe %s", topic)
}
// actor is a routine that handles communication with
// the zeromq socket.
func (c *Channeler) actor(recvChan chan<- [][]byte) {
pipe, err := NewPair(fmt.Sprintf(">%s", c.commandAddr))
if err != nil {
panic(err)
}
defer pipe.Destroy()
defer close(recvChan)
pull, err := NewPull(c.proxyAddr)
if err != nil {
panic(err)
}
defer pull.Destroy()
sock := NewSock(c.sockType)
defer sock.Destroy()
switch c.sockType {
case Pub, Rep, Pull, Router, XPub:
err = sock.Attach(c.endpoints, true)
if err != nil {
panic(err)
}
case Req, Push, Dealer, Pair, Stream, XSub:
err = sock.Attach(c.endpoints, false)
if err != nil {
panic(err)
}
case Sub:
if c.subscribe != nil {
subscriptions := strings.Split(*c.subscribe, ",")
for _, topic := range subscriptions {
sock.SetSubscribe(topic)
}
}
err = sock.Attach(c.endpoints, false)
if err != nil {
panic(err)
}
default:
panic(ErrInvalidSockType)
}
poller, err := NewPoller(sock, pull, pipe)
if err != nil {
panic(err)
}
defer poller.Destroy()
for {
s := poller.Wait(-1)
switch s {
case pipe:
cmd, err := pipe.RecvMessage()
if err != nil {
panic(err)
}
switch string(cmd[0]) {
case "destroy":
disconnect := strings.Split(c.endpoints, ",")
for _, endpoint := range disconnect {
sock.Disconnect(endpoint)
}
pipe.SendMessage([][]byte{[]byte("ok")})
goto ExitActor
case "subscribe":
topic := string(cmd[1])
sock.SetSubscribe(topic)
pipe.SendMessage([][]byte{[]byte("ok")})
case "unsubscribe":
topic := string(cmd[1])
sock.SetUnsubscribe(topic)
pipe.SendMessage([][]byte{[]byte("ok")})
}
case sock:
msg, err := s.RecvMessage()
if err != nil {
panic(err)
}
recvChan <- msg
case pull:
msg, err := pull.RecvMessage()
if err != nil {
panic(err)
}
err = sock.SendMessage(msg)
if err != nil {
panic(err)
}
}
}
ExitActor:
}
// channeler is a routine that handles the channel select loop
// and sends commands to the zeromq socket.
func (c *Channeler) channeler(commandChan <-chan string, sendChan <-chan [][]byte) {
push, err := NewPush(c.proxyAddr)
if err != nil {
panic(err)
}
defer push.Destroy()
pipe, err := NewPair(fmt.Sprintf("@%s", c.commandAddr))
if err != nil {
panic(err)
}
defer pipe.Destroy()
for {
select {
case cmd := <-commandChan:
switch cmd {
case "destroy":
err = pipe.SendFrame([]byte("destroy"), FlagNone)
if err != nil {
panic(err)
}
_, err = pipe.RecvMessage()
if err != nil {
panic(err)
}
goto ExitChanneler
default:
parts := strings.Split(cmd, " ")
numParts := len(parts)
message := make([][]byte, numParts, numParts)
for i, p := range parts {
message[i] = []byte(p)
}
err := pipe.SendMessage(message)
if err != nil {
panic(err)
}
_, err = pipe.RecvMessage()
if err != nil {
panic(err)
}
}
case msg := <-sendChan:
err := push.SendMessage(msg)
if err != nil {
panic(err)
}
}
}
ExitChanneler:
}
// newChanneler accepts arguments from the socket type based
// constructors and creates a new Channeler instance
func newChanneler(sockType int, endpoints string, subscribe ...string) *Channeler {
commandChan := make(chan string)
sendChan := make(chan [][]byte)
recvChan := make(chan [][]byte)
C.Sock_init()
c := &Channeler{
id: rand.Int63(),
endpoints: endpoints,
sockType: sockType,
commandChan: commandChan,
SendChan: sendChan,
RecvChan: recvChan,
}
c.commandAddr = fmt.Sprintf("inproc://actorcontrol%d", c.id)
c.proxyAddr = fmt.Sprintf("inproc://proxy%d", c.id)
if len(subscribe) > 0 {
topics := strings.Join(subscribe, ",")
c.subscribe = &topics
}
go c.channeler(commandChan, sendChan)
go c.actor(recvChan)
return c
}
// NewPubChanneler creats a new Channeler wrapping
// a Pub socket. The socket will bind by default.
func NewPubChanneler(endpoints string) *Channeler {
return newChanneler(Pub, endpoints, "")
}
// NewSubChanneler creates a new Channeler wrapping
// a Sub socket. Along with an endpoint list
// it accepts a comma delimited list of topics.
// The socket will connect by default.
func NewSubChanneler(endpoints string, subscribe ...string) *Channeler {
return newChanneler(Sub, endpoints, subscribe...)
}
// NewRepChanneler creates a new Channeler wrapping
// a Rep socket. The socket will bind by default.
func NewRepChanneler(endpoints string) *Channeler {
return newChanneler(Rep, endpoints, "")
}
// NewReqChanneler creates a new Channeler wrapping
// a Req socket. The socket will connect by default.
func NewReqChanneler(endpoints string) *Channeler {
return newChanneler(Req, endpoints, "")
}
// NewPullChanneler creates a new Channeler wrapping
// a Pull socket. The socket will bind by default.
func NewPullChanneler(endpoints string) *Channeler {
return newChanneler(Pull, endpoints, "")
}
// NewPushChanneler creates a new Channeler wrapping
// a Push socket. The socket will connect by default.
func NewPushChanneler(endpoints string) *Channeler {
return newChanneler(Push, endpoints, "")
}
// NewRouterChanneler creates a new Channeler wrapping
// a Router socket. The socket will Bind by default.
func NewRouterChanneler(endpoints string) *Channeler {
return newChanneler(Router, endpoints, "")
}
// NewDealerChanneler creates a new Channeler wrapping
// a Dealer socket. The socket will connect by default.
func NewDealerChanneler(endpoints string) *Channeler {
return newChanneler(Dealer, endpoints, "")
}
// NewXPubChanneler creates a new Channeler wrapping
// an XPub socket. The socket will Bind by default.
func NewXPubChanneler(endpoints string) *Channeler {
return newChanneler(XPub, endpoints, "")
}
// NewXSubChanneler creates a new Channeler wrapping
// a XSub socket. The socket will connect by default.
func NewXSubChanneler(endpoints string) *Channeler {
return newChanneler(XSub, endpoints, "")
}
// NewPairChanneler creates a new Channeler wrapping
// a Pair socket. The socket will connect by default.
func NewPairChanneler(endpoints string) *Channeler {
return newChanneler(Pair, endpoints, "")
}
// NewStreamChanneler creates a new Channeler wrapping
// a Pair socket. The socket will connect by default.
func NewStreamChanneler(endpoints string) *Channeler {
return newChanneler(Stream, endpoints, "")
}