[#9999] searchSvc: Check container is S3 container
For non S3 containers it is expected to use attributes index for some attributes. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
4ec2f8e502
commit
72b92f50ae
8 changed files with 42 additions and 13 deletions
|
@ -174,7 +174,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sPutV2 := createPutSvcV2(sPut, keyStorage)
|
sPutV2 := createPutSvcV2(sPut, keyStorage)
|
||||||
|
|
||||||
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache)
|
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
||||||
|
|
||||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||||
|
|
||||||
|
@ -366,7 +366,7 @@ func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Servic
|
||||||
return patchsvc.NewService(sPut.Config, sGet)
|
return patchsvc.NewService(sPut.Config, sGet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, containerSource containercore.Source) *searchsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
return searchsvc.New(
|
return searchsvc.New(
|
||||||
|
@ -377,6 +377,7 @@ func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Trav
|
||||||
),
|
),
|
||||||
c.netMapSource,
|
c.netMapSource,
|
||||||
keyStorage,
|
keyStorage,
|
||||||
|
containerSource,
|
||||||
searchsvc.WithLogger(c.log),
|
searchsvc.WithLogger(c.log),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
addrs, err := Select(context.Background(), e, cnr, fs)
|
addrs, err := Select(context.Background(), e, cnr, false, fs)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, addrs)
|
require.Empty(t, addrs)
|
||||||
})
|
})
|
||||||
|
@ -78,7 +78,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
_, err = e.Inhume(context.Background(), inhumePrm)
|
_, err = e.Inhume(context.Background(), inhumePrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
addrs, err := Select(context.Background(), e, cnr, fs)
|
addrs, err := Select(context.Background(), e, cnr, false, fs)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, addrs)
|
require.Empty(t, addrs)
|
||||||
})
|
})
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
type SelectPrm struct {
|
type SelectPrm struct {
|
||||||
cnr cid.ID
|
cnr cid.ID
|
||||||
filters objectSDK.SearchFilters
|
filters objectSDK.SearchFilters
|
||||||
|
isS3Container bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectRes groups the resulting values of Select operation.
|
// SelectRes groups the resulting values of Select operation.
|
||||||
|
@ -24,8 +25,9 @@ type SelectRes struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithContainerID is a Select option to set the container id to search in.
|
// WithContainerID is a Select option to set the container id to search in.
|
||||||
func (p *SelectPrm) WithContainerID(cnr cid.ID) {
|
func (p *SelectPrm) WithContainerID(cnr cid.ID, isS3Container bool) {
|
||||||
p.cnr = cnr
|
p.cnr = cnr
|
||||||
|
p.isS3Container = isS3Container
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithFilters is a Select option to set the object filters.
|
// WithFilters is a Select option to set the object filters.
|
||||||
|
@ -71,6 +73,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
|
||||||
var shPrm shard.SelectPrm
|
var shPrm shard.SelectPrm
|
||||||
shPrm.SetContainerID(prm.cnr)
|
shPrm.SetContainerID(prm.cnr)
|
||||||
shPrm.SetFilters(prm.filters)
|
shPrm.SetFilters(prm.filters)
|
||||||
|
shPrm.SetIsS3Container(prm.isS3Container)
|
||||||
|
|
||||||
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
|
||||||
res, err := sh.Select(ctx, shPrm)
|
res, err := sh.Select(ctx, shPrm)
|
||||||
|
@ -144,9 +147,9 @@ func (e *StorageEngine) list(ctx context.Context, limit uint64) (SelectRes, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select selects objects from local storage using provided filters.
|
// Select selects objects from local storage using provided filters.
|
||||||
func Select(ctx context.Context, storage *StorageEngine, cnr cid.ID, fs objectSDK.SearchFilters) ([]oid.Address, error) {
|
func Select(ctx context.Context, storage *StorageEngine, cnr cid.ID, isS3Container bool, fs objectSDK.SearchFilters) ([]oid.Address, error) {
|
||||||
var selectPrm SelectPrm
|
var selectPrm SelectPrm
|
||||||
selectPrm.WithContainerID(cnr)
|
selectPrm.WithContainerID(cnr, isS3Container)
|
||||||
selectPrm.WithFilters(fs)
|
selectPrm.WithFilters(fs)
|
||||||
|
|
||||||
res, err := storage.Select(ctx, selectPrm)
|
res, err := storage.Select(ctx, selectPrm)
|
||||||
|
|
|
@ -50,7 +50,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) {
|
||||||
|
|
||||||
b.Run("search", func(b *testing.B) {
|
b.Run("search", func(b *testing.B) {
|
||||||
var prm SelectPrm
|
var prm SelectPrm
|
||||||
prm.WithContainerID(cid)
|
prm.WithContainerID(cid, true)
|
||||||
|
|
||||||
var fs objectSDK.SearchFilters
|
var fs objectSDK.SearchFilters
|
||||||
fs.AddFilter(pilorama.AttributeFilename, strconv.Itoa(objCount/2), objectSDK.MatchStringEqual)
|
fs.AddFilter(pilorama.AttributeFilename, strconv.Itoa(objCount/2), objectSDK.MatchStringEqual)
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
type SelectPrm struct {
|
type SelectPrm struct {
|
||||||
cnr cid.ID
|
cnr cid.ID
|
||||||
filters objectSDK.SearchFilters
|
filters objectSDK.SearchFilters
|
||||||
|
isS3Container bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectRes groups the resulting values of Select operation.
|
// SelectRes groups the resulting values of Select operation.
|
||||||
|
@ -34,6 +35,10 @@ func (p *SelectPrm) SetFilters(fs objectSDK.SearchFilters) {
|
||||||
p.filters = fs
|
p.filters = fs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *SelectPrm) SetIsS3Container(v bool) {
|
||||||
|
p.isS3Container = v
|
||||||
|
}
|
||||||
|
|
||||||
// AddressList returns list of addresses of the selected objects.
|
// AddressList returns list of addresses of the selected objects.
|
||||||
func (r SelectRes) AddressList() []oid.Address {
|
func (r SelectRes) AddressList() []oid.Address {
|
||||||
return r.addrList
|
return r.addrList
|
||||||
|
@ -61,6 +66,7 @@ func (s *Shard) Select(ctx context.Context, prm SelectPrm) (SelectRes, error) {
|
||||||
var selectPrm meta.SelectPrm
|
var selectPrm meta.SelectPrm
|
||||||
selectPrm.SetFilters(prm.filters)
|
selectPrm.SetFilters(prm.filters)
|
||||||
selectPrm.SetContainerID(prm.cnr)
|
selectPrm.SetContainerID(prm.cnr)
|
||||||
|
selectPrm.SetUseAttributeIndex(!prm.isS3Container)
|
||||||
|
|
||||||
mRes, err := s.metaBase.Select(ctx, selectPrm)
|
mRes, err := s.metaBase.Select(ctx, selectPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -112,3 +113,12 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (exec *execCtx) getContainer() (containerSDK.Container, error) {
|
||||||
|
cnrID := exec.containerID()
|
||||||
|
cnr, err := exec.svc.containerSource.Get(cnrID)
|
||||||
|
if err != nil {
|
||||||
|
return containerSDK.Container{}, err
|
||||||
|
}
|
||||||
|
return cnr.Value, nil
|
||||||
|
}
|
||||||
|
|
|
@ -54,6 +54,8 @@ type cfg struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
keyStore *util.KeyStorage
|
keyStore *util.KeyStorage
|
||||||
|
|
||||||
|
containerSource container.Source
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns utility serving
|
// New creates, initializes and returns utility serving
|
||||||
|
@ -63,6 +65,7 @@ func New(e *engine.StorageEngine,
|
||||||
tg *util.TraverserGenerator,
|
tg *util.TraverserGenerator,
|
||||||
ns netmap.Source,
|
ns netmap.Source,
|
||||||
ks *util.KeyStorage,
|
ks *util.KeyStorage,
|
||||||
|
cs container.Source,
|
||||||
opts ...Option,
|
opts ...Option,
|
||||||
) *Service {
|
) *Service {
|
||||||
c := &cfg{
|
c := &cfg{
|
||||||
|
@ -76,6 +79,7 @@ func New(e *engine.StorageEngine,
|
||||||
traverserGenerator: tg,
|
traverserGenerator: tg,
|
||||||
currentEpochReceiver: ns,
|
currentEpochReceiver: ns,
|
||||||
keyStore: ks,
|
keyStore: ks,
|
||||||
|
containerSource: cs,
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -112,9 +113,13 @@ func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *storageEngineWrapper) search(ctx context.Context, exec *execCtx) ([]oid.ID, error) {
|
func (e *storageEngineWrapper) search(ctx context.Context, exec *execCtx) ([]oid.ID, error) {
|
||||||
|
cnr, err := exec.getContainer()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
var selectPrm engine.SelectPrm
|
var selectPrm engine.SelectPrm
|
||||||
selectPrm.WithFilters(exec.searchFilters())
|
selectPrm.WithFilters(exec.searchFilters())
|
||||||
selectPrm.WithContainerID(exec.containerID())
|
selectPrm.WithContainerID(exec.containerID(), container.IsS3Container(cnr))
|
||||||
|
|
||||||
r, err := e.storage.Select(ctx, selectPrm)
|
r, err := e.storage.Select(ctx, selectPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue