Files
2026-04-29 14:27:16 +03:00

85 lines
1.6 KiB
Go

package mq
import (
"context"
"fmt"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
}
func NewRabbitMQ() (*RabbitMQ, error) {
url := os.Getenv("RABBITMQ_URL")
if url == "" {
url = "amqp://guest:guest@localhost:5672/"
}
conn, err := amqp.Dial(url)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to open channel: %w", err)
}
q, err := ch.QueueDeclare("bales_tasks", true, false, false, false, nil)
if err != nil {
ch.Close()
conn.Close()
return nil, fmt.Errorf("failed to declare queue: %w", err)
}
return &RabbitMQ{
conn: conn,
channel: ch,
queue: q,
}, nil
}
func (r *RabbitMQ) Publish(ctx context.Context, body []byte) error {
return r.channel.PublishWithContext(ctx, "", r.queue.Name, false, false, amqp.Publishing{
ContentType: "application/json",
Body: body,
})
}
func (r *RabbitMQ) Consume() ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
msgs, err := r.channel.Consume(r.queue.Name, "", false, false, false, false, nil)
if err != nil {
return nil, err
}
select {
case msg, ok := <-msgs:
if !ok {
return nil, fmt.Errorf("channel closed")
}
msg.Ack(false)
return msg.Body, nil
case <-ctx.Done():
return nil, fmt.Errorf("timeout waiting for message")
}
}
func (r *RabbitMQ) Close() {
if r.channel != nil {
r.channel.Close()
}
if r.conn != nil {
r.conn.Close()
}
}