package handler

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"
	"strings"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/panjf2000/ants/v2"
	"github.com/valyala/fasthttp"
	"go.uber.org/zap"
)

// Config provides the runtime settings consulted by Handler.
//
// Within this file IndexPageEnabled gates browseIndex and
// EnableFilepathFallback gates the file-name fallback performed by
// needSearchByFileName; the remaining settings are read elsewhere.
type Config interface {
	DefaultTimestamp() bool
	ArchiveCompression() bool
	ClientCut() bool
	IndexPageEnabled() bool
	IndexPageTemplate() string
	BufferMaxSizeForPut() uint64
	NamespaceHeader() string
	EnableFilepathFallback() bool
}

// PrmContainer groups parameters of FrostFS.Container operation.
type PrmContainer struct {
	// Container identifier.
	ContainerID cid.ID
}

// PrmAuth groups authentication parameters for the FrostFS operation.
type PrmAuth struct {
	// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
	BearerToken *bearer.Token
}

// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
type PrmObjectHead struct {
	// Authentication parameters.
	PrmAuth

	// Address to read the object header from.
	Address oid.Address
}

// PrmObjectGet groups parameters of FrostFS.GetObject operation.
type PrmObjectGet struct {
	// Authentication parameters.
	PrmAuth

	// Address of the object to read.
	Address oid.Address
}

// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
type PrmObjectRange struct {
	// Authentication parameters.
	PrmAuth

	// Address of the object whose payload is read.
	Address oid.Address

	// Offset-length range of the object payload to be read.
	PayloadRange [2]uint64
}

// Object represents FrostFS object.
type Object struct {
	// Object header (doesn't contain payload).
	Header object.Object

	// Object payload part encapsulated in io.Reader primitive.
	// Returns ErrAccessDenied on read access violation.
	Payload io.ReadCloser
}

// PrmObjectCreate groups parameters of FrostFS.CreateObject operation.
type PrmObjectCreate struct {
	// Authentication parameters.
	PrmAuth

	// Object to be created.
	Object *object.Object

	// Object payload encapsulated in io.Reader primitive.
	Payload io.Reader

	// Enables client side object preparing.
	ClientCut bool

	// Disables using Tillich-Zémor hash for payload.
	WithoutHomomorphicHash bool

	// Sets max buffer size to read payload.
	BufferMaxSize uint64
}

// PrmObjectSearch groups parameters of FrostFS.SearchObjects operation.
type PrmObjectSearch struct {
	// Authentication parameters.
	PrmAuth

	// Container to select the objects from.
	Container cid.ID

	// Filters the selected objects must match.
	Filters object.SearchFilters
}

// PrmInitMultiObjectReader groups parameters of FrostFS.InitMultiObjectReader operation.
type PrmInitMultiObjectReader struct {
	// Payload range: offset and length.
	Off, Ln uint64

	// Address of the object to read.
	Addr oid.Address

	// Bearer token to be used for the operation.
	Bearer *bearer.Token
}

// ResObjectSearch is the result of FrostFS.SearchObjects operation.
//
// findObjectByAttribute relies on Read reporting io.EOF when no identifiers
// are left, and closes the result once it is done with it.
type ResObjectSearch interface {
	Read(buf []oid.ID) (int, error)
	Iterate(f func(oid.ID) bool) error
	Close()
}

var (
	// ErrAccessDenied is returned from FrostFS in case of access violation.
	ErrAccessDenied = errors.New("access denied")
	// ErrGatewayTimeout is returned from FrostFS in case of timeout, deadline exceeded etc.
	ErrGatewayTimeout = errors.New("gateway timeout")
	// ErrQuotaLimitReached is returned from FrostFS in case of quota exceeded.
	ErrQuotaLimitReached = errors.New("quota limit reached")
	// ErrContainerNotFound is returned from FrostFS in case of container was not found.
	ErrContainerNotFound = errors.New("container not found")
	// ErrObjectNotFound is returned from FrostFS in case of object was not found.
	ErrObjectNotFound = errors.New("object not found")
)

// FrostFS represents virtual connection to FrostFS network.
type FrostFS interface {
	Container(context.Context, PrmContainer) (*container.Container, error)
	HeadObject(context.Context, PrmObjectHead) (*object.Object, error)
	GetObject(context.Context, PrmObjectGet) (*Object, error)
	RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
	CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
	SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
	InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)

	utils.EpochInfoFetcher
}

// ContainerResolver resolves a container name to a container identifier.
type ContainerResolver interface {
	Resolve(ctx context.Context, name string) (*cid.ID, error)
}

// Handler holds the dependencies shared by the request handlers of this package.
type Handler struct {
	log               *zap.Logger
	frostfs           FrostFS
	ownerID           *user.ID
	config            Config
	containerResolver ContainerResolver
	tree              layer.TreeService
	cache             *cache.BucketCache
	workerPool        *ants.Pool
}

// AppParams groups the application-level dependencies passed to New.
type AppParams struct {
	Logger   *zap.Logger
	FrostFS  FrostFS
	Owner    *user.ID
	Resolver ContainerResolver
	Cache    *cache.BucketCache
}

// New creates a Handler from the given application parameters, configuration,
// tree service and worker pool. The arguments are stored as-is; no validation
// is performed here.
func New(params *AppParams, config Config, tree layer.TreeService, workerPool *ants.Pool) *Handler {
	return &Handler{
		log:               params.Logger,
		frostfs:           params.FrostFS,
		ownerID:           params.Owner,
		config:            config,
		containerResolver: params.Resolver,
		tree:              tree,
		cache:             params.Cache,
		workerPool:        workerPool,
	}
}

// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// starts a tracing span, builds the object address from the container and object
// identifiers and passes it to the handler.
func (h *Handler) byNativeAddress(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, objID oid.ID, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) {
	ctx, span := tracing.StartSpanFromContext(ctx, "handler.byNativeAddress")
	defer span.End()

	addr := newAddress(cnrID, objID)
	handler(ctx, req, addr)
}

// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// resolves the object address from an S3-like object path by looking up the
// latest version of that path in the tree service.
func (h *Handler) byS3Path(ctx context.Context, req *fasthttp.RequestCtx, cnrID cid.ID, path string, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) { ctx, span := tracing.StartSpanFromContext(ctx, "handler.byS3Path") defer span.End() foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path) if err != nil { h.logAndSendError(ctx, req, logs.FailedToGetLatestVersionOfObject, err, zap.String("path", path)) return } if foundOID.IsDeleteMarker { h.logAndSendError(ctx, req, logs.ObjectWasDeleted, ErrObjectNotFound) return } addr := newAddress(cnrID, foundOID.OID) handler(ctx, req, addr) } // byAttribute is a wrapper similar to byNativeAddress. func (h *Handler) byAttribute(ctx context.Context, req *fasthttp.RequestCtx, handler func(context.Context, *fasthttp.RequestCtx, oid.Address)) { cidParam, _ := req.UserValue("cid").(string) key, _ := req.UserValue("attr_key").(string) val, _ := req.UserValue("attr_val").(string) key, err := url.QueryUnescape(key) if err != nil { h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_key", key)) return } val, err = url.QueryUnescape(val) if err != nil { h.logAndSendError(ctx, req, logs.FailedToUnescapeQuery, err, zap.String("cid", cidParam), zap.String("attr_val", key)) return } val = prepareAtribute(key, val) ctx = utils.SetReqLog(ctx, h.reqLogger(ctx).With(zap.String("cid", cidParam), zap.String("attr_key", key), zap.String("attr_val", val))) bktInfo, err := h.getBucketInfo(ctx, cidParam) if err != nil { h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err) return } objID, err := h.findObjectByAttribute(ctx, bktInfo.CID, key, val) if err != nil { if errors.Is(err, io.EOF) { err = fmt.Errorf("%w: %s", ErrObjectNotFound, err.Error()) } h.logAndSendError(ctx, req, logs.FailedToFindObjectByAttribute, err) return } var addr oid.Address addr.SetContainer(bktInfo.CID) addr.SetObject(objID) handler(ctx, req, addr) } func (h *Handler) findObjectByAttribute(ctx 
context.Context, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) { res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual) if err != nil { return oid.ID{}, fmt.Errorf("search objects: %w", err) } defer res.Close() buf := make([]oid.ID, 1) n, err := res.Read(buf) if n == 0 { switch { case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal): h.reqLogger(ctx).Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage)) attrVal = prepareAtribute(attrFileName, attrVal) return h.findObjectByAttribute(ctx, cnrID, attrFileName, attrVal) case errors.Is(err, io.EOF): h.reqLogger(ctx).Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage)) return oid.ID{}, fmt.Errorf("object not found: %w", err) default: h.reqLogger(ctx).Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage)) return oid.ID{}, fmt.Errorf("read object list failed: %w", err) } } return buf[0], nil } func (h *Handler) needSearchByFileName(key, val string) bool { if key != attrFilePath || !h.config.EnableFilepathFallback() { return false } return strings.HasPrefix(val, "/") && strings.Count(val, "/") == 1 || !strings.Contains(val, "/") } func prepareAtribute(attrKey, attrVal string) string { if attrKey == attrFileName { return prepareFileName(attrVal) } if attrKey == attrFilePath { return prepareFilePath(attrVal) } return attrVal } func prepareFileName(fileName string) string { if strings.HasPrefix(fileName, "/") { return fileName[1:] } return fileName } func prepareFilePath(filePath string) string { if !strings.HasPrefix(filePath, "/") { return "/" + filePath } return filePath } // resolveContainer decode container id, if it's not a valid container id // then trey to resolve name using provided resolver. 
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) { cnrID := new(cid.ID) err := cnrID.DecodeString(containerID) if err != nil { cnrID, err = h.containerResolver.Resolve(ctx, containerID) if err != nil && strings.Contains(err.Error(), "not found") { err = fmt.Errorf("%w: %s", ErrContainerNotFound, err.Error()) } } return cnrID, err } func (h *Handler) getBucketInfo(ctx context.Context, containerName string) (*data.BucketInfo, error) { ns, err := middleware.GetNamespace(ctx) if err != nil { return nil, err } if bktInfo := h.cache.Get(ns, containerName); bktInfo != nil { return bktInfo, nil } cnrID, err := h.resolveContainer(ctx, containerName) if err != nil { return nil, fmt.Errorf("resolve container: %w", err) } bktInfo, err := h.readContainer(ctx, *cnrID) if err != nil { return nil, fmt.Errorf("read container: %w", err) } if err = h.cache.Put(bktInfo); err != nil { h.reqLogger(ctx).Warn(logs.CouldntPutBucketIntoCache, zap.String("bucket name", bktInfo.Name), zap.Stringer("bucket cid", bktInfo.CID), zap.Error(err), logs.TagField(logs.TagDatapath)) } return bktInfo, nil } func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) { prm := PrmContainer{ContainerID: cnrID} res, err := h.frostfs.Container(ctx, prm) if err != nil { return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err) } bktInfo := &data.BucketInfo{ CID: cnrID, Name: cnrID.EncodeToString(), } if domain := container.ReadDomain(*res); domain.Name() != "" { bktInfo.Name = domain.Name() bktInfo.Zone = domain.Zone() } bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(*res) bktInfo.PlacementPolicy = res.PlacementPolicy() return bktInfo, err } func (h *Handler) browseIndex(ctx context.Context, req *fasthttp.RequestCtx, cidParam, oidParam string, isNativeList bool) { ctx, span := tracing.StartSpanFromContext(ctx, "handler.browseIndex") defer span.End() if !h.config.IndexPageEnabled() { 
req.SetStatusCode(fasthttp.StatusNotFound) return } unescapedKey, err := url.QueryUnescape(oidParam) if err != nil { h.logAndSendError(ctx, req, logs.FailedToUnescapeOIDParam, err) return } bktInfo, err := h.getBucketInfo(ctx, cidParam) if err != nil { h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err) return } listFunc := h.getDirObjectsS3 if isNativeList { // tree probe failed, trying to use native listFunc = h.getDirObjectsNative } h.browseObjects(ctx, req, browseParams{ bucketInfo: bktInfo, prefix: unescapedKey, listObjects: listFunc, isNative: isNativeList, }) }