310 lines
7.3 KiB
Go
310 lines
7.3 KiB
Go
package goczmq
|
|
|
|
/*
|
|
#include "czmq.h"
|
|
void Sock_init() {zsys_init();}
|
|
*/
|
|
import "C"
|
|
|
|
import (
|
|
"fmt"
|
|
"math/rand"
|
|
"strings"
|
|
)
|
|
|
|
// Channeler serializes all access to a socket through a send
|
|
// and receive channel. It starts two threads, on is used for receiving
|
|
// from the zeromq socket. The other is used to listen to the receive
|
|
// channel, and send everything back to the socket thrad for sending
|
|
// using an additional inproc socket.
|
|
type Channeler struct {
|
|
id int64
|
|
sockType int
|
|
endpoints string
|
|
subscribe *string
|
|
commandAddr string
|
|
proxyAddr string
|
|
commandChan chan<- string
|
|
SendChan chan<- [][]byte
|
|
RecvChan <-chan [][]byte
|
|
}
|
|
|
|
// Destroy sends a message to the Channeler to shut it down
|
|
// and clean it up.
|
|
func (c *Channeler) Destroy() {
|
|
c.commandChan <- "destroy"
|
|
}
|
|
|
|
// Subscribe to a Topic
|
|
func (c *Channeler) Subscribe(topic string) {
|
|
c.commandChan <- fmt.Sprintf("subscribe %s", topic)
|
|
}
|
|
|
|
// Unsubscribe from a Topic
|
|
func (c *Channeler) Unsubscribe(topic string) {
|
|
c.commandChan <- fmt.Sprintf("unsubscribe %s", topic)
|
|
}
|
|
|
|
// actor is a routine that handles communication with
|
|
// the zeromq socket.
|
|
func (c *Channeler) actor(recvChan chan<- [][]byte) {
|
|
pipe, err := NewPair(fmt.Sprintf(">%s", c.commandAddr))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer pipe.Destroy()
|
|
defer close(recvChan)
|
|
|
|
pull, err := NewPull(c.proxyAddr)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer pull.Destroy()
|
|
|
|
sock := NewSock(c.sockType)
|
|
defer sock.Destroy()
|
|
switch c.sockType {
|
|
case Pub, Rep, Pull, Router, XPub:
|
|
err = sock.Attach(c.endpoints, true)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
case Req, Push, Dealer, Pair, Stream, XSub:
|
|
err = sock.Attach(c.endpoints, false)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
case Sub:
|
|
if c.subscribe != nil {
|
|
subscriptions := strings.Split(*c.subscribe, ",")
|
|
for _, topic := range subscriptions {
|
|
sock.SetSubscribe(topic)
|
|
}
|
|
}
|
|
|
|
err = sock.Attach(c.endpoints, false)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
default:
|
|
panic(ErrInvalidSockType)
|
|
}
|
|
|
|
poller, err := NewPoller(sock, pull, pipe)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer poller.Destroy()
|
|
|
|
for {
|
|
s := poller.Wait(-1)
|
|
switch s {
|
|
case pipe:
|
|
cmd, err := pipe.RecvMessage()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
switch string(cmd[0]) {
|
|
case "destroy":
|
|
disconnect := strings.Split(c.endpoints, ",")
|
|
for _, endpoint := range disconnect {
|
|
sock.Disconnect(endpoint)
|
|
}
|
|
pipe.SendMessage([][]byte{[]byte("ok")})
|
|
goto ExitActor
|
|
case "subscribe":
|
|
topic := string(cmd[1])
|
|
sock.SetSubscribe(topic)
|
|
pipe.SendMessage([][]byte{[]byte("ok")})
|
|
case "unsubscribe":
|
|
topic := string(cmd[1])
|
|
sock.SetUnsubscribe(topic)
|
|
pipe.SendMessage([][]byte{[]byte("ok")})
|
|
}
|
|
|
|
case sock:
|
|
msg, err := s.RecvMessage()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
recvChan <- msg
|
|
|
|
case pull:
|
|
msg, err := pull.RecvMessage()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
err = sock.SendMessage(msg)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
}
|
|
ExitActor:
|
|
}
|
|
|
|
// channeler is a routine that handles the channel select loop
|
|
// and sends commands to the zeromq socket.
|
|
func (c *Channeler) channeler(commandChan <-chan string, sendChan <-chan [][]byte) {
|
|
push, err := NewPush(c.proxyAddr)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer push.Destroy()
|
|
|
|
pipe, err := NewPair(fmt.Sprintf("@%s", c.commandAddr))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer pipe.Destroy()
|
|
|
|
for {
|
|
select {
|
|
case cmd := <-commandChan:
|
|
switch cmd {
|
|
case "destroy":
|
|
err = pipe.SendFrame([]byte("destroy"), FlagNone)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
_, err = pipe.RecvMessage()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
goto ExitChanneler
|
|
default:
|
|
parts := strings.Split(cmd, " ")
|
|
numParts := len(parts)
|
|
message := make([][]byte, numParts, numParts)
|
|
for i, p := range parts {
|
|
message[i] = []byte(p)
|
|
}
|
|
err := pipe.SendMessage(message)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
_, err = pipe.RecvMessage()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
case msg := <-sendChan:
|
|
err := push.SendMessage(msg)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
}
|
|
ExitChanneler:
|
|
}
|
|
|
|
// newChanneler accepts arguments from the socket type based
|
|
// constructors and creates a new Channeler instance
|
|
func newChanneler(sockType int, endpoints string, subscribe ...string) *Channeler {
|
|
commandChan := make(chan string)
|
|
sendChan := make(chan [][]byte)
|
|
recvChan := make(chan [][]byte)
|
|
|
|
C.Sock_init()
|
|
c := &Channeler{
|
|
id: rand.Int63(),
|
|
endpoints: endpoints,
|
|
sockType: sockType,
|
|
commandChan: commandChan,
|
|
SendChan: sendChan,
|
|
RecvChan: recvChan,
|
|
}
|
|
c.commandAddr = fmt.Sprintf("inproc://actorcontrol%d", c.id)
|
|
c.proxyAddr = fmt.Sprintf("inproc://proxy%d", c.id)
|
|
|
|
if len(subscribe) > 0 {
|
|
topics := strings.Join(subscribe, ",")
|
|
c.subscribe = &topics
|
|
}
|
|
|
|
go c.channeler(commandChan, sendChan)
|
|
go c.actor(recvChan)
|
|
|
|
return c
|
|
}
|
|
|
|
// NewPubChanneler creats a new Channeler wrapping
|
|
// a Pub socket. The socket will bind by default.
|
|
func NewPubChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Pub, endpoints, "")
|
|
}
|
|
|
|
// NewSubChanneler creates a new Channeler wrapping
|
|
// a Sub socket. Along with an endpoint list
|
|
// it accepts a comma delimited list of topics.
|
|
// The socket will connect by default.
|
|
func NewSubChanneler(endpoints string, subscribe ...string) *Channeler {
|
|
return newChanneler(Sub, endpoints, subscribe...)
|
|
}
|
|
|
|
// NewRepChanneler creates a new Channeler wrapping
|
|
// a Rep socket. The socket will bind by default.
|
|
func NewRepChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Rep, endpoints, "")
|
|
}
|
|
|
|
// NewReqChanneler creates a new Channeler wrapping
|
|
// a Req socket. The socket will connect by default.
|
|
func NewReqChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Req, endpoints, "")
|
|
}
|
|
|
|
// NewPullChanneler creates a new Channeler wrapping
|
|
// a Pull socket. The socket will bind by default.
|
|
func NewPullChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Pull, endpoints, "")
|
|
}
|
|
|
|
// NewPushChanneler creates a new Channeler wrapping
|
|
// a Push socket. The socket will connect by default.
|
|
func NewPushChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Push, endpoints, "")
|
|
}
|
|
|
|
// NewRouterChanneler creates a new Channeler wrapping
|
|
// a Router socket. The socket will Bind by default.
|
|
func NewRouterChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Router, endpoints, "")
|
|
}
|
|
|
|
// NewDealerChanneler creates a new Channeler wrapping
|
|
// a Dealer socket. The socket will connect by default.
|
|
func NewDealerChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Dealer, endpoints, "")
|
|
}
|
|
|
|
// NewXPubChanneler creates a new Channeler wrapping
|
|
// an XPub socket. The socket will Bind by default.
|
|
func NewXPubChanneler(endpoints string) *Channeler {
|
|
return newChanneler(XPub, endpoints, "")
|
|
}
|
|
|
|
// NewXSubChanneler creates a new Channeler wrapping
|
|
// a XSub socket. The socket will connect by default.
|
|
func NewXSubChanneler(endpoints string) *Channeler {
|
|
return newChanneler(XSub, endpoints, "")
|
|
}
|
|
|
|
// NewPairChanneler creates a new Channeler wrapping
|
|
// a Pair socket. The socket will connect by default.
|
|
func NewPairChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Pair, endpoints, "")
|
|
}
|
|
|
|
// NewStreamChanneler creates a new Channeler wrapping
|
|
// a Pair socket. The socket will connect by default.
|
|
func NewStreamChanneler(endpoints string) *Channeler {
|
|
return newChanneler(Stream, endpoints, "")
|
|
}
|