feat: refactor the mq’s RegisterConsumer method to make it easier to inject (#581)

This commit is contained in:
Ryo 2025-08-05 18:05:32 +08:00 committed by GitHub
parent c11780b28d
commit c9c900adb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 34 additions and 8 deletions

View File

@ -64,7 +64,9 @@ import (
singleagentImpl "github.com/coze-dev/coze-studio/backend/crossdomain/impl/singleagent"
variablesImpl "github.com/coze-dev/coze-studio/backend/crossdomain/impl/variables"
workflowImpl "github.com/coze-dev/coze-studio/backend/crossdomain/impl/workflow"
"github.com/coze-dev/coze-studio/backend/infra/contract/eventbus"
"github.com/coze-dev/coze-studio/backend/infra/impl/checkpoint"
implEventbus "github.com/coze-dev/coze-studio/backend/infra/impl/eventbus"
)
type eventbusImpl struct {
@ -144,6 +146,7 @@ func Init(ctx context.Context) (err error) {
func initEventBus(infra *appinfra.AppDependencies) *eventbusImpl {
e := &eventbusImpl{}
eventbus.SetDefaultSVC(implEventbus.NewConsumerService())
e.resourceEventBus = search.NewResourceEventBus(infra.ResourceEventProducer)
e.projectEventBus = search.NewProjectEventBus(infra.AppEventProducer)

View File

@ -175,7 +175,7 @@ func InitService(c *ServiceComponents) (*KnowledgeApplicationService, error) {
ModelFactory: chatmodelImpl.NewDefaultFactory(),
})
if err = eventbus.RegisterConsumer(nameServer, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, knowledgeEventHandler); err != nil {
if err = eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, knowledgeEventHandler); err != nil {
return nil, fmt.Errorf("register knowledge consumer failed, err=%w", err)
}

View File

@ -71,14 +71,14 @@ func InitService(ctx context.Context, s *ServiceComponents) (*SearchApplicationS
logs.Infof("start search domain consumer...")
nameServer := os.Getenv(consts.MQServer)
err := eventbus.RegisterConsumer(nameServer, consts.RMQTopicApp, consts.RMQConsumeGroupApp, searchConsumer)
err := eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicApp, consts.RMQConsumeGroupApp, searchConsumer)
if err != nil {
return nil, fmt.Errorf("register search consumer failed, err=%w", err)
}
searchResourceConsumer := search.NewResourceHandler(ctx, s.ESClient)
err = eventbus.RegisterConsumer(nameServer, consts.RMQTopicResource, consts.RMQConsumeGroupResource, searchResourceConsumer)
err = eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicResource, consts.RMQConsumeGroupResource, searchResourceConsumer)
if err != nil {
return nil, fmt.Errorf("register search consumer failed, err=%w", err)
}

View File

@ -178,7 +178,7 @@ func (suite *KnowledgeTestSuite) SetupSuite() {
suite.handler = knowledgeEventHandler
err = eventbus.RegisterConsumer(rmqEndpoint, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, suite)
err = eventbus.DefaultSVC().RegisterConsumer(rmqEndpoint, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, suite)
if err != nil {
panic(err)
}

View File

@ -46,8 +46,8 @@ import (
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/exit"
"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/qa"
repo2 "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/repo"
"github.com/coze-dev/coze-studio/backend/infra/impl/cache/redis"
schema2 "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/schema"
"github.com/coze-dev/coze-studio/backend/infra/impl/cache/redis"
"github.com/coze-dev/coze-studio/backend/infra/impl/checkpoint"
mock "github.com/coze-dev/coze-studio/backend/internal/mock/infra/contract/idgen"
storageMock "github.com/coze-dev/coze-studio/backend/internal/mock/infra/contract/storage"

View File

@ -24,7 +24,20 @@ type Producer interface {
BatchSend(ctx context.Context, bodyArr [][]byte, opts ...SendOpt) error
}
type Consumer interface{}
var defaultSVC ConsumerService
func SetDefaultSVC(svc ConsumerService) {
defaultSVC = svc
}
func GetDefaultSVC() ConsumerService {
return defaultSVC
}
type ConsumerService interface {
RegisterConsumer(nameServer, topic, group string, consumerHandler ConsumerHandler, opts ...ConsumerOpt) error
}
type ConsumerHandler interface {
HandleMessage(ctx context.Context, msg *Message) error
}

View File

@ -29,13 +29,23 @@ import (
type (
Producer = eventbus.Producer
Consumer = eventbus.Consumer
ConsumerService = eventbus.ConsumerService
ConsumerHandler = eventbus.ConsumerHandler
ConsumerOpt = eventbus.ConsumerOpt
Message = eventbus.Message
)
func RegisterConsumer(nameServer, topic, group string, consumerHandler eventbus.ConsumerHandler, opts ...eventbus.ConsumerOpt) error {
type consumerServiceImpl struct{}
func NewConsumerService() ConsumerService {
return &consumerServiceImpl{}
}
func DefaultSVC() ConsumerService {
return eventbus.GetDefaultSVC()
}
func (consumerServiceImpl) RegisterConsumer(nameServer, topic, group string, consumerHandler eventbus.ConsumerHandler, opts ...eventbus.ConsumerOpt) error {
tp := os.Getenv(consts.MQTypeKey)
switch tp {
case "nsq":