diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index ed43fc41f..85d1b08ba 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -389,8 +389,7 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc return result, true } - buf := make([]byte, addressKeySize) - obj, err := db.get(tx, addr, buf, true, false, currEpoch) + obj, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch) if err != nil { return result, false } @@ -401,17 +400,26 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc case v2object.FilterHeaderVersion: data = []byte(obj.Version().String()) case v2object.FilterHeaderHomomorphicHash: + if isECChunk { + return result, false // EC chunk and EC parent hashes are incomparable + } cs, _ := obj.PayloadHomomorphicHash() data = cs.Value() case v2object.FilterHeaderCreationEpoch: data = make([]byte, 8) binary.LittleEndian.PutUint64(data, obj.CreationEpoch()) case v2object.FilterHeaderPayloadLength: + if isECChunk { + return result, false // EC chunk and EC parent payload lengths are incomparable + } data = make([]byte, 8) binary.LittleEndian.PutUint64(data, obj.PayloadSize()) case v2object.FilterHeaderOwnerID: data = []byte(obj.OwnerID().EncodeToString()) case v2object.FilterHeaderPayloadHash: + if isECChunk { + return result, false // EC chunk and EC parent payload hashes are incomparable + } cs, _ := obj.PayloadChecksum() data = cs.Value() default: // user attribute @@ -439,6 +447,29 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc return result, true } +func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) { + buf := make([]byte, addressKeySize) + obj, err := db.get(tx, addr, buf, true, false, currEpoch) + if err != nil { + var ecInfoError *objectSDK.ECInfoError + if errors.As(err, &ecInfoError) { + for _, chunk := range ecInfoError.ECInfo().Chunks { + var objID oid.ID + if err = objID.ReadFromV2(chunk.ID); err != nil { + continue + } + addr.SetObject(objID) + obj, err = db.get(tx, addr, buf, true, false, currEpoch) + if err == nil { + return obj, true, nil + } + } + } + return nil, false, err + } + return obj, false, nil +} + func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) { objectAttributes := obj.Attributes() if ech := obj.ECHeader(); ech != nil { diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index fcd5d3a90..0c6ebc863 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -762,6 +762,56 @@ func TestDB_SelectOwnerID(t *testing.T) { }) } +func TestDB_SelectECWithFastAndSlowFilters(t *testing.T) { + t.Parallel() + + db := newDB(t) + defer func() { require.NoError(t, db.Close()) }() + + cnr := cidtest.ID() + ecChunk1 := oidtest.ID() + ecChunk2 := oidtest.ID() + ecParent := oidtest.ID() + var ecParentAddr oid.Address + ecParentAddr.SetContainer(cnr) + ecParentAddr.SetObject(ecParent) + var ecParentAttr []objectSDK.Attribute + var attr objectSDK.Attribute + attr.SetKey(objectSDK.AttributeFilePath) + attr.SetValue("/1/2/3") + ecParentAttr = append(ecParentAttr, attr) + + chunkObj := testutil.GenerateObjectWithCID(cnr) + chunkObj.SetContainerID(cnr) + chunkObj.SetID(ecChunk1) + chunkObj.SetPayload([]byte{0, 1, 2, 3, 4}) + chunkObj.SetPayloadSize(uint64(5)) + chunkObj.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent, Attributes: ecParentAttr}, 0, 3, []byte{}, 0)) + + chunkObj2 := testutil.GenerateObjectWithCID(cnr) + chunkObj2.SetContainerID(cnr) + chunkObj2.SetID(ecChunk2) + chunkObj2.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + chunkObj2.SetPayloadSize(uint64(10)) + chunkObj2.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent, Attributes: ecParentAttr}, 1, 3, []byte{}, 0)) + + // put object with EC + + var prm meta.PutPrm + prm.SetObject(chunkObj) + _, err := db.Put(context.Background(), prm) + require.NoError(t, err) + + prm.SetObject(chunkObj2) + _, err = db.Put(context.Background(), prm) + require.NoError(t, err) + + fs := objectSDK.SearchFilters{} + fs.AddRootFilter() + fs.AddFilter(objectSDK.AttributeFilePath, "/1/2/3", objectSDK.MatchCommonPrefix) + testSelect(t, db, cnr, fs, ecParentAddr) +} + type testTarget struct { objects []*objectSDK.Object }