Roman Loginov
ef2b75597c
Fallback path to search is needed because some software may keep FileName attribute and ignore FilePath attribute during file upload. Therefore, if this feature is enabled under certain conditions (for more information, see gate-configuration.md) a search will be performed for the FileName attribute. Signed-off-by: Roman Loginov <r.loginov@yadro.com>
461 lines
13 KiB
Go
461 lines
13 KiB
Go
package handler
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"strings"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/panjf2000/ants/v2"
|
|
"github.com/valyala/fasthttp"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Config is the set of settings the handler package queries at request time.
//
// In this part of the file only IndexPageEnabled (browseIndex) and
// EnableFilepathFallback (needSearchByFileName) are consulted; the remaining
// getters are used elsewhere in the package.
type Config interface {
	DefaultTimestamp() bool
	ZipCompression() bool
	ClientCut() bool
	// IndexPageEnabled reports whether browseIndex may serve a listing;
	// when false browseIndex answers 404.
	IndexPageEnabled() bool
	IndexPageTemplate() string
	BufferMaxSizeForPut() uint64
	NamespaceHeader() string
	// EnableFilepathFallback reports whether a failed FilePath lookup may be
	// retried by the FileName attribute (see needSearchByFileName).
	EnableFilepathFallback() bool
}
|
|
|
|
// PrmContainer groups parameters of FrostFS.Container operation.
|
|
type PrmContainer struct {
|
|
// Container identifier.
|
|
ContainerID cid.ID
|
|
}
|
|
|
|
// PrmAuth groups authentication parameters for the FrostFS operation.
|
|
type PrmAuth struct {
|
|
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
|
|
BearerToken *bearer.Token
|
|
}
|
|
|
|
// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
|
|
type PrmObjectHead struct {
|
|
// Authentication parameters.
|
|
PrmAuth
|
|
|
|
// Address to read the object header from.
|
|
Address oid.Address
|
|
}
|
|
|
|
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
|
|
type PrmObjectGet struct {
|
|
// Authentication parameters.
|
|
PrmAuth
|
|
|
|
// Address to read the object header from.
|
|
Address oid.Address
|
|
}
|
|
|
|
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
|
|
type PrmObjectRange struct {
|
|
// Authentication parameters.
|
|
PrmAuth
|
|
|
|
// Address to read the object header from.
|
|
Address oid.Address
|
|
|
|
// Offset-length range of the object payload to be read.
|
|
PayloadRange [2]uint64
|
|
}
|
|
|
|
// Object represents FrostFS object.
|
|
type Object struct {
|
|
// Object header (doesn't contain payload).
|
|
Header object.Object
|
|
|
|
// Object payload part encapsulated in io.Reader primitive.
|
|
// Returns ErrAccessDenied on read access violation.
|
|
Payload io.ReadCloser
|
|
}
|
|
|
|
// PrmObjectCreate groups parameters of FrostFS.CreateObject operation.
|
|
type PrmObjectCreate struct {
|
|
// Authentication parameters.
|
|
PrmAuth
|
|
|
|
Object *object.Object
|
|
|
|
// Object payload encapsulated in io.Reader primitive.
|
|
Payload io.Reader
|
|
|
|
// Enables client side object preparing.
|
|
ClientCut bool
|
|
|
|
// Disables using Tillich-Zémor hash for payload.
|
|
WithoutHomomorphicHash bool
|
|
|
|
// Sets max buffer size to read payload.
|
|
BufferMaxSize uint64
|
|
}
|
|
|
|
// PrmObjectSearch groups parameters of FrostFS.sear SearchObjects operation.
|
|
type PrmObjectSearch struct {
|
|
// Authentication parameters.
|
|
PrmAuth
|
|
|
|
// Container to select the objects from.
|
|
Container cid.ID
|
|
|
|
Filters object.SearchFilters
|
|
}
|
|
|
|
type PrmInitMultiObjectReader struct {
|
|
// payload range
|
|
Off, Ln uint64
|
|
|
|
Addr oid.Address
|
|
Bearer *bearer.Token
|
|
}
|
|
|
|
type ResObjectSearch interface {
|
|
Read(buf []oid.ID) (int, error)
|
|
Iterate(f func(oid.ID) bool) error
|
|
Close()
|
|
}
|
|
|
|
// Sentinel errors reported by FrostFS implementations.
var (
	// ErrAccessDenied is returned from FrostFS in case of access violation.
	ErrAccessDenied = errors.New("access denied")

	// ErrGatewayTimeout is returned from FrostFS in case of timeout,
	// deadline exceeded etc.
	ErrGatewayTimeout = errors.New("gateway timeout")
)
|
|
|
|
// FrostFS represents virtual connection to FrostFS network.
|
|
type FrostFS interface {
|
|
Container(context.Context, PrmContainer) (*container.Container, error)
|
|
HeadObject(context.Context, PrmObjectHead) (*object.Object, error)
|
|
GetObject(context.Context, PrmObjectGet) (*Object, error)
|
|
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
|
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
|
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
|
InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)
|
|
|
|
utils.EpochInfoFetcher
|
|
}
|
|
|
|
type ContainerResolver interface {
|
|
Resolve(ctx context.Context, name string) (*cid.ID, error)
|
|
}
|
|
|
|
type Handler struct {
|
|
log *zap.Logger
|
|
frostfs FrostFS
|
|
ownerID *user.ID
|
|
config Config
|
|
containerResolver ContainerResolver
|
|
tree *tree.Tree
|
|
cache *cache.BucketCache
|
|
workerPool *ants.Pool
|
|
}
|
|
|
|
type AppParams struct {
|
|
Logger *zap.Logger
|
|
FrostFS FrostFS
|
|
Owner *user.ID
|
|
Resolver ContainerResolver
|
|
Cache *cache.BucketCache
|
|
}
|
|
|
|
func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler {
|
|
return &Handler{
|
|
log: params.Logger,
|
|
frostfs: params.FrostFS,
|
|
ownerID: params.Owner,
|
|
config: config,
|
|
containerResolver: params.Resolver,
|
|
tree: tree,
|
|
cache: params.Cache,
|
|
workerPool: workerPool,
|
|
}
|
|
}
|
|
|
|
// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
|
// prepares request and object address to it.
|
|
func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
|
idCnr, _ := c.UserValue("cid").(string)
|
|
idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
|
|
|
|
ctx := utils.GetContextFromRequest(c)
|
|
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
|
log := reqLog.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
|
|
|
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
|
|
objID := new(oid.ID)
|
|
if err = objID.DecodeString(idObj); err != nil {
|
|
log.Error(logs.WrongObjectID, zap.Error(err))
|
|
response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
addr := newAddress(bktInfo.CID, *objID)
|
|
|
|
f(ctx, *h.newRequest(c, log), addr)
|
|
}
|
|
|
|
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
|
// resolves object address from S3-like path <bucket name>/<object key>.
|
|
func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
|
bucketname := c.UserValue("cid").(string)
|
|
key := c.UserValue("oid").(string)
|
|
|
|
ctx := utils.GetContextFromRequest(c)
|
|
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
|
log := reqLog.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
|
|
|
unescapedKey, err := url.QueryUnescape(key)
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
|
|
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
|
|
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
|
|
if err != nil {
|
|
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
|
response.Error(c, "Access Denied", fasthttp.StatusForbidden)
|
|
} else {
|
|
response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
|
|
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
|
|
}
|
|
return
|
|
}
|
|
if foundOid.DeleteMarker {
|
|
log.Error(logs.ObjectWasDeleted)
|
|
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
|
return
|
|
}
|
|
addr := newAddress(bktInfo.CID, foundOid.OID)
|
|
|
|
f(ctx, *h.newRequest(c, log), addr)
|
|
}
|
|
|
|
// byAttribute is a wrapper similar to byNativeAddress.
|
|
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
|
scid, _ := c.UserValue("cid").(string)
|
|
key, _ := c.UserValue("attr_key").(string)
|
|
val, _ := c.UserValue("attr_val").(string)
|
|
|
|
ctx := utils.GetContextFromRequest(c)
|
|
log := utils.GetReqLogOrDefault(ctx, h.log)
|
|
|
|
key, err := url.QueryUnescape(key)
|
|
if err != nil {
|
|
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Error(err))
|
|
response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
val, err = url.QueryUnescape(val)
|
|
if err != nil {
|
|
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Error(err))
|
|
response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
log = log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
|
|
|
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
|
|
objID, err := h.findObjectByAttribute(ctx, log, bktInfo.CID, key, val)
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
response.Error(c, err.Error(), fasthttp.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
var addrObj oid.Address
|
|
addrObj.SetContainer(bktInfo.CID)
|
|
addrObj.SetObject(objID)
|
|
|
|
f(ctx, *h.newRequest(c, log), addrObj)
|
|
}
|
|
|
|
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
|
|
res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
|
|
if err != nil {
|
|
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
|
return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
|
|
}
|
|
defer res.Close()
|
|
|
|
buf := make([]oid.ID, 1)
|
|
|
|
n, err := res.Read(buf)
|
|
if n == 0 {
|
|
switch {
|
|
case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
|
|
log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName)
|
|
return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
|
|
case errors.Is(err, io.EOF):
|
|
log.Error(logs.ObjectNotFound, zap.Error(err))
|
|
return oid.ID{}, fmt.Errorf("object not found: %w", err)
|
|
default:
|
|
log.Error(logs.ReadObjectListFailed, zap.Error(err))
|
|
return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
|
|
}
|
|
}
|
|
|
|
return buf[0], nil
|
|
}
|
|
|
|
func (h *Handler) needSearchByFileName(key, val string) bool {
|
|
if key != attrFilePath || !h.config.EnableFilepathFallback() {
|
|
return false
|
|
}
|
|
|
|
return strings.HasPrefix(val, "/") && strings.Count(val, "/") == 1 || !strings.Contains(val, "/")
|
|
}
|
|
|
|
// resolveContainer decode container id, if it's not a valid container id
|
|
// then trey to resolve name using provided resolver.
|
|
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
|
|
cnrID := new(cid.ID)
|
|
err := cnrID.DecodeString(containerID)
|
|
if err != nil {
|
|
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
|
if err != nil && strings.Contains(err.Error(), "not found") {
|
|
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
|
|
}
|
|
}
|
|
return cnrID, err
|
|
}
|
|
|
|
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
|
ns, err := middleware.GetNamespace(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if bktInfo := h.cache.Get(ns, containerName); bktInfo != nil {
|
|
return bktInfo, nil
|
|
}
|
|
|
|
cnrID, err := h.resolveContainer(ctx, containerName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bktInfo, err := h.readContainer(ctx, *cnrID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = h.cache.Put(bktInfo); err != nil {
|
|
log.Warn(logs.CouldntPutBucketIntoCache,
|
|
zap.String("bucket name", bktInfo.Name),
|
|
zap.Stringer("bucket cid", bktInfo.CID),
|
|
zap.Error(err))
|
|
}
|
|
|
|
return bktInfo, nil
|
|
}
|
|
|
|
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
|
|
prm := PrmContainer{ContainerID: cnrID}
|
|
res, err := h.frostfs.Container(ctx, prm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
|
|
}
|
|
|
|
bktInfo := &data.BucketInfo{
|
|
CID: cnrID,
|
|
Name: cnrID.EncodeToString(),
|
|
}
|
|
|
|
if domain := container.ReadDomain(*res); domain.Name() != "" {
|
|
bktInfo.Name = domain.Name()
|
|
bktInfo.Zone = domain.Zone()
|
|
}
|
|
|
|
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(*res)
|
|
|
|
return bktInfo, err
|
|
}
|
|
|
|
func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {
|
|
if !h.config.IndexPageEnabled() {
|
|
c.SetStatusCode(fasthttp.StatusNotFound)
|
|
return
|
|
}
|
|
|
|
cidURLParam := c.UserValue("cid").(string)
|
|
oidURLParam := c.UserValue("oid").(string)
|
|
|
|
ctx := utils.GetContextFromRequest(c)
|
|
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
|
log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
|
|
|
|
unescapedKey, err := url.QueryUnescape(oidURLParam)
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
|
|
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
|
|
listFunc := h.getDirObjectsS3
|
|
isNativeList := false
|
|
|
|
err = h.tree.CheckSettingsNodeExist(ctx, bktInfo)
|
|
if err != nil {
|
|
if errors.Is(err, tree.ErrNodeNotFound) {
|
|
// tree probe failed, try to use native
|
|
listFunc = h.getDirObjectsNative
|
|
isNativeList = true
|
|
} else {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
h.browseObjects(c, browseParams{
|
|
bucketInfo: bktInfo,
|
|
prefix: unescapedKey,
|
|
listObjects: listFunc,
|
|
isNative: isNativeList,
|
|
})
|
|
}
|