chore: add qe and integrations with rabbit mq
This commit is contained in:
@@ -0,0 +1,84 @@
|
||||
package mq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
type RabbitMQ struct {
|
||||
conn *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
queue amqp.Queue
|
||||
}
|
||||
|
||||
func NewRabbitMQ() (*RabbitMQ, error) {
|
||||
url := os.Getenv("RABBITMQ_URL")
|
||||
if url == "" {
|
||||
url = "amqp://guest:guest@localhost:5672/"
|
||||
}
|
||||
|
||||
conn, err := amqp.Dial(url)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("failed to open channel: %w", err)
|
||||
}
|
||||
|
||||
q, err := ch.QueueDeclare("bales_tasks", true, false, false, false, nil)
|
||||
if err != nil {
|
||||
ch.Close()
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("failed to declare queue: %w", err)
|
||||
}
|
||||
|
||||
return &RabbitMQ{
|
||||
conn: conn,
|
||||
channel: ch,
|
||||
queue: q,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Publish(ctx context.Context, body []byte) error {
|
||||
return r.channel.PublishWithContext(ctx, "", r.queue.Name, false, false, amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: body,
|
||||
})
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Consume() ([]byte, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
msgs, err := r.channel.Consume(r.queue.Name, "", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
select {
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("channel closed")
|
||||
}
|
||||
msg.Ack(false)
|
||||
return msg.Body, nil
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("timeout waiting for message")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Close() {
|
||||
if r.channel != nil {
|
||||
r.channel.Close()
|
||||
}
|
||||
if r.conn != nil {
|
||||
r.conn.Close()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user