[#151] index page: Move getDirObjects handlers to browse.go
This commit is contained in:
parent
b188457a15
commit
7d4c03712a
2 changed files with 127 additions and 125 deletions
|
@ -7,11 +7,15 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/docker/go-units"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
|
@ -140,6 +144,129 @@ func urlencode(path string) string {
|
|||
return res.String()
|
||||
}
|
||||
|
||||
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
||||
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var objects = make([]ResponseObject, 0, len(nodes))
|
||||
for _, node := range nodes {
|
||||
meta := node.GetMeta()
|
||||
if meta == nil {
|
||||
continue
|
||||
}
|
||||
var attrs = make(map[string]string, len(meta))
|
||||
for _, m := range meta {
|
||||
attrs[m.GetKey()] = string(m.GetValue())
|
||||
}
|
||||
obj := newListObjectsResponseS3(attrs)
|
||||
obj.FilePath = prefix + obj.FileName
|
||||
objects = append(objects, obj)
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
||||
basePath := strings.TrimRightFunc(prefix, func(r rune) bool {
|
||||
return r != '/'
|
||||
})
|
||||
objectIDs, err := h.search(ctx, bucketInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer objectIDs.Close()
|
||||
|
||||
objects, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (h *Handler) headDirObjects(ctx context.Context, cID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) {
|
||||
const initialSliceCapacity = 100
|
||||
|
||||
var (
|
||||
log = h.log.With(
|
||||
zap.String("cid", cID.EncodeToString()),
|
||||
zap.String("prefix", basePath),
|
||||
)
|
||||
mu = sync.Mutex{}
|
||||
wg = sync.WaitGroup{}
|
||||
errChan = make(chan error)
|
||||
|
||||
addr oid.Address
|
||||
objects = make([]ResponseObject, 0, initialSliceCapacity)
|
||||
dirs = sync.Map{}
|
||||
auth = PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
}
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
for err := range errChan {
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToHeadObject, zap.Error(err))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
addr.SetContainer(cID)
|
||||
err := objectIDs.Iterate(func(id oid.ID) bool {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
addr.SetObject(id)
|
||||
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
|
||||
PrmAuth: auth,
|
||||
Address: addr,
|
||||
})
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
attrs := loadAttributes(obj.Attributes())
|
||||
attrs[attrOID] = id.EncodeToString()
|
||||
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
|
||||
|
||||
dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
|
||||
if dirname == "" {
|
||||
mu.Lock()
|
||||
objects = append(objects, newListObjectsResponseNative(attrs))
|
||||
mu.Unlock()
|
||||
} else if _, ok := dirs.Load(dirname); !ok {
|
||||
mu.Lock()
|
||||
objects = append(objects, ResponseObject{
|
||||
FileName: dirname,
|
||||
FilePath: basePath + dirname,
|
||||
IsDir: true,
|
||||
})
|
||||
mu.Unlock()
|
||||
dirs.Store(dirname, true)
|
||||
}
|
||||
}()
|
||||
|
||||
return false
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
type browseParams struct {
|
||||
bucketInfo *data.BucketInfo
|
||||
template string
|
||||
|
|
|
@ -6,9 +6,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
|
@ -432,126 +430,3 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
|
|||
|
||||
return bktInfo, err
|
||||
}
|
||||
|
||||
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
||||
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var objects = make([]ResponseObject, 0, len(nodes))
|
||||
for _, node := range nodes {
|
||||
meta := node.GetMeta()
|
||||
if meta == nil {
|
||||
continue
|
||||
}
|
||||
var attrs = make(map[string]string, len(meta))
|
||||
for _, m := range meta {
|
||||
attrs[m.GetKey()] = string(m.GetValue())
|
||||
}
|
||||
obj := newListObjectsResponseS3(attrs)
|
||||
obj.FilePath = prefix + obj.FileName
|
||||
objects = append(objects, obj)
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
|
||||
basePath := strings.TrimRightFunc(prefix, func(r rune) bool {
|
||||
return r != '/'
|
||||
})
|
||||
objectIDs, err := h.search(ctx, bucketInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer objectIDs.Close()
|
||||
|
||||
objects, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (h *Handler) headDirObjects(ctx context.Context, cID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) {
|
||||
const initialSliceCapacity = 100
|
||||
|
||||
var (
|
||||
log = h.log.With(
|
||||
zap.String("cid", cID.EncodeToString()),
|
||||
zap.String("prefix", basePath),
|
||||
)
|
||||
mu = sync.Mutex{}
|
||||
wg = sync.WaitGroup{}
|
||||
errChan = make(chan error)
|
||||
|
||||
addr oid.Address
|
||||
objects = make([]ResponseObject, 0, initialSliceCapacity)
|
||||
dirs = sync.Map{}
|
||||
auth = PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
}
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
for err := range errChan {
|
||||
if err != nil {
|
||||
log.Error(logs.FailedToHeadObject, zap.Error(err))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
addr.SetContainer(cID)
|
||||
err := objectIDs.Iterate(func(id oid.ID) bool {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
addr.SetObject(id)
|
||||
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
|
||||
PrmAuth: auth,
|
||||
Address: addr,
|
||||
})
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
attrs := loadAttributes(obj.Attributes())
|
||||
attrs[attrOID] = id.EncodeToString()
|
||||
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
|
||||
|
||||
dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
|
||||
if dirname == "" {
|
||||
mu.Lock()
|
||||
objects = append(objects, newListObjectsResponseNative(attrs))
|
||||
mu.Unlock()
|
||||
} else if _, ok := dirs.Load(dirname); !ok {
|
||||
mu.Lock()
|
||||
objects = append(objects, ResponseObject{
|
||||
FileName: dirname,
|
||||
FilePath: basePath + dirname,
|
||||
IsDir: true,
|
||||
})
|
||||
mu.Unlock()
|
||||
dirs.Store(dirname, true)
|
||||
}
|
||||
}()
|
||||
|
||||
return false
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue