66 lines
1.5 KiB
Go
66 lines
1.5 KiB
Go
package azsb
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
)
|
|
|
|
func (a *AzBus) Publish(topic string, messages ...*message.Message) error {
|
|
if a.closed {
|
|
return fmt.Errorf("publisher is closed")
|
|
}
|
|
|
|
if len(messages) == 0 {
|
|
return nil
|
|
}
|
|
|
|
sender, err := a.client.NewSender(topic, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create sender for topic %s: %w", topic, err)
|
|
}
|
|
defer sender.Close(context.Background())
|
|
|
|
sbMessages := new(azservicebus.MessageBatch)
|
|
for _, msg := range messages {
|
|
sbMsg := &azservicebus.Message{
|
|
Body: msg.Payload,
|
|
}
|
|
|
|
// Copy metadata as application properties
|
|
if msg.Metadata != nil {
|
|
sbMsg.ApplicationProperties = make(map[string]interface{})
|
|
for key, value := range msg.Metadata {
|
|
sbMsg.ApplicationProperties[key] = value
|
|
}
|
|
}
|
|
|
|
// Set message ID if available
|
|
if msg.UUID != "" {
|
|
sbMsg.MessageID = &msg.UUID
|
|
}
|
|
|
|
err = sbMessages.AddMessage(sbMsg, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err = sender.SendMessageBatch(context.Background(), sbMessages, nil); err != nil {
|
|
a.logger.Error().
|
|
Err(err).
|
|
Str("topic", topic).
|
|
Int32("message_count", sbMessages.NumMessages()).
|
|
Msg("failed to send messages to azure service bus")
|
|
return fmt.Errorf("failed to send messages: %w", err)
|
|
}
|
|
|
|
a.logger.Debug().
|
|
Str("topic", topic).
|
|
Int32("message_count", sbMessages.NumMessages()).
|
|
Msg("published messages to azure service bus")
|
|
|
|
return nil
|
|
}
|