forked from TrueCloudLab/frostfs-node
[#1408] metabase: Fix EC search with slow and fast filters
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
f83f7feb8c
commit
434048e8d9
2 changed files with 83 additions and 2 deletions
|
@ -389,8 +389,7 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
|||
return result, true
|
||||
}
|
||||
|
||||
buf := make([]byte, addressKeySize)
|
||||
obj, err := db.get(tx, addr, buf, true, false, currEpoch)
|
||||
obj, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch)
|
||||
if err != nil {
|
||||
return result, false
|
||||
}
|
||||
|
@ -401,17 +400,26 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
|||
case v2object.FilterHeaderVersion:
|
||||
data = []byte(obj.Version().String())
|
||||
case v2object.FilterHeaderHomomorphicHash:
|
||||
if isECChunk {
|
||||
return result, false // EC chunk and EC parent hashes are incomparable
|
||||
}
|
||||
cs, _ := obj.PayloadHomomorphicHash()
|
||||
data = cs.Value()
|
||||
case v2object.FilterHeaderCreationEpoch:
|
||||
data = make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(data, obj.CreationEpoch())
|
||||
case v2object.FilterHeaderPayloadLength:
|
||||
if isECChunk {
|
||||
return result, false // EC chunk and EC parent payload lengths are incomparable
|
||||
}
|
||||
data = make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(data, obj.PayloadSize())
|
||||
case v2object.FilterHeaderOwnerID:
|
||||
data = []byte(obj.OwnerID().EncodeToString())
|
||||
case v2object.FilterHeaderPayloadHash:
|
||||
if isECChunk {
|
||||
return result, false // EC chunk and EC parent payload hashes are incomparable
|
||||
}
|
||||
cs, _ := obj.PayloadChecksum()
|
||||
data = cs.Value()
|
||||
default: // user attribute
|
||||
|
@ -439,6 +447,29 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
|||
return result, true
|
||||
}
|
||||
|
||||
func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) {
|
||||
buf := make([]byte, addressKeySize)
|
||||
obj, err := db.get(tx, addr, buf, true, false, currEpoch)
|
||||
if err != nil {
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
if errors.As(err, &ecInfoError) {
|
||||
for _, chunk := range ecInfoError.ECInfo().Chunks {
|
||||
var objID oid.ID
|
||||
if err = objID.ReadFromV2(chunk.ID); err != nil {
|
||||
continue
|
||||
}
|
||||
addr.SetObject(objID)
|
||||
obj, err = db.get(tx, addr, buf, true, false, currEpoch)
|
||||
if err == nil {
|
||||
return obj, true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, false, err
|
||||
}
|
||||
return obj, false, nil
|
||||
}
|
||||
|
||||
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
|
||||
objectAttributes := obj.Attributes()
|
||||
if ech := obj.ECHeader(); ech != nil {
|
||||
|
|
|
@ -762,6 +762,56 @@ func TestDB_SelectOwnerID(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestDB_SelectECWithFastAndSlowFilters(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ecChunk1 := oidtest.ID()
|
||||
ecChunk2 := oidtest.ID()
|
||||
ecParent := oidtest.ID()
|
||||
var ecParentAddr oid.Address
|
||||
ecParentAddr.SetContainer(cnr)
|
||||
ecParentAddr.SetObject(ecParent)
|
||||
var ecParentAttr []objectSDK.Attribute
|
||||
var attr objectSDK.Attribute
|
||||
attr.SetKey(objectSDK.AttributeFilePath)
|
||||
attr.SetValue("/1/2/3")
|
||||
ecParentAttr = append(ecParentAttr, attr)
|
||||
|
||||
chunkObj := testutil.GenerateObjectWithCID(cnr)
|
||||
chunkObj.SetContainerID(cnr)
|
||||
chunkObj.SetID(ecChunk1)
|
||||
chunkObj.SetPayload([]byte{0, 1, 2, 3, 4})
|
||||
chunkObj.SetPayloadSize(uint64(5))
|
||||
chunkObj.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent, Attributes: ecParentAttr}, 0, 3, []byte{}, 0))
|
||||
|
||||
chunkObj2 := testutil.GenerateObjectWithCID(cnr)
|
||||
chunkObj2.SetContainerID(cnr)
|
||||
chunkObj2.SetID(ecChunk2)
|
||||
chunkObj2.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
|
||||
chunkObj2.SetPayloadSize(uint64(10))
|
||||
chunkObj2.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent, Attributes: ecParentAttr}, 1, 3, []byte{}, 0))
|
||||
|
||||
// put object with EC
|
||||
|
||||
var prm meta.PutPrm
|
||||
prm.SetObject(chunkObj)
|
||||
_, err := db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
prm.SetObject(chunkObj2)
|
||||
_, err = db.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
fs := objectSDK.SearchFilters{}
|
||||
fs.AddRootFilter()
|
||||
fs.AddFilter(objectSDK.AttributeFilePath, "/1/2/3", objectSDK.MatchCommonPrefix)
|
||||
testSelect(t, db, cnr, fs, ecParentAddr)
|
||||
}
|
||||
|
||||
type testTarget struct {
|
||||
objects []*objectSDK.Object
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue