forked from TrueCloudLab/frostfs-node
[#70] core/object: Process a delete group at tombstone
Send object group to delete queue processor after tombstone content validation. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
276ed6c04b
commit
798fca9354
3 changed files with 74 additions and 6 deletions
|
@ -6,9 +6,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
|
||||||
|
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore"
|
||||||
objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
|
objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
|
||||||
|
@ -29,7 +31,9 @@ import (
|
||||||
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
|
||||||
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
|
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type objectSvc struct {
|
type objectSvc struct {
|
||||||
|
@ -157,6 +161,19 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe
|
||||||
return s.rngHash.GetRangeHash(ctx, req)
|
return s.rngHash.GetRangeHash(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type deleteHandler struct {
|
||||||
|
log *logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *deleteHandler) DeleteObjects(list ...*objectSDK.Address) {
|
||||||
|
for i := range list {
|
||||||
|
s.log.Info("object is marked for removal",
|
||||||
|
zap.String("CID", base58.Encode(list[i].GetContainerID().ToV2().GetValue())),
|
||||||
|
zap.String("ID", base58.Encode(list[i].GetObjectID().ToV2().GetValue())),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func initObjectService(c *cfg) {
|
func initObjectService(c *cfg) {
|
||||||
ls := localstore.New(
|
ls := localstore.New(
|
||||||
c.cfgObject.blobstorage,
|
c.cfgObject.blobstorage,
|
||||||
|
@ -177,6 +194,9 @@ func initObjectService(c *cfg) {
|
||||||
putsvc.WithContainerSource(c.cfgObject.cnrStorage),
|
putsvc.WithContainerSource(c.cfgObject.cnrStorage),
|
||||||
putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
|
putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
|
||||||
putsvc.WithLocalAddressSource(c),
|
putsvc.WithLocalAddressSource(c),
|
||||||
|
putsvc.WithFormatValidatorOpts(
|
||||||
|
objectCore.WithDeleteHandler(&deleteHandler{c.log}),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
sPutV2 := putsvcV2.NewService(
|
sPutV2 := putsvcV2.NewService(
|
||||||
|
|
|
@ -10,7 +10,21 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// FormatValidator represents object format validator.
|
// FormatValidator represents object format validator.
|
||||||
type FormatValidator struct{}
|
type FormatValidator struct {
|
||||||
|
*cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// FormatValidatorOption represents FormatValidator constructor option.
|
||||||
|
type FormatValidatorOption func(*cfg)
|
||||||
|
|
||||||
|
type cfg struct {
|
||||||
|
deleteHandler DeleteHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteHandler is an interface of delete queue processor.
|
||||||
|
type DeleteHandler interface {
|
||||||
|
DeleteObjects(...*object.Address)
|
||||||
|
}
|
||||||
|
|
||||||
var errNilObject = errors.New("object is nil")
|
var errNilObject = errors.New("object is nil")
|
||||||
|
|
||||||
|
@ -18,9 +32,21 @@ var errNilID = errors.New("missing identifier")
|
||||||
|
|
||||||
var errNilCID = errors.New("missing container identifier")
|
var errNilCID = errors.New("missing container identifier")
|
||||||
|
|
||||||
|
func defaultCfg() *cfg {
|
||||||
|
return new(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
// NewFormatValidator creates, initializes and returns FormatValidator instance.
|
// NewFormatValidator creates, initializes and returns FormatValidator instance.
|
||||||
func NewFormatValidator() *FormatValidator {
|
func NewFormatValidator(opts ...FormatValidatorOption) *FormatValidator {
|
||||||
return new(FormatValidator)
|
cfg := defaultCfg()
|
||||||
|
|
||||||
|
for i := range opts {
|
||||||
|
opts[i](cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &FormatValidator{
|
||||||
|
cfg: cfg,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate validates object format.
|
// Validate validates object format.
|
||||||
|
@ -94,12 +120,25 @@ func (v *FormatValidator) ValidateContent(t object.Type, payload []byte) error {
|
||||||
return errors.Wrapf(err, "(%T) could not parse tombstone content", err)
|
return errors.Wrapf(err, "(%T) could not parse tombstone content", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, addr := range content.GetAddressList() {
|
addrList := content.GetAddressList()
|
||||||
|
|
||||||
|
for _, addr := range addrList {
|
||||||
if addr.GetContainerID() == nil || addr.GetObjectID() == nil {
|
if addr.GetContainerID() == nil || addr.GetObjectID() == nil {
|
||||||
return errors.Errorf("(%T) empty address reference in tombstone", v)
|
return errors.Errorf("(%T) empty address reference in tombstone", v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if v.deleteHandler != nil {
|
||||||
|
v.deleteHandler.DeleteObjects(addrList...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithDeleteHandler returns option to set delete queue processor.
|
||||||
|
func WithDeleteHandler(v DeleteHandler) FormatValidatorOption {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.deleteHandler = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -38,12 +38,13 @@ type cfg struct {
|
||||||
localAddrSrc network.LocalAddressSource
|
localAddrSrc network.LocalAddressSource
|
||||||
|
|
||||||
fmtValidator *object.FormatValidator
|
fmtValidator *object.FormatValidator
|
||||||
|
|
||||||
|
fmtValidatorOpts []object.FormatValidatorOption
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
return &cfg{
|
return &cfg{
|
||||||
workerPool: new(util.SyncWorkerPool),
|
workerPool: new(util.SyncWorkerPool),
|
||||||
fmtValidator: object.NewFormatValidator(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +55,8 @@ func NewService(opts ...Option) *Service {
|
||||||
opts[i](c)
|
opts[i](c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...)
|
||||||
|
|
||||||
return &Service{
|
return &Service{
|
||||||
cfg: c,
|
cfg: c,
|
||||||
}
|
}
|
||||||
|
@ -107,3 +110,9 @@ func WithLocalAddressSource(v network.LocalAddressSource) Option {
|
||||||
c.localAddrSrc = v
|
c.localAddrSrc = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithFormatValidatorOpts(v ...object.FormatValidatorOption) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.fmtValidatorOpts = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue