diff --git a/backend/application/application.go b/backend/application/application.go index e3c3ad41..07a2fa85 100644 --- a/backend/application/application.go +++ b/backend/application/application.go @@ -64,7 +64,9 @@ import ( singleagentImpl "github.com/coze-dev/coze-studio/backend/crossdomain/impl/singleagent" variablesImpl "github.com/coze-dev/coze-studio/backend/crossdomain/impl/variables" workflowImpl "github.com/coze-dev/coze-studio/backend/crossdomain/impl/workflow" + "github.com/coze-dev/coze-studio/backend/infra/contract/eventbus" "github.com/coze-dev/coze-studio/backend/infra/impl/checkpoint" + implEventbus "github.com/coze-dev/coze-studio/backend/infra/impl/eventbus" ) type eventbusImpl struct { @@ -144,6 +146,7 @@ func Init(ctx context.Context) (err error) { func initEventBus(infra *appinfra.AppDependencies) *eventbusImpl { e := &eventbusImpl{} + eventbus.SetDefaultSVC(implEventbus.NewConsumerService()) e.resourceEventBus = search.NewResourceEventBus(infra.ResourceEventProducer) e.projectEventBus = search.NewProjectEventBus(infra.AppEventProducer) diff --git a/backend/application/knowledge/init.go b/backend/application/knowledge/init.go index 09888d0b..de7b710f 100644 --- a/backend/application/knowledge/init.go +++ b/backend/application/knowledge/init.go @@ -175,7 +175,7 @@ func InitService(c *ServiceComponents) (*KnowledgeApplicationService, error) { ModelFactory: chatmodelImpl.NewDefaultFactory(), }) - if err = eventbus.RegisterConsumer(nameServer, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, knowledgeEventHandler); err != nil { + if err = eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, knowledgeEventHandler); err != nil { return nil, fmt.Errorf("register knowledge consumer failed, err=%w", err) } diff --git a/backend/application/search/init.go b/backend/application/search/init.go index 430e0c30..d19123ae 100644 --- a/backend/application/search/init.go +++ b/backend/application/search/init.go @@ -71,14 +71,14 @@ func InitService(ctx context.Context, s *ServiceComponents) (*SearchApplicationS logs.Infof("start search domain consumer...") nameServer := os.Getenv(consts.MQServer) - err := eventbus.RegisterConsumer(nameServer, consts.RMQTopicApp, consts.RMQConsumeGroupApp, searchConsumer) + err := eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicApp, consts.RMQConsumeGroupApp, searchConsumer) if err != nil { return nil, fmt.Errorf("register search consumer failed, err=%w", err) } searchResourceConsumer := search.NewResourceHandler(ctx, s.ESClient) - err = eventbus.RegisterConsumer(nameServer, consts.RMQTopicResource, consts.RMQConsumeGroupResource, searchResourceConsumer) + err = eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicResource, consts.RMQConsumeGroupResource, searchResourceConsumer) if err != nil { return nil, fmt.Errorf("register search consumer failed, err=%w", err) } diff --git a/backend/domain/knowledge/service/knowledge_integration_test.go b/backend/domain/knowledge/service/knowledge_integration_test.go index c5f7586b..20042ca6 100644 --- a/backend/domain/knowledge/service/knowledge_integration_test.go +++ b/backend/domain/knowledge/service/knowledge_integration_test.go @@ -178,7 +178,7 @@ func (suite *KnowledgeTestSuite) SetupSuite() { suite.handler = knowledgeEventHandler - err = eventbus.RegisterConsumer(rmqEndpoint, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, suite) + err = eventbus.DefaultSVC().RegisterConsumer(rmqEndpoint, consts.RMQTopicKnowledge, consts.RMQConsumeGroupKnowledge, suite) if err != nil { panic(err) } diff --git a/backend/domain/workflow/internal/compose/test/question_answer_test.go b/backend/domain/workflow/internal/compose/test/question_answer_test.go index 549a7071..6468e0d1 100644 --- a/backend/domain/workflow/internal/compose/test/question_answer_test.go +++ b/backend/domain/workflow/internal/compose/test/question_answer_test.go @@ -46,8 +46,8 @@ import ( "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/exit" "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes/qa" repo2 "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/repo" - "github.com/coze-dev/coze-studio/backend/infra/impl/cache/redis" schema2 "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/schema" + "github.com/coze-dev/coze-studio/backend/infra/impl/cache/redis" "github.com/coze-dev/coze-studio/backend/infra/impl/checkpoint" mock "github.com/coze-dev/coze-studio/backend/internal/mock/infra/contract/idgen" storageMock "github.com/coze-dev/coze-studio/backend/internal/mock/infra/contract/storage" diff --git a/backend/infra/contract/eventbus/eventbus.go b/backend/infra/contract/eventbus/eventbus.go index 92f12729..27e8a546 100644 --- a/backend/infra/contract/eventbus/eventbus.go +++ b/backend/infra/contract/eventbus/eventbus.go @@ -24,7 +24,20 @@ type Producer interface { BatchSend(ctx context.Context, bodyArr [][]byte, opts ...SendOpt) error } -type Consumer interface{} +var defaultSVC ConsumerService + +func SetDefaultSVC(svc ConsumerService) { + defaultSVC = svc +} + +func GetDefaultSVC() ConsumerService { + return defaultSVC +} + +type ConsumerService interface { + RegisterConsumer(nameServer, topic, group string, consumerHandler ConsumerHandler, opts ...ConsumerOpt) error +} + type ConsumerHandler interface { HandleMessage(ctx context.Context, msg *Message) error } diff --git a/backend/infra/impl/eventbus/eventbus.go b/backend/infra/impl/eventbus/eventbus.go index 7d045915..3abac197 100644 --- a/backend/infra/impl/eventbus/eventbus.go +++ b/backend/infra/impl/eventbus/eventbus.go @@ -29,13 +29,23 @@ import ( type ( Producer = eventbus.Producer - Consumer = eventbus.Consumer + ConsumerService = eventbus.ConsumerService ConsumerHandler = eventbus.ConsumerHandler ConsumerOpt = eventbus.ConsumerOpt Message = eventbus.Message ) -func RegisterConsumer(nameServer, topic, group string, consumerHandler eventbus.ConsumerHandler, opts ...eventbus.ConsumerOpt) error { +type consumerServiceImpl struct{} + +func NewConsumerService() ConsumerService { + return &consumerServiceImpl{} +} + +func DefaultSVC() ConsumerService { + return eventbus.GetDefaultSVC() +} + +func (consumerServiceImpl) RegisterConsumer(nameServer, topic, group string, consumerHandler eventbus.ConsumerHandler, opts ...eventbus.ConsumerOpt) error { tp := os.Getenv(consts.MQTypeKey) switch tp { case "nsq":