[#1559] shard: Do not consult metabase in a degraded mode
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
339864b720
commit
c8911d72d0
17 changed files with 114 additions and 67 deletions
|
@ -193,7 +193,7 @@ func TestBlobstorFailback(t *testing.T) {
|
||||||
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
|
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
|
||||||
}
|
}
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 4, mode.Degraded)
|
checkShardState(t, e, id[0], 2, mode.Degraded)
|
||||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,11 +86,17 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
|
||||||
metaError error
|
metaError error
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var hasDegraded bool
|
||||||
|
|
||||||
var shPrm shard.RngPrm
|
var shPrm shard.RngPrm
|
||||||
shPrm.SetAddress(prm.addr)
|
shPrm.SetAddress(prm.addr)
|
||||||
shPrm.SetRange(prm.off, prm.ln)
|
shPrm.SetRange(prm.off, prm.ln)
|
||||||
|
|
||||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
|
noMeta := sh.GetMode().NoMetabase()
|
||||||
|
hasDegraded = hasDegraded || noMeta
|
||||||
|
shPrm.SetIgnoreMeta(noMeta)
|
||||||
|
|
||||||
res, err := sh.GetRange(shPrm)
|
res, err := sh.GetRange(shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if res.HasMeta() {
|
if res.HasMeta() {
|
||||||
|
@ -140,7 +146,9 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
if shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) {
|
// If any shard is in a degraded mode, we should assume that metabase could store
|
||||||
|
// info about some object.
|
||||||
|
if shardWithMeta.Shard == nil && !hasDegraded || !shard.IsErrNotFound(outError) {
|
||||||
return RngRes{}, outError
|
return RngRes{}, outError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +158,11 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
|
||||||
shPrm.SetIgnoreMeta(true)
|
shPrm.SetIgnoreMeta(true)
|
||||||
|
|
||||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
|
if sh.GetMode().NoMetabase() {
|
||||||
|
// Already processed it without a metabase.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
res, err := sh.GetRange(shPrm)
|
res, err := sh.GetRange(shPrm)
|
||||||
if shard.IsErrOutOfRange(err) {
|
if shard.IsErrOutOfRange(err) {
|
||||||
var errOutOfRange apistatus.ObjectOutOfRange
|
var errOutOfRange apistatus.ObjectOutOfRange
|
||||||
|
@ -163,10 +176,11 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) {
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
return RngRes{}, outError
|
return RngRes{}, outError
|
||||||
}
|
}
|
||||||
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
|
if shardWithMeta.Shard != nil {
|
||||||
metaError,
|
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
|
||||||
zap.Stringer("address", prm.addr),
|
metaError,
|
||||||
)
|
zap.Stringer("address", prm.addr))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return RngRes{
|
return RngRes{
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -28,8 +27,11 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
|
||||||
// Delete removes data from the shard's writeCache, metaBase and
|
// Delete removes data from the shard's writeCache, metaBase and
|
||||||
// blobStor.
|
// blobStor.
|
||||||
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
if s.GetMode() != mode.ReadWrite {
|
m := s.GetMode()
|
||||||
|
if m.ReadOnly() {
|
||||||
return DeleteRes{}, ErrReadOnlyMode
|
return DeleteRes{}, ErrReadOnlyMode
|
||||||
|
} else if m.NoMetabase() {
|
||||||
|
return DeleteRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
ln := len(prm.addr)
|
ln := len(prm.addr)
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -56,7 +55,7 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
defer s.m.RUnlock()
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
if s.info.Mode != mode.ReadOnly {
|
if !s.info.Mode.ReadOnly() {
|
||||||
return DumpRes{}, ErrMustBeReadOnly
|
return DumpRes{}, ErrMustBeReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,7 @@ package shard
|
||||||
import (
|
import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExistsPrm groups the parameters of Exists operation.
|
// ExistsPrm groups the parameters of Exists operation.
|
||||||
|
@ -35,27 +33,23 @@ func (p ExistsRes) Exists() bool {
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
|
||||||
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
||||||
var existsPrm meta.ExistsPrm
|
var exists bool
|
||||||
existsPrm.SetAddress(prm.addr)
|
var err error
|
||||||
|
|
||||||
res, err := s.metaBase.Exists(existsPrm)
|
if s.GetMode().NoMetabase() {
|
||||||
exists := res.Exists()
|
var p blobstor.ExistsPrm
|
||||||
if err != nil {
|
p.SetAddress(prm.addr)
|
||||||
// If the shard is in degraded mode, try to consult blobstor directly.
|
|
||||||
// Otherwise, just return an error.
|
|
||||||
if s.GetMode() == mode.Degraded {
|
|
||||||
var p blobstor.ExistsPrm
|
|
||||||
p.SetAddress(prm.addr)
|
|
||||||
|
|
||||||
res, bErr := s.blobStor.Exists(p)
|
var res blobstor.ExistsRes
|
||||||
if bErr == nil {
|
res, err = s.blobStor.Exists(p)
|
||||||
exists = res.Exists()
|
exists = res.Exists()
|
||||||
s.log.Warn("metabase existence check finished with error",
|
} else {
|
||||||
zap.Stringer("address", prm.addr),
|
var existsPrm meta.ExistsPrm
|
||||||
zap.String("error", err.Error()))
|
existsPrm.SetAddress(prm.addr)
|
||||||
err = nil
|
|
||||||
}
|
var res meta.ExistsRes
|
||||||
}
|
res, err = s.metaBase.Exists(existsPrm)
|
||||||
|
exists = res.Exists()
|
||||||
}
|
}
|
||||||
|
|
||||||
return ExistsRes{
|
return ExistsRes{
|
||||||
|
|
|
@ -63,16 +63,27 @@ func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
|
||||||
// otherwise object seems to be flushed to metabase
|
// otherwise object seems to be flushed to metabase
|
||||||
}
|
}
|
||||||
|
|
||||||
var headParams meta.GetPrm
|
var obj *objectSDK.Object
|
||||||
headParams.SetAddress(prm.addr)
|
var err error
|
||||||
headParams.SetRaw(prm.raw)
|
if s.GetMode().NoMetabase() {
|
||||||
|
var getPrm GetPrm
|
||||||
|
getPrm.SetAddress(prm.addr)
|
||||||
|
getPrm.SetIgnoreMeta(true)
|
||||||
|
|
||||||
res, err := s.metaBase.Get(headParams)
|
var res GetRes
|
||||||
if err != nil {
|
res, err = s.Get(getPrm)
|
||||||
return HeadRes{}, err
|
obj = res.Object()
|
||||||
|
} else {
|
||||||
|
var headParams meta.GetPrm
|
||||||
|
headParams.SetAddress(prm.addr)
|
||||||
|
headParams.SetRaw(prm.raw)
|
||||||
|
|
||||||
|
var res meta.GetRes
|
||||||
|
res, err = s.metaBase.Get(headParams)
|
||||||
|
obj = res.Header()
|
||||||
}
|
}
|
||||||
|
|
||||||
return HeadRes{
|
return HeadRes{
|
||||||
obj: res.Header(),
|
obj: obj,
|
||||||
}, nil
|
}, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -62,8 +61,11 @@ var ErrLockObjectRemoval = meta.ErrLockObjectRemoval
|
||||||
//
|
//
|
||||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||||
func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
||||||
if s.GetMode() != mode.ReadWrite {
|
m := s.GetMode()
|
||||||
|
if m.ReadOnly() {
|
||||||
return InhumeRes{}, ErrReadOnlyMode
|
return InhumeRes{}, ErrReadOnlyMode
|
||||||
|
} else if m.NoMetabase() {
|
||||||
|
return InhumeRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.hasWriteCache() {
|
if s.hasWriteCache() {
|
||||||
|
|
|
@ -110,6 +110,10 @@ func (s *Shard) ListContainers(_ ListContainersPrm) (ListContainersRes, error) {
|
||||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||||
// parameter set to zero.
|
// parameter set to zero.
|
||||||
func (s *Shard) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
func (s *Shard) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||||
|
if s.GetMode().NoMetabase() {
|
||||||
|
return ListWithCursorRes{}, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
var metaPrm meta.ListPrm
|
var metaPrm meta.ListPrm
|
||||||
metaPrm.SetCount(prm.count)
|
metaPrm.SetCount(prm.count)
|
||||||
metaPrm.SetCursor(prm.cursor)
|
metaPrm.SetCursor(prm.cursor)
|
||||||
|
|
|
@ -3,7 +3,6 @@ package shard
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
@ -15,8 +14,11 @@ import (
|
||||||
//
|
//
|
||||||
// Locked list should be unique. Panics if it is empty.
|
// Locked list should be unique. Panics if it is empty.
|
||||||
func (s *Shard) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
func (s *Shard) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
||||||
if s.GetMode() != mode.ReadWrite {
|
m := s.GetMode()
|
||||||
|
if m.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
|
} else if m.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.metaBase.Lock(idCnr, locker, locked)
|
err := s.metaBase.Lock(idCnr, locker, locked)
|
||||||
|
|
|
@ -10,6 +10,9 @@ import (
|
||||||
// that changes shard's memory due to the "read-only" shard's mode.
|
// that changes shard's memory due to the "read-only" shard's mode.
|
||||||
var ErrReadOnlyMode = errors.New("shard is in read-only mode")
|
var ErrReadOnlyMode = errors.New("shard is in read-only mode")
|
||||||
|
|
||||||
|
// ErrDegradedMode is returned when operation requiring metabase is executed in degraded mode.
|
||||||
|
var ErrDegradedMode = errors.New("shard is in degraded mode")
|
||||||
|
|
||||||
// SetMode sets mode of the shard.
|
// SetMode sets mode of the shard.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow
|
// Returns any error encountered that did not allow
|
||||||
|
|
|
@ -3,14 +3,14 @@ package mode
|
||||||
// Mode represents enumeration of Shard work modes.
|
// Mode represents enumeration of Shard work modes.
|
||||||
type Mode uint32
|
type Mode uint32
|
||||||
|
|
||||||
const (
|
// ReadWrite is a Mode value for shard that is available
|
||||||
// ReadWrite is a Mode value for shard that is available
|
// for read and write operations. Default shard mode.
|
||||||
// for read and write operations. Default shard mode.
|
const ReadWrite Mode = 0
|
||||||
ReadWrite Mode = iota
|
|
||||||
|
|
||||||
|
const (
|
||||||
// ReadOnly is a Mode value for shard that does not
|
// ReadOnly is a Mode value for shard that does not
|
||||||
// accept write operation but is readable.
|
// accept write operation but is readable.
|
||||||
ReadOnly
|
ReadOnly Mode = 1 << iota
|
||||||
|
|
||||||
// Degraded is a Mode value for shard that is set automatically
|
// Degraded is a Mode value for shard that is set automatically
|
||||||
// after a certain number of errors is encountered. It is the same as
|
// after a certain number of errors is encountered. It is the same as
|
||||||
|
@ -31,3 +31,13 @@ func (m Mode) String() string {
|
||||||
return "DEGRADED"
|
return "DEGRADED"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NoMetabase returns true iff m is operating without the metabase.
|
||||||
|
func (m Mode) NoMetabase() bool {
|
||||||
|
return m&Degraded != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadOnly returns true iff m prohibits modifying operations with shard.
|
||||||
|
func (m Mode) ReadOnly() bool {
|
||||||
|
return m&ReadOnly != 0
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -24,8 +23,11 @@ func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
|
||||||
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
||||||
// another shard.
|
// another shard.
|
||||||
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
||||||
if s.GetMode() != mode.ReadWrite {
|
m := s.GetMode()
|
||||||
|
if m.ReadOnly() {
|
||||||
return ToMoveItRes{}, ErrReadOnlyMode
|
return ToMoveItRes{}, ErrReadOnlyMode
|
||||||
|
} else if m.NoMetabase() {
|
||||||
|
return ToMoveItRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
var toMovePrm meta.ToMoveItPrm
|
var toMovePrm meta.ToMoveItPrm
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -30,7 +29,8 @@ func (p *PutPrm) SetObject(obj *object.Object) {
|
||||||
//
|
//
|
||||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||||
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||||
if s.GetMode() != mode.ReadWrite {
|
m := s.GetMode()
|
||||||
|
if m.ReadOnly() {
|
||||||
return PutRes{}, ErrReadOnlyMode
|
return PutRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,14 +58,15 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||||
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// put to metabase
|
if !m.NoMetabase() {
|
||||||
var pPrm meta.PutPrm
|
var pPrm meta.PutPrm
|
||||||
pPrm.SetObject(prm.obj)
|
pPrm.SetObject(prm.obj)
|
||||||
pPrm.SetBlobovniczaID(res.BlobovniczaID())
|
pPrm.SetBlobovniczaID(res.BlobovniczaID())
|
||||||
if _, err := s.metaBase.Put(pPrm); err != nil {
|
if _, err := s.metaBase.Put(pPrm); err != nil {
|
||||||
// may we need to handle this case in a special way
|
// may we need to handle this case in a special way
|
||||||
// since the object has been successfully written to BlobStor
|
// since the object has been successfully written to BlobStor
|
||||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return PutRes{}, nil
|
return PutRes{}, nil
|
||||||
|
|
|
@ -102,7 +102,8 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
skipMeta := prm.skipMeta || s.GetMode().NoMetabase()
|
||||||
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, big, small)
|
||||||
|
|
||||||
return RngRes{
|
return RngRes{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,7 +61,7 @@ func (s *Shard) Restore(prm RestorePrm) (RestoreRes, error) {
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
defer s.m.RUnlock()
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
if s.info.Mode != mode.ReadWrite {
|
if s.info.Mode.ReadOnly() {
|
||||||
return RestoreRes{}, ErrReadOnlyMode
|
return RestoreRes{}, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,10 @@ func (r SelectRes) AddressList() []oid.Address {
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
// did not allow to completely select the objects.
|
// did not allow to completely select the objects.
|
||||||
func (s *Shard) Select(prm SelectPrm) (SelectRes, error) {
|
func (s *Shard) Select(prm SelectPrm) (SelectRes, error) {
|
||||||
|
if s.GetMode().NoMetabase() {
|
||||||
|
return SelectRes{}, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
var selectPrm meta.SelectPrm
|
var selectPrm meta.SelectPrm
|
||||||
selectPrm.SetFilters(prm.filters)
|
selectPrm.SetFilters(prm.filters)
|
||||||
selectPrm.SetContainerID(prm.cnr)
|
selectPrm.SetContainerID(prm.cnr)
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
||||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -18,7 +17,7 @@ func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Mo
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return nil, ErrPiloramaDisabled
|
return nil, ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
if s.GetMode() != mode.ReadWrite {
|
if s.GetMode().ReadOnly() {
|
||||||
return nil, ErrReadOnlyMode
|
return nil, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeMove(d, treeID, m)
|
return s.pilorama.TreeMove(d, treeID, m)
|
||||||
|
@ -29,7 +28,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return nil, ErrPiloramaDisabled
|
return nil, ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
if s.GetMode() != mode.ReadWrite {
|
if s.GetMode().ReadOnly() {
|
||||||
return nil, ErrReadOnlyMode
|
return nil, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeAddByPath(d, treeID, attr, path, meta)
|
return s.pilorama.TreeAddByPath(d, treeID, attr, path, meta)
|
||||||
|
@ -40,7 +39,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return ErrPiloramaDisabled
|
return ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
if s.GetMode() != mode.ReadWrite {
|
if s.GetMode().ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeApply(d, treeID, m)
|
return s.pilorama.TreeApply(d, treeID, m)
|
||||||
|
|
Loading…
Reference in a new issue