85 lines
1.6 KiB
Go
85 lines
1.6 KiB
Go
package mq
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
type RabbitMQ struct {
|
|
conn *amqp.Connection
|
|
channel *amqp.Channel
|
|
queue amqp.Queue
|
|
}
|
|
|
|
func NewRabbitMQ() (*RabbitMQ, error) {
|
|
url := os.Getenv("RABBITMQ_URL")
|
|
if url == "" {
|
|
url = "amqp://guest:guest@localhost:5672/"
|
|
}
|
|
|
|
conn, err := amqp.Dial(url)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
|
}
|
|
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, fmt.Errorf("failed to open channel: %w", err)
|
|
}
|
|
|
|
q, err := ch.QueueDeclare("bales_tasks", true, false, false, false, nil)
|
|
if err != nil {
|
|
ch.Close()
|
|
conn.Close()
|
|
return nil, fmt.Errorf("failed to declare queue: %w", err)
|
|
}
|
|
|
|
return &RabbitMQ{
|
|
conn: conn,
|
|
channel: ch,
|
|
queue: q,
|
|
}, nil
|
|
}
|
|
|
|
func (r *RabbitMQ) Publish(ctx context.Context, body []byte) error {
|
|
return r.channel.PublishWithContext(ctx, "", r.queue.Name, false, false, amqp.Publishing{
|
|
ContentType: "application/json",
|
|
Body: body,
|
|
})
|
|
}
|
|
|
|
func (r *RabbitMQ) Consume() ([]byte, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
|
|
msgs, err := r.channel.Consume(r.queue.Name, "", false, false, false, false, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case msg, ok := <-msgs:
|
|
if !ok {
|
|
return nil, fmt.Errorf("channel closed")
|
|
}
|
|
msg.Ack(false)
|
|
return msg.Body, nil
|
|
case <-ctx.Done():
|
|
return nil, fmt.Errorf("timeout waiting for message")
|
|
}
|
|
}
|
|
|
|
func (r *RabbitMQ) Close() {
|
|
if r.channel != nil {
|
|
r.channel.Close()
|
|
}
|
|
if r.conn != nil {
|
|
r.conn.Close()
|
|
}
|
|
}
|