From 31371771e8ab29be63cd2b07c652d0d532b2a997 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 1 Dec 2020 10:55:08 +0300 Subject: [PATCH] [#222] Update Delete method in shard Signed-off-by: Alex Vanin --- pkg/local_object_storage/shard/delete.go | 85 ++++++++++++++++++------ 1 file changed, 66 insertions(+), 19 deletions(-) diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 25666397b..5a685767a 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -2,48 +2,95 @@ package shard import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" - "github.com/pkg/errors" "go.uber.org/zap" ) // DeletePrm groups the parameters of Delete operation. type DeletePrm struct { - addr *objectSDK.Address + addr []*objectSDK.Address } // DeleteRes groups resulting values of Delete operation. type DeleteRes struct{} -// WithAddress is a Delete option to set the address of the object to delete. +// WithAddress is a Delete option to set the addresses of the objects to delete. // // Option is required. -func (p *DeletePrm) WithAddress(addr *objectSDK.Address) *DeletePrm { +func (p *DeletePrm) WithAddress(addr ...*objectSDK.Address) *DeletePrm { if p != nil { - p.addr = addr + p.addr = append(p.addr, addr...) } return p } -// Delete marks object to delete from shard. -// -// Returns any error encountered that did not allow to completely -// mark the object to delete. +// Delete removes data from the shard's writeCache, metaBase and +// blobStor. func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) { - // mark object to delete in metabase - if err := s.metaBase.Delete(prm.addr); err != nil { - s.log.Warn("could not mark object to delete in metabase", - zap.String("error", err.Error()), - ) + ln := len(prm.addr) + delSmallPrm := new(blobstor.DeleteSmallPrm) + delBigPrm := new(blobstor.DeleteBigPrm) + + smalls := make(map[*objectSDK.Address]*blobovnicza.ID, ln) + + for i := range prm.addr { + delSmallPrm.SetAddress(prm.addr[i]) + delBigPrm.SetAddress(prm.addr[i]) + + if s.hasWriteCache() { + _, err := s.writeCache.DeleteSmall(delSmallPrm) + if err != nil { + _, _ = s.writeCache.DeleteBig(delBigPrm) + } + } + + blobovniczaID, err := s.metaBase.IsSmall(prm.addr[i]) + if err != nil { + s.log.Debug("can't get blobovniczaID from metabase", + zap.Stringer("object", prm.addr[i]), + zap.String("error", err.Error())) + + continue + } + + if blobovniczaID != nil { + smalls[prm.addr[i]] = blobovniczaID + } } - // form DeleteBig parameters - delBigPrm := new(blobstor.DeleteBigPrm) - delBigPrm.SetAddress(prm.addr) + err := s.metaBase.Delete(prm.addr...) + if err != nil { + return nil, err // stop on metabase error ? + } + + for i := range prm.addr { // delete small object + if id, ok := smalls[prm.addr[i]]; ok { + delSmallPrm.SetAddress(prm.addr[i]) + delSmallPrm.SetBlobovniczaID(id) + + _, err = s.blobStor.DeleteSmall(delSmallPrm) + if err != nil { + s.log.Debug("can't remove small object from blobStor", + zap.Stringer("object_address", prm.addr[i]), + zap.String("error", err.Error())) + } + + continue + } + + // delete big object + + delBigPrm.SetAddress(prm.addr[i]) + + _, err = s.blobStor.DeleteBig(delBigPrm) + if err != nil { + s.log.Debug("can't remove big object from blobStor", + zap.Stringer("object_address", prm.addr[i]), + zap.String("error", err.Error())) + } - if _, err := s.blobStor.DeleteBig(delBigPrm); err != nil { - return nil, errors.Wrap(err, "could not remove object from BLOB storage") } return nil, nil