frostfs-node/pkg/local_object_storage/engine/get.go
Leonard Lyubich 6f8c45d61b [#176] localstore: Change multiple access sync
In previous implementation each operation on local storage
locked engine mutex. This was done under the assumption that
the weights of the shards change as a result of write operations.
With the transition to static weights of shards, it is no longer
necessary to lock the global mutex during the execution of operations.
However, since the set of engine shards is dynamic, there is still a
need to control multiple access to this set. The same mutex is used
for synchronization.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-12-11 17:19:37 +03:00

99 lines
2.2 KiB
Go

package engine
import (
"errors"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"go.uber.org/zap"
)
// GetPrm groups the parameters of Get operation.
type GetPrm struct {
off, ln uint64
addr *objectSDK.Address
}
// GetRes groups resulting values of Get operation.
type GetRes struct {
obj *object.Object
}
// ErrObjectNotFound is returns on read operations requested on a missing object.
var ErrObjectNotFound = errors.New("object not found")
// WithAddress is a Get option to set the address of the requested object.
//
// Option is required.
func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm {
if p != nil {
p.addr = addr
}
return p
}
// WithPayloadRange is a Get option to set range of requested payload data.
//
// Missing an option or calling with zero length is equivalent
// to getting the full payload range.
func (p *GetPrm) WithPayloadRange(off, ln uint64) *GetPrm {
if p != nil {
p.off, p.ln = off, ln
}
return p
}
// Object returns the requested object part.
//
// Instance payload contains the requested range of the original object.
func (r *GetRes) Object() *object.Object {
return r.obj
}
// Get reads part of an object from local storage.
//
// Returns any error encountered that
// did not allow to completely read the object part.
//
// Returns ErrObjectNotFound if requested object is missing in local storage.
func (e *StorageEngine) Get(prm *GetPrm) (*GetRes, error) {
var obj *object.Object
shPrm := new(shard.GetPrm).
WithAddress(prm.addr)
if prm.ln == 0 {
shPrm = shPrm.WithFullRange()
} else {
shPrm = shPrm.WithRange(prm.off, int64(prm.ln))
}
e.iterateOverSortedShards(prm.addr, func(sh *shard.Shard) (stop bool) {
res, err := sh.Get(shPrm)
if err != nil {
if !errors.Is(err, shard.ErrObjectNotFound) {
// TODO: smth wrong with shard, need to be processed
e.log.Warn("could not get object from shard",
zap.Stringer("shard", sh.ID()),
zap.String("error", err.Error()),
)
}
} else {
obj = res.Object()
}
return err == nil
})
if obj == nil {
return nil, ErrObjectNotFound
}
return &GetRes{
obj: obj,
}, nil
}