[#135] frostfs: Add SEARCH operation

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-06-13 12:34:19 +03:00
parent 0590f84d68
commit 7a380fa46c
4 changed files with 80 additions and 86 deletions

View file

@ -38,14 +38,15 @@ type (
IsDir bool
IsDeleteMarker bool
Bucket string
Name string
Size uint64
ContentType string
Created time.Time
HashSum string
Owner user.ID
Headers map[string]string
Bucket string
Name string
Size uint64
ContentType string
Created time.Time
CreationEpoch uint64
HashSum string
Owner user.ID
Headers map[string]string
}
// NotificationInfo store info to send s3 notification.

View file

@ -128,6 +128,22 @@ type PrmObjectDelete struct {
Object oid.ID
}
// PrmObjectSearch groups parameters of FrostFS.sear SearchObjects operation.
type PrmObjectSearch struct {
// Authentication parameters.
PrmAuth
// Container to select the objects from.
Container cid.ID
// Key-value object attribute which should be
// presented in selected objects. Optional, empty key means any.
ExactAttribute [2]string
// File prefix of the selected objects. Optional, empty value means any.
FilePrefix string
}
var (
// ErrAccessDenied is returned from FrostFS in case of access violation.
ErrAccessDenied = errors.New("access denied")
@ -215,6 +231,15 @@ type FrostFS interface {
// It returns any error encountered which prevented the removal request from being sent.
DeleteObject(context.Context, PrmObjectDelete) error
// SearchObjects performs object search from the NeoFS container according
// to the specified parameters. It searches user's objects only.
//
// It returns ErrAccessDenied on selection access violation.
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the objects from being selected.
SearchObjects(context.Context, PrmObjectSearch) ([]oid.ID, error)
// TimeToEpoch computes current epoch and the epoch that corresponds to the provided now and future time.
// Note:
// * future time must be after the now

View file

@ -88,14 +88,15 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI
CID: bkt.CID,
IsDir: false,
Bucket: bkt.Name,
Name: filepathFromObject(meta),
Created: creation,
ContentType: mimeType,
Headers: headers,
Owner: *meta.OwnerID(),
Size: meta.PayloadSize(),
HashSum: hex.EncodeToString(payloadChecksum.Value()),
Bucket: bkt.Name,
Name: filepathFromObject(meta),
Created: creation,
ContentType: mimeType,
Headers: headers,
Owner: *meta.OwnerID(),
Size: meta.PayloadSize(),
CreationEpoch: meta.CreationEpoch(),
HashSum: hex.EncodeToString(payloadChecksum.Value()),
}
}

View file

@ -1,7 +1,6 @@
package frostfs
import (
"bytes"
"context"
"errors"
"fmt"
@ -12,8 +11,6 @@ import (
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/authmate"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
errorsFrost "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/errors"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
@ -380,6 +377,43 @@ func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) e
return handleObjectError("mark object removal via connection pool", err)
}
// SearchObjects implements frostfs.FrostFS interface method.
func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
if prm.ExactAttribute[0] != "" {
filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual)
}
if prm.FilePrefix != "" {
filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix)
}
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(prm.Container)
prmSearch.SetFilters(filters)
if prm.BearerToken != nil {
prmSearch.UseBearer(*prm.BearerToken)
} else {
prmSearch.UseKey(prm.PrivateKey)
}
res, err := x.pool.SearchObjects(ctx, prmSearch)
if err != nil {
return nil, handleObjectError("init object search via connection pool", err)
}
defer res.Close()
var buf []oid.ID
err = res.Iterate(func(id oid.ID) bool {
buf = append(buf, id)
return false
})
return buf, handleObjectError("read object list", err)
}
// ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS.
type ResolverFrostFS struct {
@ -422,73 +456,6 @@ func handleObjectError(msg string, err error) error {
return fmt.Errorf("%s: %w", msg, err)
}
// AuthmateFrostFS is a mediator which implements authmate.FrostFS through pool.Pool.
type AuthmateFrostFS struct {
frostFS *FrostFS
}
// NewAuthmateFrostFS creates new AuthmateFrostFS using provided pool.Pool.
func NewAuthmateFrostFS(p *pool.Pool) *AuthmateFrostFS {
return &AuthmateFrostFS{frostFS: NewFrostFS(p)}
}
// ContainerExists implements authmate.FrostFS interface method.
func (x *AuthmateFrostFS) ContainerExists(ctx context.Context, idCnr cid.ID) error {
_, err := x.frostFS.Container(ctx, idCnr)
if err != nil {
return fmt.Errorf("get container via connection pool: %w", err)
}
return nil
}
// TimeToEpoch implements authmate.FrostFS interface method.
func (x *AuthmateFrostFS) TimeToEpoch(ctx context.Context, futureTime time.Time) (uint64, uint64, error) {
return x.frostFS.TimeToEpoch(ctx, time.Now(), futureTime)
}
// CreateContainer implements authmate.FrostFS interface method.
func (x *AuthmateFrostFS) CreateContainer(ctx context.Context, prm authmate.PrmContainerCreate) (cid.ID, error) {
basicACL := acl.Private
// allow reading objects to OTHERS in order to provide read access to S3 gateways
basicACL.AllowOp(acl.OpObjectGet, acl.RoleOthers)
return x.frostFS.CreateContainer(ctx, layer.PrmContainerCreate{
Creator: prm.Owner,
Policy: prm.Policy,
Name: prm.FriendlyName,
BasicACL: basicACL,
})
}
// ReadObjectPayload implements authmate.FrostFS interface method.
func (x *AuthmateFrostFS) ReadObjectPayload(ctx context.Context, addr oid.Address) ([]byte, error) {
res, err := x.frostFS.ReadObject(ctx, layer.PrmObjectRead{
Container: addr.Container(),
Object: addr.Object(),
WithPayload: true,
})
if err != nil {
return nil, err
}
defer res.Payload.Close()
return io.ReadAll(res.Payload)
}
// CreateObject implements authmate.FrostFS interface method.
func (x *AuthmateFrostFS) CreateObject(ctx context.Context, prm tokens.PrmObjectCreate) (oid.ID, error) {
return x.frostFS.CreateObject(ctx, layer.PrmObjectCreate{
Creator: prm.Creator,
Container: prm.Container,
Filepath: prm.Filepath,
Attributes: [][2]string{
{objectv2.SysAttributeExpEpoch, strconv.FormatUint(prm.ExpirationEpoch, 10)}},
Payload: bytes.NewReader(prm.Payload),
})
}
// PoolStatistic is a mediator which implements authmate.FrostFS through pool.Pool.
type PoolStatistic struct {
pool *pool.Pool