From 263a75b1c001001f7719e19802dbb39eeca6b76c Mon Sep 17 00:00:00 2001 From: Ryo Date: Wed, 27 Aug 2025 12:06:55 +0800 Subject: [PATCH] feat(infra): add file listing support (#1836) --- backend/infra/contract/storage/storage.go | 28 ++++ backend/infra/impl/storage/minio/minio.go | 87 +++++------ .../infra/impl/storage/minio/minio_imagex.go | 81 ++++++++++ backend/infra/impl/storage/s3/s3.go | 138 ++++++++++++------ backend/infra/impl/storage/s3/s3_imagex.go | 81 ++++++++++ backend/infra/impl/storage/tos/tos.go | 127 ++++++++++------ backend/infra/impl/storage/tos/tos_imagex.go | 81 ++++++++++ .../infra/contract/storage/storage_mock.go | 30 ++++ 8 files changed, 512 insertions(+), 141 deletions(-) create mode 100644 backend/infra/impl/storage/minio/minio_imagex.go create mode 100644 backend/infra/impl/storage/s3/s3_imagex.go create mode 100644 backend/infra/impl/storage/tos/tos_imagex.go diff --git a/backend/infra/contract/storage/storage.go b/backend/infra/contract/storage/storage.go index 70b09a87..4473b215 100644 --- a/backend/infra/contract/storage/storage.go +++ b/backend/infra/contract/storage/storage.go @@ -19,6 +19,7 @@ package storage import ( "context" "io" + "time" ) //go:generate mockgen -destination ../../../internal/mock/infra/contract/storage/storage_mock.go -package mock -source storage.go Factory @@ -28,6 +29,13 @@ type Storage interface { GetObject(ctx context.Context, objectKey string) ([]byte, error) DeleteObject(ctx context.Context, objectKey string) error GetObjectUrl(ctx context.Context, objectKey string, opts ...GetOptFn) (string, error) + // ListObjects returns all objects with the specified prefix. + // It may return a large number of objects, consider using ListObjectsPaginated for better performance. + ListObjects(ctx context.Context, prefix string) ([]*FileInfo, error) + + // ListObjectsPaginated returns objects with pagination support. + // Use this method when dealing with large number of objects. + ListObjectsPaginated(ctx context.Context, input *ListObjectsPaginatedInput) (*ListObjectsPaginatedOutput, error) } type SecurityToken struct { @@ -37,3 +45,23 @@ type SecurityToken struct { ExpiredTime string `thrift:"expired_time,4" frugal:"4,default,string" json:"expired_time"` CurrentTime string `thrift:"current_time,5" frugal:"5,default,string" json:"current_time"` } + +type ListObjectsPaginatedInput struct { + Prefix string + PageSize int + Cursor string +} + +type ListObjectsPaginatedOutput struct { + Files []*FileInfo + Cursor string + // false: All results have been returned + // true: There are more results to return + IsTruncated bool +} +type FileInfo struct { + Key string + LastModified time.Time + ETag string + Size int64 +} diff --git a/backend/infra/impl/storage/minio/minio.go b/backend/infra/impl/storage/minio/minio.go index 12c7127f..6858d8ce 100644 --- a/backend/infra/impl/storage/minio/minio.go +++ b/backend/infra/impl/storage/minio/minio.go @@ -24,22 +24,17 @@ import ( "log" "math/rand" "net/url" - "os" - "strings" "time" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/coze-dev/coze-studio/backend/infra/contract/imagex" "github.com/coze-dev/coze-studio/backend/infra/contract/storage" "github.com/coze-dev/coze-studio/backend/infra/impl/storage/proxy" - "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" - "github.com/coze-dev/coze-studio/backend/types/consts" + "github.com/coze-dev/coze-studio/backend/pkg/logs" ) type minioClient struct { - host string client *minio.Client accessKeyID string secretAccessKey string @@ -47,11 +42,12 @@ type minioClient struct { endpoint string } -func NewStorageImagex(ctx context.Context, endpoint, accessKeyID, secretAccessKey, bucketName string, useSSL bool) (imagex.ImageX, error) { +func New(ctx context.Context, endpoint, accessKeyID, secretAccessKey, bucketName string, useSSL bool) (storage.Storage, error) { m, err := getMinioClient(ctx, endpoint, accessKeyID, secretAccessKey, bucketName, useSSL) if err != nil { return nil, err } + return m, nil } @@ -76,14 +72,8 @@ func getMinioClient(_ context.Context, endpoint, accessKeyID, secretAccessKey, b if err != nil { return nil, fmt.Errorf("init minio client failed %v", err) } - return m, nil -} -func New(ctx context.Context, endpoint, accessKeyID, secretAccessKey, bucketName string, useSSL bool) (storage.Storage, error) { - m, err := getMinioClient(ctx, endpoint, accessKeyID, secretAccessKey, bucketName, useSSL) - if err != nil { - return nil, err - } + // m.test() return m, nil } @@ -109,6 +99,8 @@ func (m *minioClient) test() { ctx := context.Background() objectName := fmt.Sprintf("test-file-%d.txt", rand.Int()) + m.ListObjects(ctx, "") + err := m.PutObject(ctx, objectName, []byte("hello content"), storage.WithContentType("text/plain")) if err != nil { log.Fatalf("upload file failed: %v", err) @@ -223,47 +215,48 @@ func (m *minioClient) GetObjectUrl(ctx context.Context, objectKey string, opts . return presignedURL.String(), nil } -func (m *minioClient) GetUploadHost(ctx context.Context) string { - currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx) - if !ok { - return "" +func (m *minioClient) ListObjectsPaginated(ctx context.Context, input *storage.ListObjectsPaginatedInput) (*storage.ListObjectsPaginatedOutput, error) { + if input == nil { + return nil, fmt.Errorf("input cannot be nil") } - return currentHost + consts.ApplyUploadActionURI -} - -func (m *minioClient) GetServerID() string { - return "" -} - -func (m *minioClient) GetUploadAuth(ctx context.Context, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { - scheme := strings.ToLower(os.Getenv(consts.StorageUploadHTTPScheme)) - if scheme == "" { - scheme = "http" + if input.PageSize <= 0 { + return nil, fmt.Errorf("page size must be positive") } - return &imagex.SecurityToken{ - AccessKeyID: "", - SecretAccessKey: "", - SessionToken: "", - ExpiredTime: time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"), - CurrentTime: time.Now().Format("2006-01-02 15:04:05"), - HostScheme: scheme, - }, nil -} -func (m *minioClient) GetResourceURL(ctx context.Context, uri string, opts ...imagex.GetResourceOpt) (*imagex.ResourceURL, error) { - url, err := m.GetObjectUrl(ctx, uri) + files, err := m.ListObjects(ctx, input.Prefix) if err != nil { return nil, err } - return &imagex.ResourceURL{ - URL: url, + + return &storage.ListObjectsPaginatedOutput{ + Files: files, + IsTruncated: false, + Cursor: "", }, nil } -func (m *minioClient) Upload(ctx context.Context, data []byte, opts ...imagex.UploadAuthOpt) (*imagex.UploadResult, error) { - return nil, nil -} +func (m *minioClient) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { + opts := minio.ListObjectsOptions{ + Prefix: prefix, + Recursive: true, + } -func (m *minioClient) GetUploadAuthWithExpire(ctx context.Context, expire time.Duration, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { - return nil, nil + objectCh := m.client.ListObjects(ctx, m.bucketName, opts) + + var files []*storage.FileInfo + for object := range objectCh { + if object.Err != nil { + return nil, object.Err + } + files = append(files, &storage.FileInfo{ + Key: object.Key, + LastModified: object.LastModified, + ETag: object.ETag, + Size: object.Size, + }) + + logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d", object.Key, object.LastModified, object.ETag, object.Size) + } + + return files, nil } diff --git a/backend/infra/impl/storage/minio/minio_imagex.go b/backend/infra/impl/storage/minio/minio_imagex.go new file mode 100644 index 00000000..34b87f89 --- /dev/null +++ b/backend/infra/impl/storage/minio/minio_imagex.go @@ -0,0 +1,81 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package minio + +import ( + "context" + "os" + "strings" + "time" + + "github.com/coze-dev/coze-studio/backend/infra/contract/imagex" + "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" + "github.com/coze-dev/coze-studio/backend/types/consts" +) + +func NewStorageImagex(ctx context.Context, endpoint, accessKeyID, secretAccessKey, bucketName string, useSSL bool) (imagex.ImageX, error) { + m, err := getMinioClient(ctx, endpoint, accessKeyID, secretAccessKey, bucketName, useSSL) + if err != nil { + return nil, err + } + return m, nil +} + +func (m *minioClient) GetUploadHost(ctx context.Context) string { + currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx) + if !ok { + return "" + } + return currentHost + consts.ApplyUploadActionURI +} + +func (m *minioClient) GetServerID() string { + return "" +} + +func (m *minioClient) GetUploadAuth(ctx context.Context, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { + scheme := strings.ToLower(os.Getenv(consts.StorageUploadHTTPScheme)) + if scheme == "" { + scheme = "http" + } + return &imagex.SecurityToken{ + AccessKeyID: "", + SecretAccessKey: "", + SessionToken: "", + ExpiredTime: time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"), + CurrentTime: time.Now().Format("2006-01-02 15:04:05"), + HostScheme: scheme, + }, nil +} + +func (m *minioClient) GetResourceURL(ctx context.Context, uri string, opts ...imagex.GetResourceOpt) (*imagex.ResourceURL, error) { + url, err := m.GetObjectUrl(ctx, uri) + if err != nil { + return nil, err + } + return &imagex.ResourceURL{ + URL: url, + }, nil +} + +func (m *minioClient) Upload(ctx context.Context, data []byte, opts ...imagex.UploadAuthOpt) (*imagex.UploadResult, error) { + return nil, nil +} + +func (m *minioClient) GetUploadAuthWithExpire(ctx context.Context, expire time.Duration, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { + return nil, nil +} diff --git a/backend/infra/impl/storage/s3/s3.go b/backend/infra/impl/storage/s3/s3.go index 1b848d20..e21ef0fb 100644 --- a/backend/infra/impl/storage/s3/s3.go +++ b/backend/infra/impl/storage/s3/s3.go @@ -28,14 +28,9 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/coze-dev/coze-studio/backend/infra/contract/imagex" "github.com/coze-dev/coze-studio/backend/infra/contract/storage" "github.com/coze-dev/coze-studio/backend/infra/impl/storage/proxy" - "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" - "github.com/coze-dev/coze-studio/backend/pkg/errorx" "github.com/coze-dev/coze-studio/backend/pkg/logs" - "github.com/coze-dev/coze-studio/backend/types/consts" - "github.com/coze-dev/coze-studio/backend/types/errno" ) type s3Client struct { @@ -43,7 +38,7 @@ type s3Client struct { bucketName string } -func NewStorageImagex(ctx context.Context, ak, sk, bucketName, endpoint, region string) (imagex.ImageX, error) { +func New(ctx context.Context, ak, sk, bucketName, endpoint, region string) (storage.Storage, error) { t, err := getS3Client(ctx, ak, sk, bucketName, endpoint, region) if err != nil { return nil, err @@ -90,14 +85,6 @@ func getS3Client(ctx context.Context, ak, sk, bucketName, endpoint, region strin return t, nil } -func New(ctx context.Context, ak, sk, bucketName, endpoint, region string) (storage.Storage, error) { - t, err := getS3Client(ctx, ak, sk, bucketName, endpoint, region) - if err != nil { - return nil, err - } - return t, nil -} - func (t *s3Client) test() { // test upload objectKey := fmt.Sprintf("test-%s.txt", time.Now().Format("20060102150405")) @@ -252,47 +239,106 @@ func (t *s3Client) GetObjectUrl(ctx context.Context, objectKey string, opts ...s return req.URL, nil } -func (i *s3Client) GetUploadHost(ctx context.Context) string { - currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx) - if !ok { - return "" +func (t *s3Client) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { + client := t.client + bucket := t.bucketName + const ( + DefaultPageSize = 100 + MaxListObjects = 10000 + ) + + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int32(DefaultPageSize), } - return currentHost + consts.ApplyUploadActionURI -} -func (t *s3Client) GetServerID() string { - return "" -} + paginator := s3.NewListObjectsV2Paginator(client, input) -func (t *s3Client) GetUploadAuth(ctx context.Context, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { - scheme, ok := ctxcache.Get[string](ctx, consts.RequestSchemeKeyInCtx) - if !ok { - return nil, errorx.New(errno.ErrUploadHostSchemaNotExistCode) + var files []*storage.FileInfo + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get page, %v", err) + } + for _, obj := range page.Contents { + f := &storage.FileInfo{} + if obj.Key != nil { + f.Key = *obj.Key + } + if obj.LastModified != nil { + f.LastModified = *obj.LastModified + } + if obj.ETag != nil { + f.ETag = *obj.ETag + } + if obj.Size != nil { + f.Size = *obj.Size + } + + files = append(files, f) + + } + + if len(files) >= MaxListObjects { + logs.CtxErrorf(ctx, "[ListObjects] max list objects reached, total: %d", len(files)) + break + } } - return &imagex.SecurityToken{ - AccessKeyID: "", - SecretAccessKey: "", - SessionToken: "", - ExpiredTime: time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"), - CurrentTime: time.Now().Format("2006-01-02 15:04:05"), - HostScheme: scheme, - }, nil + + return files, nil } -func (t *s3Client) GetResourceURL(ctx context.Context, uri string, opts ...imagex.GetResourceOpt) (*imagex.ResourceURL, error) { - url, err := t.GetObjectUrl(ctx, uri) +func (t *s3Client) ListObjectsPaginated(ctx context.Context, input *storage.ListObjectsPaginatedInput) (*storage.ListObjectsPaginatedOutput, error) { + if input == nil { + return nil, fmt.Errorf("input cannot be nil") + } + if input.PageSize <= 0 { + return nil, fmt.Errorf("page size must be positive") + } + + client := t.client + bucket := t.bucketName + + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(input.Prefix), + MaxKeys: aws.Int32(int32(input.PageSize)), + ContinuationToken: aws.String(input.Cursor), + } + + p, err := client.ListObjectsV2(ctx, listObjectsInput) if err != nil { return nil, err } - return &imagex.ResourceURL{ - URL: url, - }, nil -} -func (t *s3Client) Upload(ctx context.Context, data []byte, opts ...imagex.UploadAuthOpt) (*imagex.UploadResult, error) { - return nil, nil -} + var files []*storage.FileInfo + for _, obj := range p.Contents { + f := &storage.FileInfo{} + if obj.Key != nil { + f.Key = *obj.Key + } + if obj.LastModified != nil { + f.LastModified = *obj.LastModified + } + if obj.ETag != nil { + f.ETag = *obj.ETag + } + if obj.Size != nil { + f.Size = *obj.Size + } + files = append(files, f) + } -func (t *s3Client) GetUploadAuthWithExpire(ctx context.Context, expire time.Duration, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { - return nil, nil + output := &storage.ListObjectsPaginatedOutput{ + Files: files, + } + if p.IsTruncated != nil { + output.IsTruncated = *p.IsTruncated + } + if p.NextContinuationToken != nil { + output.Cursor = *p.NextContinuationToken + } + + return output, nil } diff --git a/backend/infra/impl/storage/s3/s3_imagex.go b/backend/infra/impl/storage/s3/s3_imagex.go new file mode 100644 index 00000000..8eb1fb64 --- /dev/null +++ b/backend/infra/impl/storage/s3/s3_imagex.go @@ -0,0 +1,81 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package s3 + +import ( + "context" + "time" + + "github.com/coze-dev/coze-studio/backend/infra/contract/imagex" + "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" + "github.com/coze-dev/coze-studio/backend/pkg/errorx" + "github.com/coze-dev/coze-studio/backend/types/consts" + "github.com/coze-dev/coze-studio/backend/types/errno" +) + +func NewStorageImagex(ctx context.Context, ak, sk, bucketName, endpoint, region string) (imagex.ImageX, error) { + t, err := getS3Client(ctx, ak, sk, bucketName, endpoint, region) + if err != nil { + return nil, err + } + return t, nil +} + +func (t *s3Client) GetUploadAuth(ctx context.Context, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { + scheme, ok := ctxcache.Get[string](ctx, consts.RequestSchemeKeyInCtx) + if !ok { + return nil, errorx.New(errno.ErrUploadHostSchemaNotExistCode) + } + return &imagex.SecurityToken{ + AccessKeyID: "", + SecretAccessKey: "", + SessionToken: "", + ExpiredTime: time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"), + CurrentTime: time.Now().Format("2006-01-02 15:04:05"), + HostScheme: scheme, + }, nil +} + +func (t *s3Client) GetResourceURL(ctx context.Context, uri string, opts ...imagex.GetResourceOpt) (*imagex.ResourceURL, error) { + url, err := t.GetObjectUrl(ctx, uri) + if err != nil { + return nil, err + } + return &imagex.ResourceURL{ + URL: url, + }, nil +} + +func (t *s3Client) Upload(ctx context.Context, data []byte, opts ...imagex.UploadAuthOpt) (*imagex.UploadResult, error) { + return nil, nil +} + +func (t *s3Client) GetUploadAuthWithExpire(ctx context.Context, expire time.Duration, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { + return nil, nil +} + +func (i *s3Client) GetUploadHost(ctx context.Context) string { + currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx) + if !ok { + return "" + } + return currentHost + consts.ApplyUploadActionURI +} + +func (t *s3Client) GetServerID() string { + return "" +} diff --git a/backend/infra/impl/storage/tos/tos.go b/backend/infra/impl/storage/tos/tos.go index d7ea0168..4a383fb3 100644 --- a/backend/infra/impl/storage/tos/tos.go +++ b/backend/infra/impl/storage/tos/tos.go @@ -22,20 +22,16 @@ import ( "fmt" "io" "net/http" - "os" "strings" "time" "github.com/volcengine/ve-tos-golang-sdk/v2/tos" "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum" - "github.com/coze-dev/coze-studio/backend/infra/contract/imagex" "github.com/coze-dev/coze-studio/backend/infra/contract/storage" "github.com/coze-dev/coze-studio/backend/infra/impl/storage/proxy" - "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" "github.com/coze-dev/coze-studio/backend/pkg/lang/conv" "github.com/coze-dev/coze-studio/backend/pkg/logs" - "github.com/coze-dev/coze-studio/backend/types/consts" ) type tosClient struct { @@ -43,11 +39,12 @@ type tosClient struct { bucketName string } -func NewStorageImagex(ctx context.Context, ak, sk, bucketName, endpoint, region string) (imagex.ImageX, error) { +func New(ctx context.Context, ak, sk, bucketName, endpoint, region string) (storage.Storage, error) { t, err := getTosClient(ctx, ak, sk, bucketName, endpoint, region) if err != nil { return nil, err } + // t.test() return t, nil } @@ -69,19 +66,15 @@ func getTosClient(ctx context.Context, ak, sk, bucketName, endpoint, region stri if err != nil { return nil, err } - return t, nil -} -func New(ctx context.Context, ak, sk, bucketName, endpoint, region string) (storage.Storage, error) { - t, err := getTosClient(ctx, ak, sk, bucketName, endpoint, region) - if err != nil { - return nil, err - } - // t.test() return t, nil } func (t *tosClient) test() { + // test list objects + ctx := context.Background() + t.ListObjects(ctx, "") + // test upload objectKey := fmt.Sprintf("test-%s.txt", time.Now().Format("20060102150405")) err := t.PutObject(context.Background(), objectKey, []byte("hello world")) @@ -129,7 +122,7 @@ func (t *tosClient) CheckAndCreateBucket(ctx context.Context) error { if serverErr.StatusCode == http.StatusNotFound { // Bucket does not exist logs.CtxInfof(ctx, "Bucket not found.") - resp, err := client.CreateBucketV2(context.Background(), &tos.CreateBucketV2Input{ + resp, err := client.CreateBucketV2(ctx, &tos.CreateBucketV2Input{ Bucket: bucketName, ACL: enum.ACLPrivate, }) @@ -183,6 +176,7 @@ func (t *tosClient) PutObjectWithReader(ctx context.Context, objectKey string, c } _, err := client.PutObjectV2(ctx, input) + return err } @@ -246,47 +240,84 @@ func (t *tosClient) GetObjectUrl(ctx context.Context, objectKey string, opts ... return output.SignedUrl, nil } -func (i *tosClient) GetUploadHost(ctx context.Context) string { - currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx) - if !ok { - return "" +func (t *tosClient) ListObjectsPaginated(ctx context.Context, input *storage.ListObjectsPaginatedInput) (*storage.ListObjectsPaginatedOutput, error) { + if input == nil { + return nil, fmt.Errorf("input cannot be nil") } - return currentHost + consts.ApplyUploadActionURI -} - -func (t *tosClient) GetServerID() string { - return "" -} - -func (t *tosClient) GetUploadAuth(ctx context.Context, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { - scheme := strings.ToLower(os.Getenv(consts.StorageUploadHTTPScheme)) - if scheme == "" { - scheme = "http" + if input.PageSize <= 0 { + return nil, fmt.Errorf("page size must be positive") } - return &imagex.SecurityToken{ - AccessKeyID: "", - SecretAccessKey: "", - SessionToken: "", - ExpiredTime: time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"), - CurrentTime: time.Now().Format("2006-01-02 15:04:05"), - HostScheme: scheme, - }, nil -} -func (t *tosClient) GetResourceURL(ctx context.Context, uri string, opts ...imagex.GetResourceOpt) (*imagex.ResourceURL, error) { - url, err := t.GetObjectUrl(ctx, uri) + output, err := t.client.ListObjectsV2(ctx, &tos.ListObjectsV2Input{ + Bucket: t.bucketName, + ListObjectsInput: tos.ListObjectsInput{ + MaxKeys: int(input.PageSize), + Marker: input.Cursor, + Prefix: input.Prefix, + }, + }) if err != nil { - return nil, err + return nil, fmt.Errorf("list objects failed, err: %w", err) } - return &imagex.ResourceURL{ - URL: url, + + files := make([]*storage.FileInfo, 0, len(output.Contents)) + for _, obj := range output.Contents { + if obj.Size == 0 && strings.HasSuffix(obj.Key, "/") { + logs.CtxDebugf(ctx, "[ListObjectsPaginated] skip dir: %s", obj.Key) + continue + } + + files = append(files, &storage.FileInfo{ + Key: obj.Key, + LastModified: obj.LastModified, + ETag: obj.ETag, + Size: obj.Size, + }) + } + + return &storage.ListObjectsPaginatedOutput{ + Files: files, + Cursor: output.NextMarker, + IsTruncated: output.IsTruncated, }, nil } -func (t *tosClient) Upload(ctx context.Context, data []byte, opts ...imagex.UploadAuthOpt) (*imagex.UploadResult, error) { - return nil, nil -} +func (t *tosClient) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { + const ( + DefaultPageSize = 100 + MaxListObjects = 10000 + ) -func (t *tosClient) GetUploadAuthWithExpire(ctx context.Context, expire time.Duration, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { - return nil, nil + files := make([]*storage.FileInfo, 0, DefaultPageSize) + cursor := "" + + for { + output, err := t.ListObjectsPaginated(ctx, &storage.ListObjectsPaginatedInput{ + Prefix: prefix, + PageSize: DefaultPageSize, + Cursor: cursor, + }) + if err != nil { + return nil, fmt.Errorf("list objects failed, prefix = %v, err: %v", prefix, err) + } + + for _, object := range output.Files { + logs.CtxDebugf(ctx, "key = %s, lastModified = %s, eTag = %s, size = %d", object.Key, object.LastModified, object.ETag, object.Size) + files = append(files, object) + } + + cursor = output.Cursor + logs.CtxDebugf(ctx, "IsTruncated = %v, Cursor = %s", output.IsTruncated, output.Cursor) + + if len(files) >= MaxListObjects { + logs.CtxErrorf(ctx, "[ListObjects] max list objects reached, total: %d", len(files)) + break + } + + if !output.IsTruncated || output.Cursor == "" { + break + } + } + + return files, nil } diff --git a/backend/infra/impl/storage/tos/tos_imagex.go b/backend/infra/impl/storage/tos/tos_imagex.go new file mode 100644 index 00000000..889ef344 --- /dev/null +++ b/backend/infra/impl/storage/tos/tos_imagex.go @@ -0,0 +1,81 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tos + +import ( + "context" + "os" + "strings" + "time" + + "github.com/coze-dev/coze-studio/backend/infra/contract/imagex" + "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" + "github.com/coze-dev/coze-studio/backend/types/consts" +) + +func NewStorageImagex(ctx context.Context, ak, sk, bucketName, endpoint, region string) (imagex.ImageX, error) { + t, err := getTosClient(ctx, ak, sk, bucketName, endpoint, region) + if err != nil { + return nil, err + } + return t, nil +} + +func (i *tosClient) GetUploadHost(ctx context.Context) string { + currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx) + if !ok { + return "" + } + return currentHost + consts.ApplyUploadActionURI +} + +func (t *tosClient) GetServerID() string { + return "" +} + +func (t *tosClient) GetUploadAuth(ctx context.Context, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { + scheme := strings.ToLower(os.Getenv(consts.StorageUploadHTTPScheme)) + if scheme == "" { + scheme = "http" + } + return &imagex.SecurityToken{ + AccessKeyID: "", + SecretAccessKey: "", + SessionToken: "", + ExpiredTime: time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"), + CurrentTime: time.Now().Format("2006-01-02 15:04:05"), + HostScheme: scheme, + }, nil +} + +func (t *tosClient) GetResourceURL(ctx context.Context, uri string, opts ...imagex.GetResourceOpt) (*imagex.ResourceURL, error) { + url, err := t.GetObjectUrl(ctx, uri) + if err != nil { + return nil, err + } + return &imagex.ResourceURL{ + URL: url, + }, nil +} + +func (t *tosClient) Upload(ctx context.Context, data []byte, opts ...imagex.UploadAuthOpt) (*imagex.UploadResult, error) { + return nil, nil +} + +func (t *tosClient) GetUploadAuthWithExpire(ctx context.Context, expire time.Duration, opt ...imagex.UploadAuthOpt) (*imagex.SecurityToken, error) { + return nil, nil +} diff --git a/backend/internal/mock/infra/contract/storage/storage_mock.go b/backend/internal/mock/infra/contract/storage/storage_mock.go index d1cc1862..45cd6188 100644 --- a/backend/internal/mock/infra/contract/storage/storage_mock.go +++ b/backend/internal/mock/infra/contract/storage/storage_mock.go @@ -91,6 +91,36 @@ func (mr *MockStorageMockRecorder) GetObjectUrl(ctx, objectKey any, opts ...any) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectUrl", reflect.TypeOf((*MockStorage)(nil).GetObjectUrl), varargs...) } +// ListObjects mocks base method. +func (m *MockStorage) ListObjects(ctx context.Context, prefix string) ([]*storage.FileInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListObjects", ctx, prefix) + ret0, _ := ret[0].([]*storage.FileInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListObjects indicates an expected call of ListObjects. +func (mr *MockStorageMockRecorder) ListObjects(ctx, prefix any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjects", reflect.TypeOf((*MockStorage)(nil).ListObjects), ctx, prefix) +} + +// ListObjectsPaginated mocks base method. +func (m *MockStorage) ListObjectsPaginated(ctx context.Context, input *storage.ListObjectsPaginatedInput) (*storage.ListObjectsPaginatedOutput, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListObjectsPaginated", ctx, input) + ret0, _ := ret[0].(*storage.ListObjectsPaginatedOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListObjectsPaginated indicates an expected call of ListObjectsPaginated. +func (mr *MockStorageMockRecorder) ListObjectsPaginated(ctx, input any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginated", reflect.TypeOf((*MockStorage)(nil).ListObjectsPaginated), ctx, input) +} + // PutObject mocks base method. func (m *MockStorage) PutObject(ctx context.Context, objectKey string, content []byte, opts ...storage.PutOptFn) error { m.ctrl.T.Helper()