forked from TrueCloudLab/frostfs-node
6f8c45d61b
In the previous implementation, each operation on local storage locked the engine mutex. This was done under the assumption that the weights of the shards change as a result of write operations. With the transition to static shard weights, it is no longer necessary to hold the global mutex for the duration of each operation. However, since the set of engine shards is dynamic, concurrent access to this set still needs to be controlled. The same mutex is used for this synchronization. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
91 lines
1.9 KiB
Go
91 lines
1.9 KiB
Go
package engine
|
|
|
|
import (
|
|
"errors"
|
|
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// PutPrm groups the parameters of Put operation.
|
|
type PutPrm struct {
|
|
obj *object.Object
|
|
}
|
|
|
|
// PutRes groups resulting values of Put operation.
//
// It currently carries no values.
type PutRes struct{}
|
|
|
|
// errPutShard is returned by StorageEngine.Put when the object was not
// saved to any of the candidate shards.
var errPutShard = errors.New("could not put object to any shard")
|
|
|
|
// WithObject is a Put option to set object to save.
|
|
//
|
|
// Option is required.
|
|
func (p *PutPrm) WithObject(obj *object.Object) *PutPrm {
|
|
if p != nil {
|
|
p.obj = obj
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
// Put saves the object to local storage.
|
|
//
|
|
// Returns any error encountered that
|
|
// did not allow to completely save the object.
|
|
func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) {
|
|
// choose shards through sorting by weight
|
|
sortedShards := e.sortShardsByWeight(prm.obj.Address())
|
|
|
|
// check object existence
|
|
if e.objectExists(prm.obj, sortedShards) {
|
|
return nil, nil
|
|
}
|
|
|
|
shPrm := new(shard.PutPrm)
|
|
|
|
// save the object into the "largest" possible shard
|
|
for _, sh := range sortedShards {
|
|
_, err := sh.Put(
|
|
shPrm.WithObject(prm.obj),
|
|
)
|
|
|
|
if err != nil {
|
|
// TODO: smth wrong with shard, need to be processed
|
|
e.log.Warn("could not save object in shard",
|
|
zap.Stringer("shard", sh.ID()),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
} else {
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
return nil, errPutShard
|
|
}
|
|
|
|
func (e *StorageEngine) objectExists(obj *object.Object, shards []*shard.Shard) bool {
|
|
exists := false
|
|
|
|
for _, sh := range shards {
|
|
res, err := sh.Exists(
|
|
new(shard.ExistsPrm).
|
|
WithAddress(obj.Address()),
|
|
)
|
|
|
|
if err != nil {
|
|
// TODO: smth wrong with shard, need to be processed
|
|
e.log.Warn("could not check object existence",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
if exists = res.Exists(); exists {
|
|
break
|
|
}
|
|
}
|
|
|
|
return exists
|
|
}
|