frostfs-node/pkg/local_object_storage/blobstor/memstore/memstore.go

166 lines
4.2 KiB
Go

// Package memstore implements a memory-backed common.Storage for testing purposes.
package memstore
import (
"context"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// Type is the type name of the in-memory storage implemented by this package.
const Type = "memstore"
// memstoreImpl is the in-memory storage returned by New. Objects are kept in
// a plain map guarded by a read/write mutex.
type memstoreImpl struct {
// Embedded configuration; the methods in this file read its compression,
// readOnly and rootPath settings.
*cfg
// mu guards objs.
mu sync.RWMutex
// objs maps an encoded object address (Address.EncodeToString) to the bytes
// stored by Put, which are compressed unless the caller asked otherwise.
objs map[string][]byte
}
// New constructs an in-memory common.Storage. It starts from defaultConfig()
// and applies the given options to that configuration in the order they were
// passed.
func New(opts ...Option) common.Storage {
	st := new(memstoreImpl)
	st.cfg = defaultConfig()
	st.objs = make(map[string][]byte)

	for i := range opts {
		opts[i](st.cfg)
	}

	return st
}
// Get looks up the entry stored under the encoded form of req.Address,
// decompresses the stored bytes and unmarshals them into an SDK object.
//
// A logical ObjectNotFound error is returned when no entry exists for the
// address. Decompression and unmarshaling failures are returned wrapped with
// context. On success, RawData in the result holds the decompressed bytes the
// object was unmarshaled from.
func (s *memstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes, error) {
	addr := req.Address.EncodeToString()

	// The read lock is held only for the map lookup itself.
	s.mu.RLock()
	stored, found := s.objs[addr]
	s.mu.RUnlock()

	if !found {
		return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
	}

	raw, err := s.compression.Decompress(stored)
	if err != nil {
		return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
	}

	obj := objectSDK.New()
	if err = obj.Unmarshal(raw); err != nil {
		return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
	}

	return common.GetRes{Object: obj, RawData: raw}, nil
}
// GetRange fetches the whole object through Get and returns the part of its
// payload described by req.Range (offset and length).
//
// Errors from Get are returned unchanged. A logical ObjectOutOfRange error is
// returned when offset+length wraps around, or when the offset or the end of
// the range lies beyond the payload.
func (s *memstoreImpl) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) {
	res, err := s.Get(ctx, common.GetPrm{Address: req.Address, StorageID: req.StorageID})
	if err != nil {
		return common.GetRangeRes{}, err
	}

	var (
		payload = res.Object.Payload()
		start   = req.Range.GetOffset()
		end     = start + req.Range.GetLength()
		size    = uint64(len(payload))
	)

	// start <= end rejects a wrapped-around sum; the other two comparisons
	// keep both ends of the range inside the payload.
	inBounds := start <= end && start <= size && end <= size
	if !inBounds {
		return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
	}

	return common.GetRangeRes{Data: payload[start:end]}, nil
}
// Exists reports whether an entry is stored under the encoded form of
// req.Address. The returned error is always nil.
func (s *memstoreImpl) Exists(_ context.Context, req common.ExistsPrm) (common.ExistsRes, error) {
	addr := req.Address.EncodeToString()

	s.mu.RLock()
	_, found := s.objs[addr]
	s.mu.RUnlock()

	var res common.ExistsRes
	res.Exists = found
	return res, nil
}
// Put stores req.RawData under the encoded form of req.Address, replacing any
// entry already stored under that key. Unless req.DontCompress is set, the
// data is passed through the configured compressor before being stored.
//
// It returns common.ErrReadOnly when the store is configured read-only. On
// success the StorageID in the result is the configured root path as bytes.
func (s *memstoreImpl) Put(_ context.Context, req common.PutPrm) (common.PutRes, error) {
	if s.readOnly {
		return common.PutRes{}, common.ErrReadOnly
	}

	payload := req.RawData
	if !req.DontCompress {
		payload = s.compression.Compress(payload)
	}
	addr := req.Address.EncodeToString()

	s.mu.Lock()
	defer s.mu.Unlock()

	s.objs[addr] = payload

	var res common.PutRes
	res.StorageID = []byte(s.rootPath)
	return res, nil
}
// Delete removes the entry stored under the encoded form of req.Address.
//
// It returns common.ErrReadOnly when the store is configured read-only, and a
// logical ObjectNotFound error when there is no entry for the address.
func (s *memstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.DeleteRes, error) {
	if s.readOnly {
		return common.DeleteRes{}, common.ErrReadOnly
	}

	addr := req.Address.EncodeToString()

	s.mu.Lock()
	defer s.mu.Unlock()

	_, found := s.objs[addr]
	if !found {
		return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
	}

	delete(s.objs, addr)
	return common.DeleteRes{}, nil
}
// Iterate calls req.Handler once for every stored entry, passing the decoded
// address and the decompressed object data. Entries are visited in map
// iteration order, which is unspecified.
//
// An entry whose key cannot be decoded into an address, or whose data cannot
// be decompressed, is skipped when req.IgnoreErrors is set; otherwise
// iteration stops and a logical error wrapping the underlying cause (so it
// stays reachable through errors.Is / errors.As) is returned. An error
// returned by the handler always stops iteration and is returned unchanged.
// When req.Handler is nil, each visited entry yields a logical error unless
// req.IgnoreErrors is set; an empty store therefore returns no error.
//
// The handler runs while the store's read lock is held, so it must not call
// Put or Delete on the same store.
func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for k, v := range s.objs {
		elem := common.IterationElement{
			ObjectData: v,
		}

		if err := elem.Address.DecodeString(k); err != nil {
			if req.IgnoreErrors {
				continue
			}
			return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %w", s, k, err))
		}

		var err error
		if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {
			if req.IgnoreErrors {
				continue
			}
			return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %w", s, elem.Address.String(), err))
		}

		switch {
		case req.Handler != nil:
			if err := req.Handler(elem); err != nil {
				return common.IterateRes{}, err
			}
		default:
			// Checked per entry on purpose: with no entries there is
			// nothing to hand over, so a missing handler is not reported.
			if !req.IgnoreErrors {
				return common.IterateRes{}, logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s))
			}
		}
	}

	return common.IterateRes{}, nil
}
// Rebuild is a no-op for the in-memory store: it ignores both arguments and
// always returns an empty result with a nil error.
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
	var res common.RebuildRes
	return res, nil
}