forked from TrueCloudLab/frostfs-node
[#222] Support Inhume and Delete in object service
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
21708d5408
commit
351e4b4592
5 changed files with 46 additions and 20 deletions
|
@ -177,17 +177,27 @@ type localObjectRemover struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *localObjectRemover) Delete(addr *objectSDK.Address) error {
|
type localObjectInhumer struct {
|
||||||
|
storage *engine.StorageEngine
|
||||||
|
|
||||||
|
log *logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *localObjectRemover) Delete(addr ...*objectSDK.Address) error {
|
||||||
_, err := r.storage.Delete(new(engine.DeletePrm).
|
_, err := r.storage.Delete(new(engine.DeletePrm).
|
||||||
WithAddress(addr),
|
WithAddress(addr...),
|
||||||
)
|
)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *localObjectRemover) DeleteObjects(list ...*objectSDK.Address) {
|
func (r *localObjectInhumer) DeleteObjects(ts *objectSDK.Address, addr ...*objectSDK.Address) {
|
||||||
for _, a := range list {
|
prm := new(engine.InhumePrm)
|
||||||
if err := r.Delete(a); err != nil {
|
|
||||||
|
for _, a := range addr {
|
||||||
|
prm.WithTarget(a, ts)
|
||||||
|
|
||||||
|
if _, err := r.storage.Inhume(prm); err != nil {
|
||||||
r.log.Error("could not delete object",
|
r.log.Error("could not delete object",
|
||||||
zap.Stringer("address", a),
|
zap.Stringer("address", a),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -213,6 +223,11 @@ func initObjectService(c *cfg) {
|
||||||
log: c.log,
|
log: c.log,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
objInhumer := &localObjectInhumer{
|
||||||
|
storage: ls,
|
||||||
|
log: c.log,
|
||||||
|
}
|
||||||
|
|
||||||
objGC := gc.New(
|
objGC := gc.New(
|
||||||
gc.WithLogger(c.log),
|
gc.WithLogger(c.log),
|
||||||
gc.WithRemover(objRemover),
|
gc.WithRemover(objRemover),
|
||||||
|
@ -287,7 +302,7 @@ func initObjectService(c *cfg) {
|
||||||
putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
|
putsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
|
||||||
putsvc.WithLocalAddressSource(c),
|
putsvc.WithLocalAddressSource(c),
|
||||||
putsvc.WithFormatValidatorOpts(
|
putsvc.WithFormatValidatorOpts(
|
||||||
objectCore.WithDeleteHandler(objRemover),
|
objectCore.WithDeleteHandler(objInhumer),
|
||||||
),
|
),
|
||||||
putsvc.WithNetworkState(c.cfgNetmap.state),
|
putsvc.WithNetworkState(c.cfgNetmap.state),
|
||||||
putsvc.WithWorkerPool(c.cfgObject.pool.put),
|
putsvc.WithWorkerPool(c.cfgObject.pool.put),
|
||||||
|
|
|
@ -23,7 +23,7 @@ type cfg struct {
|
||||||
|
|
||||||
// DeleteHandler is an interface of delete queue processor.
|
// DeleteHandler is an interface of delete queue processor.
|
||||||
type DeleteHandler interface {
|
type DeleteHandler interface {
|
||||||
DeleteObjects(...*object.Address)
|
DeleteObjects(*object.Address, ...*object.Address)
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNilObject = errors.New("object is nil")
|
var errNilObject = errors.New("object is nil")
|
||||||
|
@ -108,14 +108,14 @@ func (v *FormatValidator) checkOwnerKey(id *owner.ID, key []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValidateContent validates payload content according to object type.
|
// ValidateContent validates payload content according to object type.
|
||||||
func (v *FormatValidator) ValidateContent(t object.Type, payload []byte) error {
|
func (v *FormatValidator) ValidateContent(o *object.Object) error {
|
||||||
switch t {
|
switch o.Type() {
|
||||||
case object.TypeTombstone:
|
case object.TypeTombstone:
|
||||||
if len(payload) == 0 {
|
if len(o.Payload()) == 0 {
|
||||||
return errors.Errorf("(%T) empty payload in tombstone", v)
|
return errors.Errorf("(%T) empty payload in tombstone", v)
|
||||||
}
|
}
|
||||||
|
|
||||||
content, err := TombstoneContentFromBytes(payload)
|
content, err := TombstoneContentFromBytes(o.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "(%T) could not parse tombstone content", err)
|
return errors.Wrapf(err, "(%T) could not parse tombstone content", err)
|
||||||
}
|
}
|
||||||
|
@ -128,8 +128,12 @@ func (v *FormatValidator) ValidateContent(t object.Type, payload []byte) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsAddr := new(object.Address)
|
||||||
|
tsAddr.SetContainerID(o.ContainerID())
|
||||||
|
tsAddr.SetObjectID(o.ID())
|
||||||
|
|
||||||
if v.deleteHandler != nil {
|
if v.deleteHandler != nil {
|
||||||
v.deleteHandler.DeleteObjects(addrList...)
|
v.deleteHandler.DeleteObjects(tsAddr, addrList...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,7 +105,10 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("tombstone content", func(t *testing.T) {
|
t.Run("tombstone content", func(t *testing.T) {
|
||||||
require.Error(t, v.ValidateContent(object.TypeTombstone, nil))
|
obj := NewRaw()
|
||||||
|
obj.SetType(object.TypeTombstone)
|
||||||
|
|
||||||
|
require.Error(t, v.ValidateContent(obj.Object().SDK()))
|
||||||
|
|
||||||
addr := object.NewAddress()
|
addr := object.NewAddress()
|
||||||
|
|
||||||
|
@ -115,7 +118,9 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
data, err := content.MarshalBinary()
|
data, err := content.MarshalBinary()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Error(t, v.ValidateContent(object.TypeTombstone, data))
|
obj.SetPayload(data)
|
||||||
|
|
||||||
|
require.Error(t, v.ValidateContent(obj.Object().SDK()))
|
||||||
|
|
||||||
addr.SetContainerID(testContainerID(t))
|
addr.SetContainerID(testContainerID(t))
|
||||||
addr.SetObjectID(testObjectID(t))
|
addr.SetObjectID(testObjectID(t))
|
||||||
|
@ -123,6 +128,8 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
data, err = content.MarshalBinary()
|
data, err = content.MarshalBinary()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.NoError(t, v.ValidateContent(object.TypeTombstone, data))
|
obj.SetPayload(data)
|
||||||
|
|
||||||
|
require.NoError(t, v.ValidateContent(obj.Object().SDK()))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,12 +63,12 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
payload = append(payload, t.chunks[i]...)
|
payload = append(payload, t.chunks[i]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.fmt.ValidateContent(t.obj.Type(), payload); err != nil {
|
t.obj.SetPayload(payload)
|
||||||
|
|
||||||
|
if err := t.fmt.ValidateContent(t.obj.Object().SDK()); err != nil {
|
||||||
return nil, errors.Wrapf(err, "(%T) could not validate payload content", t)
|
return nil, errors.Wrapf(err, "(%T) could not validate payload content", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.obj.SetPayload(payload)
|
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
addrs := traverser.Next()
|
addrs := traverser.Next()
|
||||||
|
|
|
@ -35,8 +35,8 @@ type cfg struct {
|
||||||
|
|
||||||
// Remover is an interface of the component that stores objects.
|
// Remover is an interface of the component that stores objects.
|
||||||
type Remover interface {
|
type Remover interface {
|
||||||
// Delete removes (or marks to remove) object from physical storage.
|
// Delete removes object from physical storage.
|
||||||
Delete(*object.Address) error
|
Delete(...*object.Address) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
|
Loading…
Reference in a new issue