frostfs-http-gw/internal/handler/handler.go

438 lines
12 KiB
Go
Raw Normal View History

package handler
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/panjf2000/ants/v2"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
type Config interface {
DefaultTimestamp() bool
ZipCompression() bool
ClientCut() bool
IndexPageEnabled() bool
IndexPageTemplate() string
BufferMaxSizeForPut() uint64
NamespaceHeader() string
}
// PrmContainer groups parameters of FrostFS.Container operation.
type PrmContainer struct {
// Container identifier.
ContainerID cid.ID
}
// PrmAuth groups authentication parameters for the FrostFS operation.
type PrmAuth struct {
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
BearerToken *bearer.Token
}
// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
type PrmObjectHead struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
}
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
type PrmObjectGet struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
}
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
type PrmObjectRange struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
// Offset-length range of the object payload to be read.
PayloadRange [2]uint64
}
// Object represents FrostFS object.
type Object struct {
// Object header (doesn't contain payload).
Header object.Object
// Object payload part encapsulated in io.Reader primitive.
// Returns ErrAccessDenied on read access violation.
Payload io.ReadCloser
}
// PrmObjectCreate groups parameters of FrostFS.CreateObject operation.
type PrmObjectCreate struct {
// Authentication parameters.
PrmAuth
Object *object.Object
// Object payload encapsulated in io.Reader primitive.
Payload io.Reader
// Enables client side object preparing.
ClientCut bool
// Disables using Tillich-Zémor hash for payload.
WithoutHomomorphicHash bool
// Sets max buffer size to read payload.
BufferMaxSize uint64
}
// PrmObjectSearch groups parameters of FrostFS.sear SearchObjects operation.
type PrmObjectSearch struct {
// Authentication parameters.
PrmAuth
// Container to select the objects from.
Container cid.ID
Filters object.SearchFilters
}
type PrmInitMultiObjectReader struct {
// payload range
Off, Ln uint64
Addr oid.Address
Bearer *bearer.Token
}
type ResObjectSearch interface {
Read(buf []oid.ID) (int, error)
Iterate(f func(oid.ID) bool) error
Close()
}
var (
// ErrAccessDenied is returned from FrostFS in case of access violation.
ErrAccessDenied = errors.New("access denied")
// ErrGatewayTimeout is returned from FrostFS in case of timeout, deadline exceeded etc.
ErrGatewayTimeout = errors.New("gateway timeout")
)
// FrostFS represents virtual connection to FrostFS network.
type FrostFS interface {
Container(context.Context, PrmContainer) (*container.Container, error)
HeadObject(context.Context, PrmObjectHead) (*object.Object, error)
GetObject(context.Context, PrmObjectGet) (*Object, error)
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)
utils.EpochInfoFetcher
}
type ContainerResolver interface {
Resolve(ctx context.Context, name string) (*cid.ID, error)
}
type Handler struct {
log *zap.Logger
frostfs FrostFS
ownerID *user.ID
config Config
containerResolver ContainerResolver
tree *tree.Tree
cache *cache.BucketCache
workerPool *ants.Pool
}
type AppParams struct {
Logger *zap.Logger
FrostFS FrostFS
Owner *user.ID
Resolver ContainerResolver
Cache *cache.BucketCache
}
func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler {
return &Handler{
log: params.Logger,
frostfs: params.FrostFS,
ownerID: params.Owner,
config: config,
containerResolver: params.Resolver,
tree: tree,
cache: params.Cache,
workerPool: workerPool,
}
}
// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it.
func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
idCnr, _ := c.UserValue("cid").(string)
idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("cid", idCnr), zap.String("oid", idObj))
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
objID := new(oid.ID)
if err = objID.DecodeString(idObj); err != nil {
log.Error(logs.WrongObjectID, zap.Error(err))
response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
return
}
addr := newAddress(bktInfo.CID, *objID)
f(ctx, *h.newRequest(c, log), addr)
}
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// resolves object address from S3-like path <bucket name>/<object key>.
func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
bucketname := c.UserValue("cid").(string)
key := c.UserValue("oid").(string)
ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("bucketname", bucketname), zap.String("key", key))
unescapedKey, err := url.QueryUnescape(key)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
if err != nil {
if errors.Is(err, tree.ErrNodeAccessDenied) {
response.Error(c, "Access Denied", fasthttp.StatusForbidden)
} else {
response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
}
return
}
if foundOid.DeleteMarker {
log.Error(logs.ObjectWasDeleted)
response.Error(c, "object deleted", fasthttp.StatusNotFound)
return
}
addr := newAddress(bktInfo.CID, foundOid.OID)
f(ctx, *h.newRequest(c, log), addr)
}
// byAttribute is a wrapper similar to byNativeAddress.
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
scid, _ := c.UserValue("cid").(string)
key, _ := c.UserValue("attr_key").(string)
val, _ := c.UserValue("attr_val").(string)
ctx := utils.GetContextFromRequest(c)
log := utils.GetReqLogOrDefault(ctx, h.log)
key, err := url.QueryUnescape(key)
if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Error(err))
response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
return
}
val, err = url.QueryUnescape(val)
if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Error(err))
response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
return
}
log = log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
bktInfo, err := h.getBucketInfo(ctx, scid, log)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
return
}
defer res.Close()
buf := make([]oid.ID, 1)
n, err := res.Read(buf)
if n == 0 {
if errors.Is(err, io.EOF) {
log.Error(logs.ObjectNotFound, zap.Error(err))
response.Error(c, "object not found", fasthttp.StatusNotFound)
return
}
log.Error(logs.ReadObjectListFailed, zap.Error(err))
response.Error(c, "read object list failed: "+err.Error(), fasthttp.StatusBadRequest)
return
}
var addrObj oid.Address
addrObj.SetContainer(bktInfo.CID)
addrObj.SetObject(buf[0])
f(ctx, *h.newRequest(c, log), addrObj)
}
// resolveContainer decode container id, if it's not a valid container id
// then trey to resolve name using provided resolver.
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
cnrID := new(cid.ID)
err := cnrID.DecodeString(containerID)
if err != nil {
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
if err != nil && strings.Contains(err.Error(), "not found") {
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
}
}
return cnrID, err
}
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
ns, err := middleware.GetNamespace(ctx)
if err != nil {
return nil, err
}
if bktInfo := h.cache.Get(ns, containerName); bktInfo != nil {
return bktInfo, nil
}
cnrID, err := h.resolveContainer(ctx, containerName)
if err != nil {
return nil, err
}
bktInfo, err := h.readContainer(ctx, *cnrID)
if err != nil {
return nil, err
}
if err = h.cache.Put(bktInfo); err != nil {
log.Warn(logs.CouldntPutBucketIntoCache,
zap.String("bucket name", bktInfo.Name),
zap.Stringer("bucket cid", bktInfo.CID),
zap.Error(err))
}
return bktInfo, nil
}
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
prm := PrmContainer{ContainerID: cnrID}
res, err := h.frostfs.Container(ctx, prm)
if err != nil {
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
}
bktInfo := &data.BucketInfo{
CID: cnrID,
Name: cnrID.EncodeToString(),
}
if domain := container.ReadDomain(*res); domain.Name() != "" {
bktInfo.Name = domain.Name()
bktInfo.Zone = domain.Zone()
}
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(*res)
return bktInfo, err
}
func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {
if !h.config.IndexPageEnabled() {
c.SetStatusCode(fasthttp.StatusNotFound)
return
}
cidURLParam := c.UserValue("cid").(string)
oidURLParam := c.UserValue("oid").(string)
ctx := utils.GetContextFromRequest(c)
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
log := reqLog.With(zap.String("cid", cidURLParam), zap.String("oid", oidURLParam))
unescapedKey, err := url.QueryUnescape(oidURLParam)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
listFunc := h.getDirObjectsS3
isNativeList := false
err = h.tree.CheckSettingsNodeExist(ctx, bktInfo)
if err != nil {
if errors.Is(err, tree.ErrNodeNotFound) {
// tree probe failed, try to use native
listFunc = h.getDirObjectsNative
isNativeList = true
} else {
logAndSendBucketError(c, log, err)
return
}
}
h.browseObjects(c, browseParams{
bucketInfo: bktInfo,
prefix: unescapedKey,
listObjects: listFunc,
isNative: isNativeList,
})
}