[#545] Drop object search

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-08-22 11:02:58 +03:00 committed by Alex Vanin
parent e4ec983213
commit 24e1f7e1dd
5 changed files with 12 additions and 134 deletions

View file

@ -200,7 +200,7 @@ func TestDeleteMarkers(t *testing.T) {
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length") require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length")
require.Len(t, versions.Version, 0, "versions must be empty") require.Len(t, versions.Version, 0, "versions must be empty")
require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName, objName), 0, "shouldn't be any object in neofs") require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName), 0, "shouldn't be any object in neofs")
} }
func TestDeleteObjectFromListCache(t *testing.T) { func TestDeleteObjectFromListCache(t *testing.T) {

View file

@ -195,20 +195,11 @@ func existInMockedNeoFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo *d
return tc.Layer().GetObject(tc.Context(), p) == nil return tc.Layer().GetObject(tc.Context(), p) == nil
} }
func listOIDsFromMockedNeoFS(t *testing.T, tc *handlerContext, bktName, objectName string) []oid.ID { func listOIDsFromMockedNeoFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName) bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
require.NoError(t, err) require.NoError(t, err)
p := layer.PrmObjectSelect{ return tc.MockedPool().AllObjects(bktInfo.CID)
Container: bktInfo.CID,
ExactAttribute: [2]string{
object.AttributeFileName, objectName,
},
}
ids, err := tc.MockedPool().SelectObjects(tc.Context(), p)
require.NoError(t, err)
return ids
} }
func assertStatus(t *testing.T, w *httptest.ResponseRecorder, status int) { func assertStatus(t *testing.T, w *httptest.ResponseRecorder, status int) {

View file

@ -49,22 +49,6 @@ type PrmAuth struct {
PrivateKey *ecdsa.PrivateKey PrivateKey *ecdsa.PrivateKey
} }
// PrmObjectSelect groups parameters of NeoFS.SelectObjects operation.
type PrmObjectSelect struct {
// Authentication parameters.
PrmAuth
// Container to select the objects from.
Container cid.ID
// Key-value object attribute which should be
// presented in selected objects. Optional, empty key means any.
ExactAttribute [2]string
// File prefix of the selected objects. Optional, empty value means any.
FilePrefix string
}
// PrmObjectRead groups parameters of NeoFS.ReadObject operation. // PrmObjectRead groups parameters of NeoFS.ReadObject operation.
type PrmObjectRead struct { type PrmObjectRead struct {
// Authentication parameters. // Authentication parameters.
@ -184,15 +168,6 @@ type NeoFS interface {
// It returns any error encountered which prevented the removal request from being sent. // It returns any error encountered which prevented the removal request from being sent.
DeleteContainer(context.Context, cid.ID, *session.Container) error DeleteContainer(context.Context, cid.ID, *session.Container) error
// SelectObjects performs object selection from the NeoFS container according
// to the specified parameters. It selects user's objects only.
//
// It returns ErrAccessDenied on selection access violation.
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the objects from being selected.
SelectObjects(context.Context, PrmObjectSelect) ([]oid.ID, error)
// ReadObject reads a part of the object from the NeoFS container by identifier. // ReadObject reads a part of the object from the NeoFS container by identifier.
// Exact part is returned according to the parameters: // Exact part is returned according to the parameters:
// * with header only: empty payload (both in-mem and reader parts are nil); // * with header only: empty payload (both in-mem and reader parts are nil);

View file

@ -7,7 +7,6 @@ import (
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"io" "io"
"strings"
"time" "time"
objectv2 "github.com/nspcc-dev/neofs-api-go/v2/object" objectv2 "github.com/nspcc-dev/neofs-api-go/v2/object"
@ -28,8 +27,6 @@ type TestNeoFS struct {
currentEpoch uint64 currentEpoch uint64
} }
const objectSystemAttributeName = "S3-System-name"
func NewTestNeoFS() *TestNeoFS { func NewTestNeoFS() *TestNeoFS {
return &TestNeoFS{ return &TestNeoFS{
objects: make(map[string]*object.Object), objects: make(map[string]*object.Object),
@ -126,48 +123,6 @@ func (t *TestNeoFS) UserContainers(_ context.Context, _ user.ID) ([]cid.ID, erro
return res, nil return res, nil
} }
func (t *TestNeoFS) SelectObjects(_ context.Context, prm PrmObjectSelect) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
if prm.FilePrefix != "" {
filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix)
}
if prm.ExactAttribute[0] != "" {
filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual)
}
cidStr := prm.Container.EncodeToString()
var res []oid.ID
if len(filters) == 1 {
for k, v := range t.objects {
if strings.Contains(k, cidStr) {
id, _ := v.ID()
res = append(res, id)
}
}
return res, nil
}
filter := filters[1]
if len(filters) != 2 || filter.Operation() != object.MatchStringEqual ||
(filter.Header() != object.AttributeFileName && filter.Header() != objectSystemAttributeName) {
return nil, fmt.Errorf("usupported filters")
}
for k, v := range t.objects {
if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) {
id, _ := v.ID()
res = append(res, id)
}
}
return res, nil
}
func (t *TestNeoFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) { func (t *TestNeoFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
@ -264,12 +219,16 @@ func (t *TestNeoFS) TimeToEpoch(_ context.Context, futureTime time.Time) (uint64
return t.currentEpoch, t.currentEpoch + uint64(futureTime.Second()), nil return t.currentEpoch, t.currentEpoch + uint64(futureTime.Second()), nil
} }
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool { func (t *TestNeoFS) AllObjects(cnrID cid.ID) []oid.ID {
for _, attr := range attributes { result := make([]oid.ID, 0, len(t.objects))
if attr.Key() == filter.Header() && attr.Value() == filter.Value() {
return true for _, val := range t.objects {
objCnrID, _ := val.ContainerID()
objObjID, _ := val.ID()
if cnrID.Equals(objCnrID) {
result = append(result, objObjID)
} }
} }
return false return result
} }

View file

@ -279,53 +279,6 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
return idObj, nil return idObj, nil
} }
// SelectObjects implements neofs.NeoFS interface method.
func (x *NeoFS) SelectObjects(ctx context.Context, prm layer.PrmObjectSelect) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
if prm.ExactAttribute[0] != "" {
filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual)
}
if prm.FilePrefix != "" {
filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix)
}
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(prm.Container)
prmSearch.SetFilters(filters)
if prm.BearerToken != nil {
prmSearch.UseBearer(*prm.BearerToken)
} else {
prmSearch.UseKey(prm.PrivateKey)
}
res, err := x.pool.SearchObjects(ctx, prmSearch)
if err != nil {
return nil, fmt.Errorf("init object search via connection pool: %w", err)
}
defer res.Close()
var buf []oid.ID
err = res.Iterate(func(id oid.ID) bool {
buf = append(buf, id)
return false
})
if err != nil {
if reason, ok := isErrAccessDenied(err); ok {
return nil, fmt.Errorf("%w: %s", layer.ErrAccessDenied, reason)
}
return nil, fmt.Errorf("read object list: %w", err)
}
return buf, nil
}
// wraps io.ReadCloser and transforms Read errors related to access violation // wraps io.ReadCloser and transforms Read errors related to access violation
// to neofs.ErrAccessDenied. // to neofs.ErrAccessDenied.
type payloadReader struct { type payloadReader struct {