From 798fca9354fec51b5fed4824bd1b39f2905c6250 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 3 Oct 2020 13:14:09 +0300 Subject: [PATCH] [#70] core/object: Process a delete group at tombstone Send object group to delete queue processor after tombstone content validation. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 20 +++++++++++++ pkg/core/object/fmt.go | 47 +++++++++++++++++++++++++++--- pkg/services/object/put/service.go | 13 +++++++-- 3 files changed, 74 insertions(+), 6 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 344b2027f4..c5d06fa700 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -6,9 +6,11 @@ import ( "sync" "github.com/mr-tron/base58" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" + objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc" @@ -29,7 +31,9 @@ import ( searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/panjf2000/ants/v2" + "go.uber.org/zap" ) type objectSvc struct { @@ -157,6 +161,19 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe return s.rngHash.GetRangeHash(ctx, req) } +type deleteHandler struct { + log *logger.Logger +} + +func (s *deleteHandler) DeleteObjects(list ...*objectSDK.Address) { + for i := range list { + s.log.Info("object is marked for removal", + zap.String("CID", base58.Encode(list[i].GetContainerID().ToV2().GetValue())), + zap.String("ID", base58.Encode(list[i].GetObjectID().ToV2().GetValue())), + ) + } +} + func initObjectService(c *cfg) { ls := localstore.New( c.cfgObject.blobstorage, @@ -177,6 +194,9 @@ func initObjectService(c *cfg) { putsvc.WithContainerSource(c.cfgObject.cnrStorage), putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage), putsvc.WithLocalAddressSource(c), + putsvc.WithFormatValidatorOpts( + objectCore.WithDeleteHandler(&deleteHandler{c.log}), + ), ) sPutV2 := putsvcV2.NewService( diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index eb2431b077..c1bf35e4f8 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -10,7 +10,21 @@ import ( ) // FormatValidator represents object format validator. -type FormatValidator struct{} +type FormatValidator struct { + *cfg +} + +// FormatValidatorOption represents FormatValidator constructor option. +type FormatValidatorOption func(*cfg) + +type cfg struct { + deleteHandler DeleteHandler +} + +// DeleteHandler is an interface of delete queue processor. +type DeleteHandler interface { + DeleteObjects(...*object.Address) +} var errNilObject = errors.New("object is nil") @@ -18,9 +32,21 @@ var errNilID = errors.New("missing identifier") var errNilCID = errors.New("missing container identifier") +func defaultCfg() *cfg { + return new(cfg) +} + // NewFormatValidator creates, initializes and returns FormatValidator instance. -func NewFormatValidator() *FormatValidator { - return new(FormatValidator) +func NewFormatValidator(opts ...FormatValidatorOption) *FormatValidator { + cfg := defaultCfg() + + for i := range opts { + opts[i](cfg) + } + + return &FormatValidator{ + cfg: cfg, + } } // Validate validates object format. @@ -94,12 +120,25 @@ func (v *FormatValidator) ValidateContent(t object.Type, payload []byte) error { return errors.Wrapf(err, "(%T) could not parse tombstone content", err) } - for _, addr := range content.GetAddressList() { + addrList := content.GetAddressList() + + for _, addr := range addrList { if addr.GetContainerID() == nil || addr.GetObjectID() == nil { return errors.Errorf("(%T) empty address reference in tombstone", v) } } + + if v.deleteHandler != nil { + v.deleteHandler.DeleteObjects(addrList...) + } } return nil } + +// WithDeleteHandler returns option to set delete queue processor. +func WithDeleteHandler(v DeleteHandler) FormatValidatorOption { + return func(c *cfg) { + c.deleteHandler = v + } +} diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index c156e75e01..74b89b7947 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -38,12 +38,13 @@ type cfg struct { localAddrSrc network.LocalAddressSource fmtValidator *object.FormatValidator + + fmtValidatorOpts []object.FormatValidatorOption } func defaultCfg() *cfg { return &cfg{ - workerPool: new(util.SyncWorkerPool), - fmtValidator: object.NewFormatValidator(), + workerPool: new(util.SyncWorkerPool), } } @@ -54,6 +55,8 @@ func NewService(opts ...Option) *Service { opts[i](c) } + c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...) + return &Service{ cfg: c, } @@ -107,3 +110,9 @@ func WithLocalAddressSource(v network.LocalAddressSource) Option { c.localAddrSrc = v } } + +func WithFormatValidatorOpts(v ...object.FormatValidatorOption) Option { + return func(c *cfg) { + c.fmtValidatorOpts = v + } +}