btc-pay-checker/internal/services/btc/zmq.go

72 lines
1.4 KiB
Go

package btc
import (
"encoding/hex"
"fmt"
"log"
zmq "github.com/pebbe/zmq4"
)
const (
TOPIC_RAWTX = "rawtx"
TOPIC_RAWBLOCK = "rawblock"
TOPIC_HASHBLOCK = "hashblock"
)
type Zmq struct {
subscriber *zmq.Socket
}
func (q *Zmq) Connect(address string) error {
subscriber, err := zmq.NewSocket(zmq.SUB)
if err != nil {
return fmt.Errorf("error creating ZeroMQ subscriber: %w", err)
}
if err := subscriber.Connect("tcp://" + address); err != nil {
return fmt.Errorf("error connecting to ZeroMQ socket: %w", err)
}
if err := subscriber.SetSubscribe(TOPIC_HASHBLOCK); err != nil {
return fmt.Errorf("subscriber.SetSubscribe: %w", err)
}
q.subscriber = subscriber
log.Println("subsribed to: " + address)
return nil
}
func (q *Zmq) Close() error {
return q.subscriber.Close()
}
func (q *Zmq) Listen(blocksChan chan<- string) {
if q.subscriber == nil {
log.Fatalln("subscriber cannot be nil")
return
}
log.Println("Listening for new blocks...")
for {
msgParts, err := q.subscriber.RecvMessageBytes(0)
if err != nil {
log.Println("Error receiving message:", err)
continue
}
if len(msgParts) < 2 {
log.Println("Received message part is too short:", msgParts)
continue
}
topic := string(msgParts[0])
switch topic {
case TOPIC_HASHBLOCK:
blockHash := hex.EncodeToString(msgParts[1])
blocksChan <- blockHash
case TOPIC_RAWTX:
// TODO: do something with raw tx
continue
}
}
}