feat(tos): knowledge node support use tos proxy dowload file
This commit is contained in:
parent
67d9687fb8
commit
5300947c43
|
|
@ -21,7 +21,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/volcengine/ve-tos-golang-sdk/v2/tos"
|
"github.com/volcengine/ve-tos-golang-sdk/v2/tos"
|
||||||
|
|
@ -72,7 +75,6 @@ func getTosClient(ctx context.Context, ak, sk, bucketName, endpoint, region stri
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, ak, sk, bucketName, endpoint, region string) (storage.Storage, error) {
|
func New(ctx context.Context, ak, sk, bucketName, endpoint, region string) (storage.Storage, error) {
|
||||||
|
|
||||||
t, err := getTosClient(ctx, ak, sk, bucketName, endpoint, region)
|
t, err := getTosClient(ctx, ak, sk, bucketName, endpoint, region)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -211,17 +213,45 @@ func (t *tosClient) GetObjectUrl(ctx context.Context, objectKey string, opts ...
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// url parse
|
||||||
|
url, err := url.Parse(output.SignedUrl)
|
||||||
|
if err != nil {
|
||||||
|
logs.CtxWarnf(ctx, "[GetObjectUrl] url parse failed, err: %v", err)
|
||||||
|
return output.SignedUrl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyPort := os.Getenv(consts.MinIOProxyEndpoint) // :8889
|
||||||
|
if len(proxyPort) > 0 {
|
||||||
|
currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx)
|
||||||
|
if !ok {
|
||||||
|
return output.SignedUrl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
currentScheme, ok := ctxcache.Get[string](ctx, consts.RequestSchemeKeyInCtx)
|
||||||
|
if !ok {
|
||||||
|
return output.SignedUrl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
host, _, err := net.SplitHostPort(currentHost)
|
||||||
|
if err != nil {
|
||||||
|
host = currentHost
|
||||||
|
}
|
||||||
|
minioProxyHost := host + proxyPort
|
||||||
|
url.Host = minioProxyHost
|
||||||
|
url.Scheme = currentScheme
|
||||||
|
// logs.CtxDebugf(ctx, "[GetObjectUrl] reset \n ORG.URL = %s \n TOS.URL = %s", output.SignedUrl, url.String())
|
||||||
|
return url.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
return output.SignedUrl, nil
|
return output.SignedUrl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *tosClient) GetUploadHost(ctx context.Context) string {
|
func (i *tosClient) GetUploadHost(ctx context.Context) string {
|
||||||
|
|
||||||
currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx)
|
currentHost, ok := ctxcache.Get[string](ctx, consts.HostKeyInCtx)
|
||||||
if !ok {
|
if !ok {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
return currentHost + consts.ApplyUploadActionURI
|
return currentHost + consts.ApplyUploadActionURI
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tosClient) GetServerID() string {
|
func (t *tosClient) GetServerID() string {
|
||||||
|
|
|
||||||
|
|
@ -161,7 +161,13 @@ func setCrashOutput() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func asyncStartMinioProxyServer(ctx context.Context) {
|
func asyncStartMinioProxyServer(ctx context.Context) {
|
||||||
|
storageType := getEnv(consts.StorageType, "minio")
|
||||||
proxyURL := getEnv(consts.MinIOAPIHost, "http://localhost:9000")
|
proxyURL := getEnv(consts.MinIOAPIHost, "http://localhost:9000")
|
||||||
|
|
||||||
|
if storageType == "tos" {
|
||||||
|
proxyURL = getEnv(consts.TOSBucketEndpoint, "https://opencoze.tos-cn-beijing.volces.com")
|
||||||
|
}
|
||||||
|
|
||||||
minioProxyEndpoint := getEnv(consts.MinIOProxyEndpoint, "")
|
minioProxyEndpoint := getEnv(consts.MinIOProxyEndpoint, "")
|
||||||
if len(minioProxyEndpoint) == 0 {
|
if len(minioProxyEndpoint) == 0 {
|
||||||
return
|
return
|
||||||
|
|
@ -176,7 +182,6 @@ func asyncStartMinioProxyServer(ctx context.Context) {
|
||||||
proxy := httputil.NewSingleHostReverseProxy(target)
|
proxy := httputil.NewSingleHostReverseProxy(target)
|
||||||
originDirector := proxy.Director
|
originDirector := proxy.Director
|
||||||
proxy.Director = func(req *http.Request) {
|
proxy.Director = func(req *http.Request) {
|
||||||
|
|
||||||
q := req.URL.Query()
|
q := req.URL.Query()
|
||||||
q.Del("x-wf-file_name")
|
q.Del("x-wf-file_name")
|
||||||
req.URL.RawQuery = q.Encode()
|
req.URL.RawQuery = q.Encode()
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ const (
|
||||||
TOSSecretKey = "TOS_SECRET_KEY"
|
TOSSecretKey = "TOS_SECRET_KEY"
|
||||||
TOSRegion = "TOS_REGION"
|
TOSRegion = "TOS_REGION"
|
||||||
TOSEndpoint = "TOS_ENDPOINT"
|
TOSEndpoint = "TOS_ENDPOINT"
|
||||||
|
TOSBucketEndpoint = "TOS_BUCKET_ENDPOINT"
|
||||||
|
|
||||||
HostKeyInCtx = "HOST_KEY_IN_CTX"
|
HostKeyInCtx = "HOST_KEY_IN_CTX"
|
||||||
RequestSchemeKeyInCtx = "REQUEST_SCHEME_IN_CTX"
|
RequestSchemeKeyInCtx = "REQUEST_SCHEME_IN_CTX"
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,7 @@ export MINIO_API_HOST="http://${MINIO_ENDPOINT}"
|
||||||
export TOS_ACCESS_KEY=
|
export TOS_ACCESS_KEY=
|
||||||
export TOS_SECRET_KEY=
|
export TOS_SECRET_KEY=
|
||||||
export TOS_ENDPOINT=https://tos-cn-beijing.volces.com
|
export TOS_ENDPOINT=https://tos-cn-beijing.volces.com
|
||||||
|
export TOS_BUCKET_ENDPOINT=https://opencoze.tos-cn-beijing.volces.com
|
||||||
export TOS_REGION=cn-beijing
|
export TOS_REGION=cn-beijing
|
||||||
|
|
||||||
# Elasticsearch
|
# Elasticsearch
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue