package azsb import ( "context" "fmt" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/ThreeDotsLabs/watermill/message" ) func (a *AzBus) Publish(topic string, messages ...*message.Message) error { if a.closed { return fmt.Errorf("publisher is closed") } if len(messages) == 0 { return nil } sender, err := a.client.NewSender(topic, nil) if err != nil { return fmt.Errorf("failed to create sender for topic %s: %w", topic, err) } defer sender.Close(context.Background()) sbMessages := new(azservicebus.MessageBatch) for _, msg := range messages { sbMsg := &azservicebus.Message{ Body: msg.Payload, } // Copy metadata as application properties if msg.Metadata != nil { sbMsg.ApplicationProperties = make(map[string]interface{}) for key, value := range msg.Metadata { sbMsg.ApplicationProperties[key] = value } } // Set message ID if available if msg.UUID != "" { sbMsg.MessageID = &msg.UUID } err = sbMessages.AddMessage(sbMsg, nil) if err != nil { return err } } if err = sender.SendMessageBatch(context.Background(), sbMessages, nil); err != nil { a.logger.Error(). Err(err). Str("topic", topic). Int32("message_count", sbMessages.NumMessages()). Msg("failed to send messages to azure service bus") return fmt.Errorf("failed to send messages: %w", err) } a.logger.Debug(). Str("topic", topic). Int32("message_count", sbMessages.NumMessages()). Msg("published messages to azure service bus") return nil }