[#151] index page: Move browse handlers to browse.go
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
b188457a15
commit
9c1842f899
2 changed files with 127 additions and 125 deletions
|
@ -7,11 +7,15 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/docker/go-units"
|
"github.com/docker/go-units"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -140,6 +144,129 @@ func urlencode(path string) string {
|
||||||
return res.String()
|
return res.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
||||||
|
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var objects = make([]ResponseObject, 0, len(nodes))
|
||||||
|
for _, node := range nodes {
|
||||||
|
meta := node.GetMeta()
|
||||||
|
if meta == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var attrs = make(map[string]string, len(meta))
|
||||||
|
for _, m := range meta {
|
||||||
|
attrs[m.GetKey()] = string(m.GetValue())
|
||||||
|
}
|
||||||
|
obj := newListObjectsResponseS3(attrs)
|
||||||
|
obj.FilePath = prefix + obj.FileName
|
||||||
|
objects = append(objects, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
||||||
|
basePath := strings.TrimRightFunc(prefix, func(r rune) bool {
|
||||||
|
return r != '/'
|
||||||
|
})
|
||||||
|
objectIDs, err := h.search(ctx, bucketInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer objectIDs.Close()
|
||||||
|
|
||||||
|
objects, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) headDirObjects(ctx context.Context, cID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) {
|
||||||
|
const initialSliceCapacity = 100
|
||||||
|
|
||||||
|
var (
|
||||||
|
log = h.log.With(
|
||||||
|
zap.String("cid", cID.EncodeToString()),
|
||||||
|
zap.String("prefix", basePath),
|
||||||
|
)
|
||||||
|
mu = sync.Mutex{}
|
||||||
|
wg = sync.WaitGroup{}
|
||||||
|
errChan = make(chan error)
|
||||||
|
|
||||||
|
addr oid.Address
|
||||||
|
objects = make([]ResponseObject, 0, initialSliceCapacity)
|
||||||
|
dirs = sync.Map{}
|
||||||
|
auth = PrmAuth{
|
||||||
|
BearerToken: bearerToken(ctx),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for err := range errChan {
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.FailedToHeadObject, zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
addr.SetContainer(cID)
|
||||||
|
err := objectIDs.Iterate(func(id oid.ID) bool {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
addr.SetObject(id)
|
||||||
|
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
|
||||||
|
PrmAuth: auth,
|
||||||
|
Address: addr,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
errChan <- err
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
attrs := loadAttributes(obj.Attributes())
|
||||||
|
attrs[attrOID] = id.EncodeToString()
|
||||||
|
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
|
||||||
|
|
||||||
|
dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
|
||||||
|
if dirname == "" {
|
||||||
|
mu.Lock()
|
||||||
|
objects = append(objects, newListObjectsResponseNative(attrs))
|
||||||
|
mu.Unlock()
|
||||||
|
} else if _, ok := dirs.Load(dirname); !ok {
|
||||||
|
mu.Lock()
|
||||||
|
objects = append(objects, ResponseObject{
|
||||||
|
FileName: dirname,
|
||||||
|
FilePath: basePath + dirname,
|
||||||
|
IsDir: true,
|
||||||
|
})
|
||||||
|
mu.Unlock()
|
||||||
|
dirs.Store(dirname, true)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(errChan)
|
||||||
|
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
|
||||||
type browseParams struct {
|
type browseParams struct {
|
||||||
bucketInfo *data.BucketInfo
|
bucketInfo *data.BucketInfo
|
||||||
template string
|
template string
|
||||||
|
|
|
@ -6,9 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
|
@ -432,126 +430,3 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
|
||||||
|
|
||||||
return bktInfo, err
|
return bktInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
|
||||||
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var objects = make([]ResponseObject, 0, len(nodes))
|
|
||||||
for _, node := range nodes {
|
|
||||||
meta := node.GetMeta()
|
|
||||||
if meta == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var attrs = make(map[string]string, len(meta))
|
|
||||||
for _, m := range meta {
|
|
||||||
attrs[m.GetKey()] = string(m.GetValue())
|
|
||||||
}
|
|
||||||
obj := newListObjectsResponseS3(attrs)
|
|
||||||
obj.FilePath = prefix + obj.FileName
|
|
||||||
objects = append(objects, obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
return objects, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
|
||||||
basePath := strings.TrimRightFunc(prefix, func(r rune) bool {
|
|
||||||
return r != '/'
|
|
||||||
})
|
|
||||||
objectIDs, err := h.search(ctx, bucketInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer objectIDs.Close()
|
|
||||||
|
|
||||||
objects, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return objects, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) headDirObjects(ctx context.Context, cID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) {
|
|
||||||
const initialSliceCapacity = 100
|
|
||||||
|
|
||||||
var (
|
|
||||||
log = h.log.With(
|
|
||||||
zap.String("cid", cID.EncodeToString()),
|
|
||||||
zap.String("prefix", basePath),
|
|
||||||
)
|
|
||||||
mu = sync.Mutex{}
|
|
||||||
wg = sync.WaitGroup{}
|
|
||||||
errChan = make(chan error)
|
|
||||||
|
|
||||||
addr oid.Address
|
|
||||||
objects = make([]ResponseObject, 0, initialSliceCapacity)
|
|
||||||
dirs = sync.Map{}
|
|
||||||
auth = PrmAuth{
|
|
||||||
BearerToken: bearerToken(ctx),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for err := range errChan {
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.FailedToHeadObject, zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
addr.SetContainer(cID)
|
|
||||||
err := objectIDs.Iterate(func(id oid.ID) bool {
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
addr.SetObject(id)
|
|
||||||
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
|
|
||||||
PrmAuth: auth,
|
|
||||||
Address: addr,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
errChan <- err
|
|
||||||
cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
attrs := loadAttributes(obj.Attributes())
|
|
||||||
attrs[attrOID] = id.EncodeToString()
|
|
||||||
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
|
|
||||||
|
|
||||||
dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
|
|
||||||
if dirname == "" {
|
|
||||||
mu.Lock()
|
|
||||||
objects = append(objects, newListObjectsResponseNative(attrs))
|
|
||||||
mu.Unlock()
|
|
||||||
} else if _, ok := dirs.Load(dirname); !ok {
|
|
||||||
mu.Lock()
|
|
||||||
objects = append(objects, ResponseObject{
|
|
||||||
FileName: dirname,
|
|
||||||
FilePath: basePath + dirname,
|
|
||||||
IsDir: true,
|
|
||||||
})
|
|
||||||
mu.Unlock()
|
|
||||||
dirs.Store(dirname, true)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
close(errChan)
|
|
||||||
|
|
||||||
return objects, nil
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue