[#1147] node: Use public fields for shard.ExistsPrm

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-05-30 09:26:06 +03:00 committed by Evgenii Stratonikov
parent c1af13b47e
commit 92e19feb57
9 changed files with 26 additions and 30 deletions

View file

@ -83,7 +83,7 @@ func (e *StorageEngine) delete(ctx context.Context, prm DeletePrm) (DeleteRes, e
// 2. Otherwise, search for all objects with a particular SplitID and delete them too. // 2. Otherwise, search for all objects with a particular SplitID and delete them too.
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
var existsPrm shard.ExistsPrm var existsPrm shard.ExistsPrm
existsPrm.SetAddress(prm.addr) existsPrm.Address = prm.addr
resExists, err := sh.Exists(ctx, existsPrm) resExists, err := sh.Exists(ctx, existsPrm)
if err != nil { if err != nil {

View file

@ -63,7 +63,10 @@ func benchmarkExists(b *testing.B, shardNum int) {
b.ReportAllocs() b.ReportAllocs()
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
ok, _, err := e.exists(context.Background(), addr, oid.Address{}) var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = oid.Address{}
ok, _, err := e.exists(context.Background(), shPrm)
if err != nil || ok { if err != nil || ok {
b.Fatalf("%t %v", ok, err) b.Fatalf("%t %v", ok, err)
} }

View file

@ -8,18 +8,16 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
) )
func (e *StorageEngine) exists(ctx context.Context, addr oid.Address, parentAddr oid.Address) (bool, bool, error) { // exists return in the first value true if object exists.
var shPrm shard.ExistsPrm // Second return value marks is parent object locked.
shPrm.SetAddress(addr) func (e *StorageEngine) exists(ctx context.Context, shPrm shard.ExistsPrm) (bool, bool, error) {
shPrm.SetParentAddress(parentAddr)
alreadyRemoved := false alreadyRemoved := false
exists := false exists := false
locked := false locked := false
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(shPrm.Address, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Exists(ctx, shPrm) res, err := sh.Exists(ctx, shPrm)
if err != nil { if err != nil {
if client.IsErrObjectAlreadyRemoved(err) { if client.IsErrObjectAlreadyRemoved(err) {

View file

@ -142,7 +142,7 @@ func (e *StorageEngine) inhumeAddr(ctx context.Context, addr oid.Address, prm sh
}() }()
if checkExists { if checkExists {
existPrm.SetAddress(addr) existPrm.Address = addr
exRes, err := sh.Exists(ctx, existPrm) exRes, err := sh.Exists(ctx, existPrm)
if err != nil { if err != nil {
if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) { if client.IsErrObjectAlreadyRemoved(err) || shard.IsErrObjectExpired(err) {

View file

@ -78,7 +78,7 @@ func (e *StorageEngine) lockSingle(ctx context.Context, idCnr cid.ID, locker, lo
if checkExists { if checkExists {
var existsPrm shard.ExistsPrm var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addrLocked) existsPrm.Address = addrLocked
exRes, err := sh.Exists(ctx, existsPrm) exRes, err := sh.Exists(ctx, existsPrm)
if err != nil { if err != nil {

View file

@ -85,7 +85,10 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
parent.SetObject(prm.obj.ECHeader().Parent()) parent.SetObject(prm.obj.ECHeader().Parent())
parent.SetContainer(addr.Container()) parent.SetContainer(addr.Container())
} }
existed, locked, err := e.exists(ctx, addr, parent) var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = parent
existed, locked, err := e.exists(ctx, shPrm)
if err != nil { if err != nil {
return err return err
} }
@ -138,7 +141,7 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool uti
defer close(exitCh) defer close(exitCh)
var existPrm shard.ExistsPrm var existPrm shard.ExistsPrm
existPrm.SetAddress(addr) existPrm.Address = addr
exists, err := sh.Exists(ctx, existPrm) exists, err := sh.Exists(ctx, existPrm)
if err != nil { if err != nil {

View file

@ -115,7 +115,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
found := false found := false
for i := range shards { for i := range shards {
var existsPrm shard.ExistsPrm var existsPrm shard.ExistsPrm
existsPrm.SetAddress(addr) existsPrm.Address = addr
res, err := shards[i].Exists(ctx, existsPrm) res, err := shards[i].Exists(ctx, existsPrm)
if err != nil { if err != nil {

View file

@ -13,8 +13,10 @@ import (
// ExistsPrm groups the parameters of Exists operation. // ExistsPrm groups the parameters of Exists operation.
type ExistsPrm struct { type ExistsPrm struct {
addr oid.Address // Exists option to set object checked for existence.
paddr oid.Address Address oid.Address
// Exists option to set parent object checked for existence.
ParentAddress oid.Address
} }
// ExistsRes groups the resulting values of Exists operation. // ExistsRes groups the resulting values of Exists operation.
@ -23,16 +25,6 @@ type ExistsRes struct {
lc bool lc bool
} }
// SetAddress is an Exists option to set object checked for existence.
func (p *ExistsPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetParentAddress is an Exists option to set parent object checked for existence.
func (p *ExistsPrm) SetParentAddress(addr oid.Address) {
p.paddr = addr
}
// Exists returns the fact that the object is in the shard. // Exists returns the fact that the object is in the shard.
func (p ExistsRes) Exists() bool { func (p ExistsRes) Exists() bool {
return p.ex return p.ex
@ -55,7 +47,7 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists", ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Exists",
trace.WithAttributes( trace.WithAttributes(
attribute.String("shard_id", s.ID().String()), attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()), attribute.String("address", prm.Address.EncodeToString()),
)) ))
defer span.End() defer span.End()
@ -70,15 +62,15 @@ func (s *Shard) Exists(ctx context.Context, prm ExistsPrm) (ExistsRes, error) {
return ExistsRes{}, ErrShardDisabled return ExistsRes{}, ErrShardDisabled
} else if s.info.Mode.NoMetabase() { } else if s.info.Mode.NoMetabase() {
var p common.ExistsPrm var p common.ExistsPrm
p.Address = prm.addr p.Address = prm.Address
var res common.ExistsRes var res common.ExistsRes
res, err = s.blobStor.Exists(ctx, p) res, err = s.blobStor.Exists(ctx, p)
exists = res.Exists exists = res.Exists
} else { } else {
var existsPrm meta.ExistsPrm var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.addr) existsPrm.SetAddress(prm.Address)
existsPrm.SetParent(prm.paddr) existsPrm.SetParent(prm.ParentAddress)
var res meta.ExistsRes var res meta.ExistsRes
res, err = s.metaBase.Exists(ctx, existsPrm) res, err = s.metaBase.Exists(ctx, existsPrm)

View file

@ -72,7 +72,7 @@ func TestShardReload(t *testing.T) {
checkHasObjects := func(t *testing.T, exists bool) { checkHasObjects := func(t *testing.T, exists bool) {
for i := range objects { for i := range objects {
var prm ExistsPrm var prm ExistsPrm
prm.SetAddress(objects[i].addr) prm.Address = objects[i].addr
res, err := sh.Exists(context.Background(), prm) res, err := sh.Exists(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)