From 351e4b4592d4c9e5691c10dff4dc47d3f33e4920 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 1 Dec 2020 14:23:28 +0300 Subject: [PATCH] [#222] Support Inhume and Delete in object service Signed-off-by: Alex Vanin --- cmd/neofs-node/object.go | 27 ++++++++++++++++++++------ pkg/core/object/fmt.go | 16 +++++++++------ pkg/core/object/fmt_test.go | 13 ++++++++++--- pkg/services/object/put/distributed.go | 6 +++--- pkg/services/object_manager/gc/gc.go | 4 ++-- 5 files changed, 46 insertions(+), 20 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 8afd19c90..72541c962 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -177,17 +177,27 @@ type localObjectRemover struct { log *logger.Logger } -func (r *localObjectRemover) Delete(addr *objectSDK.Address) error { +type localObjectInhumer struct { + storage *engine.StorageEngine + + log *logger.Logger +} + +func (r *localObjectRemover) Delete(addr ...*objectSDK.Address) error { _, err := r.storage.Delete(new(engine.DeletePrm). - WithAddress(addr), + WithAddress(addr...), ) return err } -func (r *localObjectRemover) DeleteObjects(list ...*objectSDK.Address) { - for _, a := range list { - if err := r.Delete(a); err != nil { +func (r *localObjectInhumer) DeleteObjects(ts *objectSDK.Address, addr ...*objectSDK.Address) { + prm := new(engine.InhumePrm) + + for _, a := range addr { + prm.WithTarget(a, ts) + + if _, err := r.storage.Inhume(prm); err != nil { r.log.Error("could not delete object", zap.Stringer("address", a), zap.String("error", err.Error()), @@ -213,6 +223,11 @@ func initObjectService(c *cfg) { log: c.log, } + objInhumer := &localObjectInhumer{ + storage: ls, + log: c.log, + } + objGC := gc.New( gc.WithLogger(c.log), gc.WithRemover(objRemover), @@ -287,7 +302,7 @@ func initObjectService(c *cfg) { putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), putsvc.WithLocalAddressSource(c), putsvc.WithFormatValidatorOpts( - objectCore.WithDeleteHandler(objRemover), + objectCore.WithDeleteHandler(objInhumer), ), putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPool(c.cfgObject.pool.put), diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index 2de56c0cf..042fbef30 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -23,7 +23,7 @@ type cfg struct { // DeleteHandler is an interface of delete queue processor. type DeleteHandler interface { - DeleteObjects(...*object.Address) + DeleteObjects(*object.Address, ...*object.Address) } var errNilObject = errors.New("object is nil") @@ -108,14 +108,14 @@ func (v *FormatValidator) checkOwnerKey(id *owner.ID, key []byte) error { } // ValidateContent validates payload content according to object type. -func (v *FormatValidator) ValidateContent(t object.Type, payload []byte) error { - switch t { +func (v *FormatValidator) ValidateContent(o *object.Object) error { + switch o.Type() { case object.TypeTombstone: - if len(payload) == 0 { + if len(o.Payload()) == 0 { return errors.Errorf("(%T) empty payload in tombstone", v) } - content, err := TombstoneContentFromBytes(payload) + content, err := TombstoneContentFromBytes(o.Payload()) if err != nil { return errors.Wrapf(err, "(%T) could not parse tombstone content", err) } @@ -128,8 +128,12 @@ func (v *FormatValidator) ValidateContent(t object.Type, payload []byte) error { } } + tsAddr := new(object.Address) + tsAddr.SetContainerID(o.ContainerID()) + tsAddr.SetObjectID(o.ID()) + if v.deleteHandler != nil { - v.deleteHandler.DeleteObjects(addrList...) + v.deleteHandler.DeleteObjects(tsAddr, addrList...) } } diff --git a/pkg/core/object/fmt_test.go b/pkg/core/object/fmt_test.go index 18200a151..63243bdf2 100644 --- a/pkg/core/object/fmt_test.go +++ b/pkg/core/object/fmt_test.go @@ -105,7 +105,10 @@ func TestFormatValidator_Validate(t *testing.T) { }) t.Run("tombstone content", func(t *testing.T) { - require.Error(t, v.ValidateContent(object.TypeTombstone, nil)) + obj := NewRaw() + obj.SetType(object.TypeTombstone) + + require.Error(t, v.ValidateContent(obj.Object().SDK())) addr := object.NewAddress() @@ -115,7 +118,9 @@ func TestFormatValidator_Validate(t *testing.T) { data, err := content.MarshalBinary() require.NoError(t, err) - require.Error(t, v.ValidateContent(object.TypeTombstone, data)) + obj.SetPayload(data) + + require.Error(t, v.ValidateContent(obj.Object().SDK())) addr.SetContainerID(testContainerID(t)) addr.SetObjectID(testObjectID(t)) @@ -123,6 +128,8 @@ func TestFormatValidator_Validate(t *testing.T) { data, err = content.MarshalBinary() require.NoError(t, err) - require.NoError(t, v.ValidateContent(object.TypeTombstone, data)) + obj.SetPayload(data) + + require.NoError(t, v.ValidateContent(obj.Object().SDK())) }) } diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index f20885e1d..823f03180 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -63,12 +63,12 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { payload = append(payload, t.chunks[i]...) } - if err := t.fmt.ValidateContent(t.obj.Type(), payload); err != nil { + t.obj.SetPayload(payload) + + if err := t.fmt.ValidateContent(t.obj.Object().SDK()); err != nil { return nil, errors.Wrapf(err, "(%T) could not validate payload content", t) } - t.obj.SetPayload(payload) - loop: for { addrs := traverser.Next() diff --git a/pkg/services/object_manager/gc/gc.go b/pkg/services/object_manager/gc/gc.go index 1f75870c3..72515aa9d 100644 --- a/pkg/services/object_manager/gc/gc.go +++ b/pkg/services/object_manager/gc/gc.go @@ -35,8 +35,8 @@ type cfg struct { // Remover is an interface of the component that stores objects. type Remover interface { - // Delete removes (or marks to remove) object from physical storage. - Delete(*object.Address) error + // Delete removes object from physical storage. + Delete(...*object.Address) error } func defaultCfg() *cfg {