forked from TrueCloudLab/frostfs-node
[#1972] node: Do not save objects if node not in a container
Do not use node's local storage if it is clear that an object will be removed anyway as a redundant. It requires moving the changing local storage logic from the validation step to the local target implementation. It allows performing any relations checks (e.g. object locking) only if a node is considered as a valid container member and is expected to store (stored previously) all the helper objects (e.g. `LOCK`, `TOMBSTONE`, etc). Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
a77392e9ce
commit
aab398f4f5
7 changed files with 158 additions and 117 deletions
|
@ -97,20 +97,6 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe
|
||||||
return s.get.GetRangeHash(ctx, req)
|
return s.get.GetRangeHash(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
type localObjectInhumer struct {
|
|
||||||
storage *engine.StorageEngine
|
|
||||||
|
|
||||||
log *logger.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *localObjectInhumer) DeleteObjects(ts oid.Address, addr ...oid.Address) error {
|
|
||||||
var prm engine.InhumePrm
|
|
||||||
prm.WithTarget(ts, addr...)
|
|
||||||
|
|
||||||
_, err := r.storage.Inhume(prm)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
type delNetInfo struct {
|
type delNetInfo struct {
|
||||||
netmap.State
|
netmap.State
|
||||||
tsLifetime uint64
|
tsLifetime uint64
|
||||||
|
@ -202,11 +188,6 @@ func initObjectService(c *cfg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objInhumer := &localObjectInhumer{
|
|
||||||
storage: ls,
|
|
||||||
log: c.log,
|
|
||||||
}
|
|
||||||
|
|
||||||
c.replicator = replicator.New(
|
c.replicator = replicator.New(
|
||||||
replicator.WithLogger(c.log),
|
replicator.WithLogger(c.log),
|
||||||
replicator.WithPutTimeout(
|
replicator.WithPutTimeout(
|
||||||
|
@ -254,7 +235,7 @@ func initObjectService(c *cfg) {
|
||||||
c.workers = append(c.workers, pol)
|
c.workers = append(c.workers, pol)
|
||||||
|
|
||||||
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
||||||
e: ls,
|
engine: ls,
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.cfgNotifications.enabled {
|
if c.cfgNotifications.enabled {
|
||||||
|
@ -274,10 +255,6 @@ func initObjectService(c *cfg) {
|
||||||
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
||||||
putsvc.WithNetworkMapSource(c.netMapSource),
|
putsvc.WithNetworkMapSource(c.netMapSource),
|
||||||
putsvc.WithNetmapKeys(c),
|
putsvc.WithNetmapKeys(c),
|
||||||
putsvc.WithFormatValidatorOpts(
|
|
||||||
objectCore.WithDeleteHandler(objInhumer),
|
|
||||||
objectCore.WithLocker(ls),
|
|
||||||
),
|
|
||||||
putsvc.WithNetworkState(c.cfgNetmap.state),
|
putsvc.WithNetworkState(c.cfgNetmap.state),
|
||||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote),
|
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote),
|
||||||
putsvc.WithLogger(c.log),
|
putsvc.WithLogger(c.log),
|
||||||
|
@ -561,6 +538,14 @@ type engineWithNotifications struct {
|
||||||
defaultTopic string
|
defaultTopic string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||||
|
return e.base.Delete(tombstone, toDelete)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e engineWithNotifications) Lock(locker oid.Address, toLock []oid.ID) error {
|
||||||
|
return e.base.Lock(locker, toLock)
|
||||||
|
}
|
||||||
|
|
||||||
func (e engineWithNotifications) Put(o *objectSDK.Object) error {
|
func (e engineWithNotifications) Put(o *objectSDK.Object) error {
|
||||||
if err := e.base.Put(o); err != nil {
|
if err := e.base.Put(o); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -583,9 +568,28 @@ func (e engineWithNotifications) Put(o *objectSDK.Object) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
type engineWithoutNotifications struct {
|
type engineWithoutNotifications struct {
|
||||||
e *engine.StorageEngine
|
engine *engine.StorageEngine
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||||
|
var prm engine.InhumePrm
|
||||||
|
|
||||||
|
addrs := make([]oid.Address, len(toDelete))
|
||||||
|
for i := range addrs {
|
||||||
|
addrs[i].SetContainer(tombstone.Container())
|
||||||
|
addrs[i].SetObject(toDelete[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
prm.WithTarget(tombstone, addrs...)
|
||||||
|
|
||||||
|
_, err := e.engine.Inhume(prm)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e engineWithoutNotifications) Lock(locker oid.Address, toLock []oid.ID) error {
|
||||||
|
return e.engine.Lock(locker.Container(), locker.Object(), toLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
|
func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
|
||||||
return engine.Put(e.e, o)
|
return engine.Put(e.engine, o)
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,11 +26,7 @@ type FormatValidator struct {
|
||||||
type FormatValidatorOption func(*cfg)
|
type FormatValidatorOption func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
deleteHandler DeleteHandler
|
|
||||||
|
|
||||||
netState netmap.State
|
netState netmap.State
|
||||||
|
|
||||||
locker Locker
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteHandler is an interface of delete queue processor.
|
// DeleteHandler is an interface of delete queue processor.
|
||||||
|
@ -173,130 +169,141 @@ func (v *FormatValidator) checkOwnerKey(id user.ID, key neofsecdsa.PublicKey) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ContentMeta describes NeoFS meta information that brings object's payload if the object
|
||||||
|
// is one of:
|
||||||
|
// - object.TypeTombstone;
|
||||||
|
// - object.TypeStorageGroup;
|
||||||
|
// - object.TypeLock.
|
||||||
|
type ContentMeta struct {
|
||||||
|
typ object.Type
|
||||||
|
|
||||||
|
objs []oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type returns object's type.
|
||||||
|
func (i ContentMeta) Type() object.Type {
|
||||||
|
return i.typ
|
||||||
|
}
|
||||||
|
|
||||||
|
// Objects returns objects that the original object's payload affects:
|
||||||
|
// - inhumed objects, if the original object is a Tombstone;
|
||||||
|
// - locked objects, if the original object is a Lock;
|
||||||
|
// - members of a storage group, if the original object is a Storage group;
|
||||||
|
// - nil, if the original object is a Regular object.
|
||||||
|
func (i ContentMeta) Objects() []oid.ID {
|
||||||
|
return i.objs
|
||||||
|
}
|
||||||
|
|
||||||
// ValidateContent validates payload content according to the object type.
|
// ValidateContent validates payload content according to the object type.
|
||||||
func (v *FormatValidator) ValidateContent(o *object.Object) error {
|
func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) {
|
||||||
|
meta := ContentMeta{
|
||||||
|
typ: o.Type(),
|
||||||
|
}
|
||||||
|
|
||||||
switch o.Type() {
|
switch o.Type() {
|
||||||
case object.TypeRegular:
|
case object.TypeRegular:
|
||||||
// ignore regular objects, they do not need payload formatting
|
// ignore regular objects, they do not need payload formatting
|
||||||
case object.TypeTombstone:
|
case object.TypeTombstone:
|
||||||
if len(o.Payload()) == 0 {
|
if len(o.Payload()) == 0 {
|
||||||
return fmt.Errorf("(%T) empty payload in tombstone", v)
|
return ContentMeta{}, fmt.Errorf("(%T) empty payload in tombstone", v)
|
||||||
}
|
}
|
||||||
|
|
||||||
tombstone := object.NewTombstone()
|
tombstone := object.NewTombstone()
|
||||||
|
|
||||||
if err := tombstone.Unmarshal(o.Payload()); err != nil {
|
if err := tombstone.Unmarshal(o.Payload()); err != nil {
|
||||||
return fmt.Errorf("(%T) could not unmarshal tombstone content: %w", v, err)
|
return ContentMeta{}, fmt.Errorf("(%T) could not unmarshal tombstone content: %w", v, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if the tombstone has the same expiration in the body and the header
|
// check if the tombstone has the same expiration in the body and the header
|
||||||
exp, err := expirationEpochAttribute(o)
|
exp, err := expirationEpochAttribute(o)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return ContentMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if exp != tombstone.ExpirationEpoch() {
|
if exp != tombstone.ExpirationEpoch() {
|
||||||
return errTombstoneExpiration
|
return ContentMeta{}, errTombstoneExpiration
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark all objects from the tombstone body as removed in the storage engine
|
// mark all objects from the tombstone body as removed in the storage engine
|
||||||
cnr, ok := o.ContainerID()
|
_, ok := o.ContainerID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("missing container ID")
|
return ContentMeta{}, errors.New("missing container ID")
|
||||||
}
|
}
|
||||||
|
|
||||||
idList := tombstone.Members()
|
idList := tombstone.Members()
|
||||||
addrList := make([]oid.Address, len(idList))
|
meta.objs = idList
|
||||||
|
|
||||||
for i := range idList {
|
|
||||||
addrList[i].SetContainer(cnr)
|
|
||||||
addrList[i].SetObject(idList[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.deleteHandler != nil {
|
|
||||||
err = v.deleteHandler.DeleteObjects(AddressOf(o), addrList...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("delete objects from %s object content: %w", o.Type(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case object.TypeStorageGroup:
|
case object.TypeStorageGroup:
|
||||||
if len(o.Payload()) == 0 {
|
if len(o.Payload()) == 0 {
|
||||||
return fmt.Errorf("(%T) empty payload in SG", v)
|
return ContentMeta{}, fmt.Errorf("(%T) empty payload in SG", v)
|
||||||
}
|
}
|
||||||
|
|
||||||
var sg storagegroup.StorageGroup
|
var sg storagegroup.StorageGroup
|
||||||
|
|
||||||
if err := sg.Unmarshal(o.Payload()); err != nil {
|
if err := sg.Unmarshal(o.Payload()); err != nil {
|
||||||
return fmt.Errorf("(%T) could not unmarshal SG content: %w", v, err)
|
return ContentMeta{}, fmt.Errorf("(%T) could not unmarshal SG content: %w", v, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mm := sg.Members()
|
mm := sg.Members()
|
||||||
|
meta.objs = mm
|
||||||
|
|
||||||
lenMM := len(mm)
|
lenMM := len(mm)
|
||||||
if lenMM == 0 {
|
if lenMM == 0 {
|
||||||
return errEmptySGMembers
|
return ContentMeta{}, errEmptySGMembers
|
||||||
}
|
}
|
||||||
|
|
||||||
uniqueFilter := make(map[oid.ID]struct{}, lenMM)
|
uniqueFilter := make(map[oid.ID]struct{}, lenMM)
|
||||||
|
|
||||||
for i := 0; i < lenMM; i++ {
|
for i := 0; i < lenMM; i++ {
|
||||||
if _, alreadySeen := uniqueFilter[mm[i]]; alreadySeen {
|
if _, alreadySeen := uniqueFilter[mm[i]]; alreadySeen {
|
||||||
return fmt.Errorf("storage group contains non-unique member: %s", mm[i])
|
return ContentMeta{}, fmt.Errorf("storage group contains non-unique member: %s", mm[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
uniqueFilter[mm[i]] = struct{}{}
|
uniqueFilter[mm[i]] = struct{}{}
|
||||||
}
|
}
|
||||||
case object.TypeLock:
|
case object.TypeLock:
|
||||||
if len(o.Payload()) == 0 {
|
if len(o.Payload()) == 0 {
|
||||||
return errors.New("empty payload in lock")
|
return ContentMeta{}, errors.New("empty payload in lock")
|
||||||
}
|
}
|
||||||
|
|
||||||
cnr, ok := o.ContainerID()
|
_, ok := o.ContainerID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("missing container")
|
return ContentMeta{}, errors.New("missing container")
|
||||||
}
|
}
|
||||||
|
|
||||||
id, ok := o.ID()
|
_, ok = o.ID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("missing ID")
|
return ContentMeta{}, errors.New("missing ID")
|
||||||
}
|
}
|
||||||
|
|
||||||
// check that LOCK object has correct expiration epoch
|
// check that LOCK object has correct expiration epoch
|
||||||
lockExp, err := expirationEpochAttribute(o)
|
lockExp, err := expirationEpochAttribute(o)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("lock object expiration epoch: %w", err)
|
return ContentMeta{}, fmt.Errorf("lock object expiration epoch: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if currEpoch := v.netState.CurrentEpoch(); lockExp < currEpoch {
|
if currEpoch := v.netState.CurrentEpoch(); lockExp < currEpoch {
|
||||||
return fmt.Errorf("lock object expiration: %d; current: %d", lockExp, currEpoch)
|
return ContentMeta{}, fmt.Errorf("lock object expiration: %d; current: %d", lockExp, currEpoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
var lock object.Lock
|
var lock object.Lock
|
||||||
|
|
||||||
err = lock.Unmarshal(o.Payload())
|
err = lock.Unmarshal(o.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decode lock payload: %w", err)
|
return ContentMeta{}, fmt.Errorf("decode lock payload: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.locker != nil {
|
|
||||||
num := lock.NumberOfMembers()
|
num := lock.NumberOfMembers()
|
||||||
if num == 0 {
|
if num == 0 {
|
||||||
return errors.New("missing locked members")
|
return ContentMeta{}, errors.New("missing locked members")
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark all objects from lock list as locked in the storage engine
|
meta.objs = make([]oid.ID, num)
|
||||||
locklist := make([]oid.ID, num)
|
lock.ReadMembers(meta.objs)
|
||||||
lock.ReadMembers(locklist)
|
|
||||||
|
|
||||||
err = v.locker.Lock(cnr, id, locklist)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("lock objects from %s object content: %w", o.Type(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
// ignore all other object types, they do not need payload formatting
|
// ignore all other object types, they do not need payload formatting
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return meta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var errExpired = errors.New("object has expired")
|
var errExpired = errors.New("object has expired")
|
||||||
|
@ -373,17 +380,3 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
|
||||||
c.netState = netState
|
c.netState = netState
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithDeleteHandler returns an option to set delete queue processor.
|
|
||||||
func WithDeleteHandler(v DeleteHandler) FormatValidatorOption {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.deleteHandler = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLocker returns an option to set object lock storage.
|
|
||||||
func WithLocker(v Locker) FormatValidatorOption {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.locker = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -114,7 +114,8 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
obj.SetType(object.TypeTombstone)
|
obj.SetType(object.TypeTombstone)
|
||||||
obj.SetContainerID(cidtest.ID())
|
obj.SetContainerID(cidtest.ID())
|
||||||
|
|
||||||
require.Error(t, v.ValidateContent(obj)) // no tombstone content
|
_, err := v.ValidateContent(obj)
|
||||||
|
require.Error(t, err) // no tombstone content
|
||||||
|
|
||||||
content := object.NewTombstone()
|
content := object.NewTombstone()
|
||||||
content.SetMembers([]oid.ID{oidtest.ID()})
|
content.SetMembers([]oid.ID{oidtest.ID()})
|
||||||
|
@ -124,7 +125,8 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
|
|
||||||
obj.SetPayload(data)
|
obj.SetPayload(data)
|
||||||
|
|
||||||
require.Error(t, v.ValidateContent(obj)) // no members in tombstone
|
_, err = v.ValidateContent(obj)
|
||||||
|
require.Error(t, err) // no members in tombstone
|
||||||
|
|
||||||
content.SetMembers([]oid.ID{oidtest.ID()})
|
content.SetMembers([]oid.ID{oidtest.ID()})
|
||||||
|
|
||||||
|
@ -133,7 +135,8 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
|
|
||||||
obj.SetPayload(data)
|
obj.SetPayload(data)
|
||||||
|
|
||||||
require.Error(t, v.ValidateContent(obj)) // no expiration epoch in tombstone
|
_, err = v.ValidateContent(obj)
|
||||||
|
require.Error(t, err) // no expiration epoch in tombstone
|
||||||
|
|
||||||
var expirationAttribute object.Attribute
|
var expirationAttribute object.Attribute
|
||||||
expirationAttribute.SetKey(objectV2.SysAttributeExpEpoch)
|
expirationAttribute.SetKey(objectV2.SysAttributeExpEpoch)
|
||||||
|
@ -141,15 +144,23 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
|
|
||||||
obj.SetAttributes(expirationAttribute)
|
obj.SetAttributes(expirationAttribute)
|
||||||
|
|
||||||
require.Error(t, v.ValidateContent(obj)) // different expiration values
|
_, err = v.ValidateContent(obj)
|
||||||
|
require.Error(t, err) // different expiration values
|
||||||
|
|
||||||
|
id := oidtest.ID()
|
||||||
|
|
||||||
content.SetExpirationEpoch(10)
|
content.SetExpirationEpoch(10)
|
||||||
|
content.SetMembers([]oid.ID{id})
|
||||||
data, err = content.Marshal()
|
data, err = content.Marshal()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
obj.SetPayload(data)
|
obj.SetPayload(data)
|
||||||
|
|
||||||
require.NoError(t, v.ValidateContent(obj)) // all good
|
contentGot, err := v.ValidateContent(obj)
|
||||||
|
require.NoError(t, err) // all good
|
||||||
|
|
||||||
|
require.EqualValues(t, []oid.ID{id}, contentGot.Objects())
|
||||||
|
require.Equal(t, object.TypeTombstone, contentGot.Type())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("storage group content", func(t *testing.T) {
|
t.Run("storage group content", func(t *testing.T) {
|
||||||
|
@ -157,7 +168,8 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
obj.SetType(object.TypeStorageGroup)
|
obj.SetType(object.TypeStorageGroup)
|
||||||
|
|
||||||
t.Run("empty payload", func(t *testing.T) {
|
t.Run("empty payload", func(t *testing.T) {
|
||||||
require.Error(t, v.ValidateContent(obj))
|
_, err := v.ValidateContent(obj)
|
||||||
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
var content storagegroup.StorageGroup
|
var content storagegroup.StorageGroup
|
||||||
|
@ -168,7 +180,9 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
obj.SetPayload(data)
|
obj.SetPayload(data)
|
||||||
require.ErrorIs(t, v.ValidateContent(obj), errEmptySGMembers)
|
|
||||||
|
_, err = v.ValidateContent(obj)
|
||||||
|
require.ErrorIs(t, err, errEmptySGMembers)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("non-unique members", func(t *testing.T) {
|
t.Run("non-unique members", func(t *testing.T) {
|
||||||
|
@ -180,17 +194,25 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
obj.SetPayload(data)
|
obj.SetPayload(data)
|
||||||
require.Error(t, v.ValidateContent(obj))
|
|
||||||
|
_, err = v.ValidateContent(obj)
|
||||||
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("correct SG", func(t *testing.T) {
|
t.Run("correct SG", func(t *testing.T) {
|
||||||
content.SetMembers([]oid.ID{oidtest.ID(), oidtest.ID()})
|
ids := []oid.ID{oidtest.ID(), oidtest.ID()}
|
||||||
|
content.SetMembers(ids)
|
||||||
|
|
||||||
data, err := content.Marshal()
|
data, err := content.Marshal()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
obj.SetPayload(data)
|
obj.SetPayload(data)
|
||||||
require.NoError(t, v.ValidateContent(obj))
|
|
||||||
|
content, err := v.ValidateContent(obj)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.EqualValues(t, ids, content.Objects())
|
||||||
|
require.Equal(t, object.TypeStorageGroup, content.Type())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type preparedObjectTarget interface {
|
type preparedObjectTarget interface {
|
||||||
WriteHeader(*objectSDK.Object) error
|
WriteObject(*objectSDK.Object, object.ContentMeta) error
|
||||||
Close() (*transformer.AccessIdentifiers, error)
|
Close() (*transformer.AccessIdentifiers, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ type distributedTarget struct {
|
||||||
remotePool, localPool util.WorkerPool
|
remotePool, localPool util.WorkerPool
|
||||||
|
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
|
objMeta object.ContentMeta
|
||||||
|
|
||||||
payload []byte
|
payload []byte
|
||||||
|
|
||||||
|
@ -120,7 +121,9 @@ func (t *distributedTarget) Write(p []byte) (n int, err error) {
|
||||||
func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
|
func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
t.obj.SetPayload(t.payload)
|
t.obj.SetPayload(t.payload)
|
||||||
|
|
||||||
if err := t.fmt.ValidateContent(t.obj); err != nil {
|
var err error
|
||||||
|
|
||||||
|
if t.objMeta, err = t.fmt.ValidateContent(t.obj); err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
|
return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +137,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
|
||||||
|
|
||||||
target := t.nodeTargetInitializer(node)
|
target := t.nodeTargetInitializer(node)
|
||||||
|
|
||||||
if err := target.WriteHeader(t.obj); err != nil {
|
if err := target.WriteObject(t.obj, t.objMeta); err != nil {
|
||||||
return fmt.Errorf("could not write header: %w", err)
|
return fmt.Errorf("could not write header: %w", err)
|
||||||
} else if _, err := target.Close(); err != nil {
|
} else if _, err := target.Close(); err != nil {
|
||||||
return fmt.Errorf("could not close object stream: %w", err)
|
return fmt.Errorf("could not close object stream: %w", err)
|
||||||
|
|
|
@ -3,31 +3,55 @@ package putsvc
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ObjectStorage is an object storage interface.
|
// ObjectStorage is an object storage interface.
|
||||||
type ObjectStorage interface {
|
type ObjectStorage interface {
|
||||||
// Put must save passed object
|
// Put must save passed object
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Put(o *objectSDK.Object) error
|
Put(*object.Object) error
|
||||||
|
// Delete must delete passed objects
|
||||||
|
// and return any appeared error.
|
||||||
|
Delete(tombstone oid.Address, toDelete []oid.ID) error
|
||||||
|
// Lock must lock passed objects
|
||||||
|
// and return any appeared error.
|
||||||
|
Lock(locker oid.Address, toLock []oid.ID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type localTarget struct {
|
type localTarget struct {
|
||||||
storage ObjectStorage
|
storage ObjectStorage
|
||||||
|
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
|
meta objectCore.ContentMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *localTarget) WriteHeader(obj *object.Object) error {
|
func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
|
||||||
t.obj = obj
|
t.obj = obj
|
||||||
|
t.meta = meta
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
|
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
|
switch t.meta.Type() {
|
||||||
|
case object.TypeTombstone:
|
||||||
|
err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
||||||
|
}
|
||||||
|
case object.TypeLock:
|
||||||
|
err := t.storage.Lock(objectCore.AddressOf(t.obj), t.meta.Objects())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not lock object from lock objects locally: %w", err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// objects that do not change meta storage
|
||||||
|
}
|
||||||
|
|
||||||
if err := t.storage.Put(t.obj); err != nil {
|
if err := t.storage.Put(t.obj); err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
return nil, fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||||
|
@ -42,7 +43,7 @@ type RemotePutPrm struct {
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *remoteTarget) WriteHeader(obj *object.Object) error {
|
func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
|
||||||
t.obj = obj
|
t.obj = obj
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -126,7 +127,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
|
||||||
return fmt.Errorf("parse client node info: %w", err)
|
return fmt.Errorf("parse client node info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.WriteHeader(p.obj); err != nil {
|
if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
|
||||||
return fmt.Errorf("(%T) could not send object header: %w", s, err)
|
return fmt.Errorf("(%T) could not send object header: %w", s, err)
|
||||||
} else if _, err := t.Close(); err != nil {
|
} else if _, err := t.Close(); err != nil {
|
||||||
return fmt.Errorf("(%T) could not send object: %w", s, err)
|
return fmt.Errorf("(%T) could not send object: %w", s, err)
|
||||||
|
|
|
@ -128,12 +128,6 @@ func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithFormatValidatorOpts(v ...object.FormatValidatorOption) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.fmtValidatorOpts = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithNetworkState(v netmap.State) Option {
|
func WithNetworkState(v netmap.State) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.networkState = v
|
c.networkState = v
|
||||||
|
|
Loading…
Reference in a new issue