package mq import ( "context" "fmt" "os" "time" amqp "github.com/rabbitmq/amqp091-go" ) type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel queue amqp.Queue } func NewRabbitMQ() (*RabbitMQ, error) { url := os.Getenv("RABBITMQ_URL") if url == "" { url = "amqp://guest:guest@localhost:5672/" } conn, err := amqp.Dial(url) if err != nil { return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) } ch, err := conn.Channel() if err != nil { conn.Close() return nil, fmt.Errorf("failed to open channel: %w", err) } q, err := ch.QueueDeclare("bales_tasks", true, false, false, false, nil) if err != nil { ch.Close() conn.Close() return nil, fmt.Errorf("failed to declare queue: %w", err) } return &RabbitMQ{ conn: conn, channel: ch, queue: q, }, nil } func (r *RabbitMQ) Publish(ctx context.Context, body []byte) error { return r.channel.PublishWithContext(ctx, "", r.queue.Name, false, false, amqp.Publishing{ ContentType: "application/json", Body: body, }) } func (r *RabbitMQ) Consume() ([]byte, error) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msgs, err := r.channel.Consume(r.queue.Name, "", false, false, false, false, nil) if err != nil { return nil, err } select { case msg, ok := <-msgs: if !ok { return nil, fmt.Errorf("channel closed") } msg.Ack(false) return msg.Body, nil case <-ctx.Done(): return nil, fmt.Errorf("timeout waiting for message") } } func (r *RabbitMQ) Close() { if r.channel != nil { r.channel.Close() } if r.conn != nil { r.conn.Close() } }