[#17] localstorage: Adopt local object storage for new types

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-09-02 15:50:51 +03:00 committed by Alex Vanin
parent edcaef8478
commit 9f51c85054
17 changed files with 227 additions and 1348 deletions

View file

@ -1,35 +0,0 @@
package localstore
import (
"github.com/nspcc-dev/neofs-api-go/hash"
"github.com/nspcc-dev/neofs-api-go/object"
"github.com/nspcc-dev/neofs-api-go/refs"
)
// CID is a type alias of
// CID from refs package of neofs-api-go.
type CID = refs.CID
// SGID is a type alias of
// SGID from refs package of neofs-api-go.
type SGID = refs.ObjectID
// Header is a type alias of
// Header from object package of neofs-api-go.
type Header = object.Header
// Object is a type alias of
// Object from object package of neofs-api-go.
type Object = object.Object
// ObjectID is a type alias of
// ObjectID from refs package of neofs-api-go.
type ObjectID = refs.ObjectID
// Address is a type alias of
// Address from refs package of neofs-api-go.
type Address = refs.Address
// Hash is a type alias of
// Hash from hash package of neofs-api-go.
type Hash = hash.Hash

View file

@ -1,38 +0,0 @@
package localstore
import (
"github.com/nspcc-dev/neofs-api-go/refs"
metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics"
"github.com/pkg/errors"
"go.uber.org/zap"
)
func (l *localstore) Del(key refs.Address) error {
k, err := key.Hash()
if err != nil {
return errors.Wrap(err, "Localstore Del failed on key.Marshal")
}
// try to fetch object for metrics
obj, err := l.Get(key)
if err != nil {
l.log.Warn("localstore Del failed on localstore.Get", zap.Error(err))
}
if err := l.blobBucket.Del(k); err != nil {
l.log.Warn("Localstore Del failed on BlobBucket.Del", zap.Error(err))
}
if err := l.metaBucket.Del(k); err != nil {
return errors.Wrap(err, "Localstore Del failed on MetaBucket.Del")
}
if obj != nil {
l.col.UpdateContainer(
key.CID,
obj.SystemHeader.PayloadLength,
metrics2.RemSpace)
}
return nil
}

View file

@ -175,27 +175,6 @@ func NewFilter(p *FilterParams) FilterPipeline {
} }
} }
// AllPassIncludingFilter returns FilterPipeline with sub-filters composed from parameters.
// Result filter fails with CodeFail code if any of the sub-filters returns not a CodePass code.
func AllPassIncludingFilter(name string, params ...*FilterParams) (FilterPipeline, error) {
res := NewFilter(&FilterParams{
Name: name,
FilterFunc: SkippingFilterFunc,
})
for i := range params {
if err := res.PutSubFilter(SubFilterParams{
FilterPipeline: NewFilter(params[i]),
OnIgnore: CodeFail,
OnFail: CodeFail,
}); err != nil {
return nil, errors.Wrap(err, "could not create all pass including filter")
}
}
return res, nil
}
func (p *filterPipeline) Pass(ctx context.Context, meta *ObjectMeta) *FilterResult { func (p *filterPipeline) Pass(ctx context.Context, meta *ObjectMeta) *FilterResult {
p.RLock() p.RLock()
defer p.RUnlock() defer p.RUnlock()

View file

@ -1,39 +0,0 @@
package localstore
import (
"context"
)
// SkippingFilterFunc is a FilterFunc that always returns result with
// CodePass code and nil error.
func SkippingFilterFunc(_ context.Context, _ *ObjectMeta) *FilterResult {
return ResultPass()
}
// ContainerFilterFunc returns a FilterFunc that returns:
// - result with CodePass code and nil error if CID of ObjectMeta if from the CID list;
// - result with CodeFail code an nil error otherwise.
func ContainerFilterFunc(cidList []CID) FilterFunc {
return func(_ context.Context, meta *ObjectMeta) *FilterResult {
for i := range cidList {
if meta.Object.SystemHeader.CID.Equal(cidList[i]) {
return ResultPass()
}
}
return ResultFail()
}
}
// StoredEarlierThanFilterFunc returns a FilterFunc that returns:
// - result with CodePass code and nil error if StoreEpoch is less that argument;
// - result with CodeFail code and nil error otherwise.
func StoredEarlierThanFilterFunc(epoch uint64) FilterFunc {
return func(_ context.Context, meta *ObjectMeta) *FilterResult {
if meta.StoreEpoch < epoch {
return ResultPass()
}
return ResultFail()
}
}

View file

@ -1,18 +1,12 @@
package localstore package localstore
import ( import (
"context"
"errors" "errors"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestSkippingFilterFunc(t *testing.T) {
res := SkippingFilterFunc(context.TODO(), &ObjectMeta{})
require.Equal(t, CodePass, res.Code())
}
func TestFilterResult(t *testing.T) { func TestFilterResult(t *testing.T) {
var ( var (
r *FilterResult r *FilterResult

View file

@ -1,30 +0,0 @@
package localstore
import (
"github.com/nspcc-dev/neofs-api-go/refs"
"github.com/pkg/errors"
)
func (l *localstore) Get(key refs.Address) (*Object, error) {
var (
err error
k, v []byte
o = new(Object)
)
k, err = key.Hash()
if err != nil {
return nil, errors.Wrap(err, "Localstore Get failed on key.Marshal")
}
v, err = l.blobBucket.Get(k)
if err != nil {
return nil, errors.Wrap(err, "Localstore Get failed on blobBucket.Get")
}
if err = o.Unmarshal(v); err != nil {
return nil, errors.Wrap(err, "Localstore Get failed on Object.Unmarshal")
}
return o, nil
}

View file

@ -1,20 +0,0 @@
package localstore
import (
"github.com/nspcc-dev/neofs-api-go/refs"
"github.com/pkg/errors"
)
func (l *localstore) Has(key refs.Address) (bool, error) {
var (
err error
k []byte
)
k, err = key.Hash()
if err != nil {
return false, errors.Wrap(err, "localstore.Has failed on key.Marshal")
}
return l.metaBucket.Has(k) && l.blobBucket.Has(k), nil
}

View file

@ -1,102 +0,0 @@
package localstore
import (
"context"
"github.com/nspcc-dev/neofs-api-go/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type (
// Localstore is an interface of local object storage.
Localstore interface {
Put(context.Context, *Object) error
Get(Address) (*Object, error)
Del(Address) error
Meta(Address) (*ObjectMeta, error)
Iterator
Has(Address) (bool, error)
ObjectsCount() (uint64, error)
object.PositionReader
Size() int64
}
// MetaHandler is a function that handles ObjectMeta.
MetaHandler func(*ObjectMeta) bool
// Iterator is an interface of the iterator over local object storage.
Iterator interface {
Iterate(FilterPipeline, MetaHandler) error
}
// ListItem is an ObjectMeta wrapper.
ListItem struct {
ObjectMeta
}
// Params groups the parameters of
// local object storage constructor.
Params struct {
BlobBucket bucket.Bucket
MetaBucket bucket.Bucket
Logger *zap.Logger
Collector metrics2.Collector
}
localstore struct {
metaBucket bucket.Bucket
blobBucket bucket.Bucket
log *zap.Logger
col metrics2.Collector
}
)
// ErrOutOfRange is returned when requested object payload range is
// out of object payload bounds.
var ErrOutOfRange = errors.New("range is out of payload bounds")
// ErrEmptyMetaHandler is returned by functions that expect
// a non-nil MetaHandler, but received nil.
var ErrEmptyMetaHandler = errors.New("meta handler is nil")
var errNilLogger = errors.New("logger is nil")
var errNilCollector = errors.New("metrics collector is nil")
// New is a local object storage constructor.
func New(p Params) (Localstore, error) {
switch {
case p.MetaBucket == nil:
return nil, errors.New("meta bucket is nil")
case p.BlobBucket == nil:
return nil, errors.New("blob bucket is nil")
case p.Logger == nil:
return nil, errNilLogger
case p.Collector == nil:
return nil, errNilCollector
}
return &localstore{
metaBucket: p.MetaBucket,
blobBucket: p.BlobBucket,
log: p.Logger,
col: p.Collector,
}, nil
}
func (l localstore) Size() int64 { return l.blobBucket.Size() }
// TODO: implement less costly method of counting.
func (l localstore) ObjectsCount() (uint64, error) {
items, err := l.metaBucket.List()
if err != nil {
return 0, err
}
return uint64(len(items)), nil
}

View file

@ -1,41 +0,0 @@
package localstore
import (
"context"
"go.uber.org/zap"
)
func (l *localstore) Iterate(filter FilterPipeline, handler MetaHandler) error {
if handler == nil {
return ErrEmptyMetaHandler
} else if filter == nil {
filter = NewFilter(&FilterParams{
Name: "SKIPPING_FILTER",
FilterFunc: SkippingFilterFunc,
})
}
return l.metaBucket.Iterate(func(_, v []byte) bool {
meta := new(ObjectMeta)
if err := meta.Unmarshal(v); err != nil {
l.log.Error("unmarshal meta bucket item failure", zap.Error(err))
} else if filter.Pass(context.TODO(), meta).Code() == CodePass {
return !handler(meta)
}
return true
})
}
// ListItems iterates over Iterator with FilterPipeline and returns all passed items.
func ListItems(it Iterator, f FilterPipeline) ([]ListItem, error) {
res := make([]ListItem, 0)
err := it.Iterate(f, func(meta *ObjectMeta) (stop bool) {
res = append(res, ListItem{
ObjectMeta: *meta,
})
return
})
return res, err
}

View file

@ -1,464 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pkg/local_object_storage/localstore/localstore.proto
package localstore
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/golang/protobuf/proto"
object "github.com/nspcc-dev/neofs-api-go/object"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type ObjectMeta struct {
Object *object.Object `protobuf:"bytes,1,opt,name=Object,proto3" json:"Object,omitempty"`
PayloadHash Hash `protobuf:"bytes,2,opt,name=PayloadHash,proto3,customtype=Hash" json:"PayloadHash"`
PayloadSize uint64 `protobuf:"varint,3,opt,name=PayloadSize,proto3" json:"PayloadSize,omitempty"`
StoreEpoch uint64 `protobuf:"varint,4,opt,name=StoreEpoch,proto3" json:"StoreEpoch,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ObjectMeta) Reset() { *m = ObjectMeta{} }
func (m *ObjectMeta) String() string { return proto.CompactTextString(m) }
func (*ObjectMeta) ProtoMessage() {}
func (*ObjectMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_6a79df71a3d7f4f8, []int{0}
}
func (m *ObjectMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ObjectMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ObjectMeta.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ObjectMeta) XXX_Merge(src proto.Message) {
xxx_messageInfo_ObjectMeta.Merge(m, src)
}
func (m *ObjectMeta) XXX_Size() int {
return m.Size()
}
func (m *ObjectMeta) XXX_DiscardUnknown() {
xxx_messageInfo_ObjectMeta.DiscardUnknown(m)
}
var xxx_messageInfo_ObjectMeta proto.InternalMessageInfo
func (m *ObjectMeta) GetObject() *object.Object {
if m != nil {
return m.Object
}
return nil
}
func (m *ObjectMeta) GetPayloadSize() uint64 {
if m != nil {
return m.PayloadSize
}
return 0
}
func (m *ObjectMeta) GetStoreEpoch() uint64 {
if m != nil {
return m.StoreEpoch
}
return 0
}
func init() {
proto.RegisterType((*ObjectMeta)(nil), "localstore.ObjectMeta")
}
func init() {
proto.RegisterFile("pkg/local_object_storage/localstore/localstore.proto", fileDescriptor_6a79df71a3d7f4f8)
}
var fileDescriptor_6a79df71a3d7f4f8 = []byte{
// 267 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x29, 0xc8, 0x4e, 0xd7,
0xcf, 0xc9, 0x4f, 0x4e, 0xcc, 0x89, 0xcf, 0x4f, 0xca, 0x4a, 0x4d, 0x2e, 0x89, 0x2f, 0x2e, 0xc9,
0x2f, 0x4a, 0x4c, 0x4f, 0x85, 0x08, 0x82, 0x38, 0xc8, 0x4c, 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c,
0x21, 0x2e, 0x84, 0x88, 0x94, 0x10, 0x44, 0x9f, 0x7e, 0x49, 0x65, 0x41, 0x6a, 0x31, 0x44, 0x5e,
0x4a, 0x37, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, 0x3f, 0x57, 0x3f, 0x3d, 0x3f, 0x3d,
0x5f, 0x1f, 0x2c, 0x9c, 0x54, 0x9a, 0x06, 0xe6, 0x81, 0x39, 0x60, 0x16, 0x44, 0xb9, 0xd2, 0x32,
0x46, 0x2e, 0x2e, 0x7f, 0xb0, 0x29, 0xbe, 0xa9, 0x25, 0x89, 0x42, 0x6a, 0x5c, 0x6c, 0x10, 0x9e,
0x04, 0xa3, 0x02, 0xa3, 0x06, 0xb7, 0x11, 0x9f, 0x1e, 0xc4, 0x0a, 0x3d, 0x88, 0x68, 0x10, 0x54,
0x56, 0x48, 0x8f, 0x8b, 0x3b, 0x20, 0xb1, 0x32, 0x27, 0x3f, 0x31, 0xc5, 0x23, 0xb1, 0x38, 0x43,
0x82, 0x49, 0x81, 0x51, 0x83, 0xc7, 0x89, 0xe7, 0xc4, 0x3d, 0x79, 0x86, 0x5b, 0xf7, 0xe4, 0x59,
0x40, 0x62, 0x41, 0xc8, 0x0a, 0x84, 0x14, 0xe0, 0xea, 0x83, 0x33, 0xab, 0x52, 0x25, 0x98, 0x15,
0x18, 0x35, 0x58, 0x82, 0x90, 0x85, 0x84, 0xe4, 0xb8, 0xb8, 0x82, 0x41, 0x9e, 0x72, 0x2d, 0xc8,
0x4f, 0xce, 0x90, 0x60, 0x01, 0x2b, 0x40, 0x12, 0x71, 0x0a, 0x3d, 0xf1, 0x48, 0x8e, 0xf1, 0xc2,
0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63, 0x88, 0x72, 0x46, 0xf2, 0x69,
0x5e, 0x71, 0x41, 0x72, 0xb2, 0x6e, 0x4a, 0x6a, 0x99, 0x7e, 0x5e, 0x6a, 0x7e, 0x5a, 0xb1, 0x6e,
0x5e, 0x7e, 0x4a, 0xaa, 0x3e, 0x11, 0xe1, 0x9b, 0xc4, 0x06, 0x0e, 0x06, 0x63, 0x40, 0x00, 0x00,
0x00, 0xff, 0xff, 0x80, 0x4c, 0xae, 0xf3, 0x8d, 0x01, 0x00, 0x00,
}
func (m *ObjectMeta) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ObjectMeta) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ObjectMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.StoreEpoch != 0 {
i = encodeVarintLocalstore(dAtA, i, uint64(m.StoreEpoch))
i--
dAtA[i] = 0x20
}
if m.PayloadSize != 0 {
i = encodeVarintLocalstore(dAtA, i, uint64(m.PayloadSize))
i--
dAtA[i] = 0x18
}
{
size := m.PayloadHash.Size()
i -= size
if _, err := m.PayloadHash.MarshalTo(dAtA[i:]); err != nil {
return 0, err
}
i = encodeVarintLocalstore(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
if m.Object != nil {
{
size, err := m.Object.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintLocalstore(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintLocalstore(dAtA []byte, offset int, v uint64) int {
offset -= sovLocalstore(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *ObjectMeta) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Object != nil {
l = m.Object.Size()
n += 1 + l + sovLocalstore(uint64(l))
}
l = m.PayloadHash.Size()
n += 1 + l + sovLocalstore(uint64(l))
if m.PayloadSize != 0 {
n += 1 + sovLocalstore(uint64(m.PayloadSize))
}
if m.StoreEpoch != 0 {
n += 1 + sovLocalstore(uint64(m.StoreEpoch))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovLocalstore(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozLocalstore(x uint64) (n int) {
return sovLocalstore(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *ObjectMeta) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLocalstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ObjectMeta: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ObjectMeta: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Object", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLocalstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLocalstore
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLocalstore
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Object == nil {
m.Object = &object.Object{}
}
if err := m.Object.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PayloadHash", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLocalstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthLocalstore
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthLocalstore
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := m.PayloadHash.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field PayloadSize", wireType)
}
m.PayloadSize = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLocalstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.PayloadSize |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field StoreEpoch", wireType)
}
m.StoreEpoch = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLocalstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.StoreEpoch |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipLocalstore(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLocalstore
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLocalstore
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipLocalstore(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowLocalstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowLocalstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowLocalstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthLocalstore
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupLocalstore
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthLocalstore
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthLocalstore = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowLocalstore = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupLocalstore = fmt.Errorf("proto: unexpected end of group")
)

View file

@ -1,14 +0,0 @@
syntax = "proto3";
option go_package = "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore";
package localstore;
import "object/types.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message ObjectMeta {
object.Object Object = 1;
bytes PayloadHash = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "Hash"];
uint64 PayloadSize = 3;
uint64 StoreEpoch = 4;
}

View file

@ -1,418 +0,0 @@
package localstore
import (
"context"
"sync"
"testing"
"github.com/google/uuid"
"github.com/nspcc-dev/neofs-api-go/container"
"github.com/nspcc-dev/neofs-api-go/hash"
"github.com/nspcc-dev/neofs-api-go/object"
"github.com/nspcc-dev/neofs-api-go/refs"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/test"
meta2 "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/meta"
metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
type (
fakeCollector struct {
sync.Mutex
items map[refs.CID]uint64
}
)
func (f *fakeCollector) Start(_ context.Context) { panic("implement me") }
func (f *fakeCollector) UpdateSpaceUsage() { panic("implement me") }
func (f *fakeCollector) SetIterator(_ meta2.Iterator) { panic("implement me") }
func (f *fakeCollector) SetCounter(counter metrics2.ObjectCounter) { panic("implement me") }
func (f *fakeCollector) UpdateContainer(cid refs.CID, size uint64, op metrics2.SpaceOp) {
f.Lock()
defer f.Unlock()
switch op {
case metrics2.AddSpace:
f.items[cid] += size
case metrics2.RemSpace:
if val, ok := f.items[cid]; !ok || val < size {
return
}
f.items[cid] -= size
default:
return
}
}
func newCollector() metrics2.Collector {
return &fakeCollector{
items: make(map[refs.CID]uint64),
}
}
func testObject(t *testing.T) *Object {
var (
uid refs.UUID
cid CID
)
t.Run("Prepare object", func(t *testing.T) {
cnr, err := container.NewTestContainer()
require.NoError(t, err)
cid, err = cnr.ID()
require.NoError(t, err)
id, err := uuid.NewRandom()
uid = refs.UUID(id)
require.NoError(t, err)
})
obj := &Object{
SystemHeader: object.SystemHeader{
Version: 1,
ID: uid,
CID: cid,
OwnerID: refs.OwnerID([refs.OwnerIDSize]byte{}), // TODO: avoid hardcode
},
Headers: []Header{
{
Value: &object.Header_UserHeader{
UserHeader: &object.UserHeader{
Key: "Profession",
Value: "Developer",
},
},
},
{
Value: &object.Header_UserHeader{
UserHeader: &object.UserHeader{
Key: "Language",
Value: "GO",
},
},
},
},
}
return obj
}
func newLocalstore(t *testing.T) Localstore {
ls, err := New(Params{
BlobBucket: test.Bucket(),
MetaBucket: test.Bucket(),
Logger: zap.L(),
Collector: newCollector(),
})
require.NoError(t, err)
return ls
}
func TestNew(t *testing.T) {
t.Run("New localstore", func(t *testing.T) {
var err error
_, err = New(Params{})
require.Error(t, err)
_, err = New(Params{
BlobBucket: test.Bucket(),
MetaBucket: test.Bucket(),
Logger: zap.L(),
Collector: newCollector(),
})
require.NoError(t, err)
})
}
func TestLocalstore_Del(t *testing.T) {
t.Run("Del method", func(t *testing.T) {
var (
err error
ls Localstore
obj *Object
)
ls = newLocalstore(t)
obj = testObject(t)
obj.SetPayload([]byte("Hello, world"))
k := *obj.Address()
store, ok := ls.(*localstore)
require.True(t, ok)
require.NotNil(t, store)
metric, ok := store.col.(*fakeCollector)
require.True(t, ok)
require.NotNil(t, metric)
err = ls.Put(context.Background(), obj)
require.NoError(t, err)
require.NotEmpty(t, obj.Payload)
require.Contains(t, metric.items, obj.SystemHeader.CID)
require.Equal(t, obj.SystemHeader.PayloadLength, metric.items[obj.SystemHeader.CID])
err = ls.Del(k)
require.NoError(t, err)
require.Contains(t, metric.items, obj.SystemHeader.CID)
require.Equal(t, uint64(0), metric.items[obj.SystemHeader.CID])
_, err = ls.Get(k)
require.Error(t, err)
})
}
func TestLocalstore_Get(t *testing.T) {
t.Run("Get method (default)", func(t *testing.T) {
var (
err error
ls Localstore
obj *Object
)
ls = newLocalstore(t)
obj = testObject(t)
err = ls.Put(context.Background(), obj)
require.NoError(t, err)
k := *obj.Address()
o, err := ls.Get(k)
require.NoError(t, err)
require.Equal(t, obj, o)
})
}
func TestLocalstore_Put(t *testing.T) {
t.Run("Put method", func(t *testing.T) {
var (
err error
ls Localstore
obj *Object
)
ls = newLocalstore(t)
store, ok := ls.(*localstore)
require.True(t, ok)
require.NotNil(t, store)
metric, ok := store.col.(*fakeCollector)
require.True(t, ok)
require.NotNil(t, metric)
obj = testObject(t)
err = ls.Put(context.Background(), obj)
require.NoError(t, err)
require.Contains(t, metric.items, obj.SystemHeader.CID)
require.Equal(t, obj.SystemHeader.PayloadLength, metric.items[obj.SystemHeader.CID])
o, err := ls.Get(*obj.Address())
require.NoError(t, err)
require.Equal(t, obj, o)
})
}
func TestLocalstore_List(t *testing.T) {
t.Run("List method (no filters)", func(t *testing.T) {
var (
err error
ls Localstore
objCount = 10
objs = make([]Object, objCount)
)
for i := range objs {
objs[i] = *testObject(t)
}
ls = newLocalstore(t)
for i := range objs {
err = ls.Put(context.Background(), &objs[i])
require.NoError(t, err)
}
items, err := ListItems(ls, nil)
require.NoError(t, err)
require.Len(t, items, objCount)
for i := range items {
require.Contains(t, objs, *items[i].Object)
}
})
t.Run("List method ('bad' filter)", func(t *testing.T) {
var (
err error
ls Localstore
objCount = 10
objs = make([]Object, objCount)
)
for i := range objs {
objs[i] = *testObject(t)
}
ls = newLocalstore(t)
for i := range objs {
err = ls.Put(context.Background(), &objs[i])
require.NoError(t, err)
}
items, err := ListItems(ls, NewFilter(&FilterParams{
FilterFunc: ContainerFilterFunc([]CID{}),
}))
require.NoError(t, err)
require.Len(t, items, 0)
})
t.Run("List method (filter by cid)", func(t *testing.T) {
var (
err error
ls Localstore
objCount = 10
objs = make([]Object, objCount)
)
for i := range objs {
objs[i] = *testObject(t)
}
ls = newLocalstore(t)
for i := range objs {
err = ls.Put(context.Background(), &objs[i])
require.NoError(t, err)
}
cidVals := []CID{objs[0].SystemHeader.CID}
items, err := ListItems(ls, NewFilter(&FilterParams{
FilterFunc: ContainerFilterFunc(cidVals),
}))
require.NoError(t, err)
require.Len(t, items, 1)
for i := range items {
require.Contains(t, objs, *items[i].Object)
}
})
t.Run("Filter stored earlier", func(t *testing.T) {
var (
err error
ls Localstore
objCount = 10
objs = make([]Object, objCount)
epoch uint64 = 100
list []ListItem
)
for i := range objs {
objs[i] = *testObject(t)
}
ls = newLocalstore(t)
ctx := context.WithValue(context.Background(), StoreEpochValue, epoch)
for i := range objs {
err = ls.Put(ctx, &objs[i])
require.NoError(t, err)
}
list, err = ListItems(ls, NewFilter(&FilterParams{
FilterFunc: StoredEarlierThanFilterFunc(epoch - 1),
}))
require.NoError(t, err)
require.Empty(t, list)
list, err = ListItems(ls, NewFilter(&FilterParams{
FilterFunc: StoredEarlierThanFilterFunc(epoch),
}))
require.NoError(t, err)
require.Empty(t, list)
list, err = ListItems(ls, NewFilter(&FilterParams{
FilterFunc: StoredEarlierThanFilterFunc(epoch + 1),
}))
require.NoError(t, err)
require.Len(t, list, objCount)
})
t.Run("Filter with complex filter", func(t *testing.T) {
var (
err error
ls Localstore
objCount = 10
objs = make([]Object, objCount)
)
for i := range objs {
objs[i] = *testObject(t)
}
ls = newLocalstore(t)
for i := range objs {
err = ls.Put(context.Background(), &objs[i])
require.NoError(t, err)
}
cidVals := []CID{objs[0].SystemHeader.CID}
mainF, err := AllPassIncludingFilter("TEST_FILTER", &FilterParams{
Name: "CID_LIST",
FilterFunc: ContainerFilterFunc(cidVals),
})
items, err := ListItems(ls, mainF)
require.NoError(t, err)
require.Len(t, items, 1)
})
t.Run("Meta info", func(t *testing.T) {
var (
err error
ls Localstore
objCount = 10
objs = make([]Object, objCount)
epoch uint64 = 100
)
for i := range objs {
objs[i] = *testObject(t)
}
ls = newLocalstore(t)
ctx := context.WithValue(context.Background(), StoreEpochValue, epoch)
for i := range objs {
err = ls.Put(ctx, &objs[i])
require.NoError(t, err)
meta, err := ls.Meta(*objs[i].Address())
require.NoError(t, err)
noPayload := objs[i]
noPayload.Payload = nil
require.Equal(t, *meta.Object, noPayload)
require.Equal(t, meta.PayloadHash, hash.Sum(objs[i].Payload))
require.Equal(t, meta.PayloadSize, uint64(len(objs[i].Payload)))
require.Equal(t, epoch, meta.StoreEpoch)
}
})
}

View file

@ -1,52 +1,82 @@
package localstore package localstore
import ( import (
"context" "encoding/binary"
"io"
"github.com/nspcc-dev/neofs-api-go/hash" "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-api-go/refs"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// StoreEpochValue is a context key of object storing epoch number. // ObjectMeta represents meta information about
const StoreEpochValue = "store epoch" // the object that is stored in meta storage.
type ObjectMeta struct {
head *object.Object
func (l *localstore) Meta(key refs.Address) (*ObjectMeta, error) { savedAtEpoch uint64
var (
err error
meta ObjectMeta
k, v []byte
)
k, err = key.Hash()
if err != nil {
return nil, errors.Wrap(err, "Localstore Meta failed on key.Marshal")
} }
v, err = l.metaBucket.Get(k) // SavedAtEpoch returns the number of epoch
if err != nil { // at which the object was saved locally.
return nil, errors.Wrap(err, "Localstore Meta failed on metaBucket.Get") func (m *ObjectMeta) SavedAtEpoch() uint64 {
if m != nil {
return m.savedAtEpoch
} }
if err := meta.Unmarshal(v); err != nil { return 0
return nil, errors.Wrap(err, "Localstore Metafailed on ObjectMeta.Unmarshal")
} }
return &meta, nil // Head returns the object w/o payload.
func (m *ObjectMeta) Head() *object.Object {
if m != nil {
return m.head
} }
func metaFromObject(ctx context.Context, obj *Object) *ObjectMeta { return nil
}
// AddressFromMeta extracts the Address from object meta.
func AddressFromMeta(m *ObjectMeta) *object.Address {
return m.Head().Address()
}
func metaFromObject(o *object.Object) *ObjectMeta {
// FIXME: remove hard-code
meta := new(ObjectMeta) meta := new(ObjectMeta)
o := *obj meta.savedAtEpoch = 10
meta.Object = &o meta.head = &object.Object{
meta.Object.Payload = nil Object: o.CutPayload(),
meta.PayloadSize = uint64(len(obj.Payload))
meta.PayloadHash = hash.Sum(obj.Payload)
storeEpoch, ok := ctx.Value(StoreEpochValue).(uint64)
if ok {
meta.StoreEpoch = storeEpoch
} }
return meta return meta
} }
func metaToBytes(m *ObjectMeta) ([]byte, error) {
data := make([]byte, 8)
binary.BigEndian.PutUint64(data, m.savedAtEpoch)
addrBytes, err := m.head.MarshalStableV2()
if err != nil {
return nil, err
}
return append(data, addrBytes...), nil
}
func metaFromBytes(data []byte) (*ObjectMeta, error) {
if len(data) < 8 {
return nil, io.ErrUnexpectedEOF
}
obj, err := object.FromBytes(data[8:])
if err != nil {
return nil, errors.Wrap(err, "could not get object address from bytes")
}
meta := new(ObjectMeta)
meta.savedAtEpoch = binary.BigEndian.Uint64(data)
meta.head = obj
return meta, nil
}

View file

@ -0,0 +1,107 @@
package localstore
import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
"go.uber.org/zap"
)
func (s *Storage) Put(obj *object.Object) error {
addrBytes, err := obj.Address().MarshalStableV2()
if err != nil {
return errors.Wrap(err, "could not marshal object address")
}
objBytes, err := obj.MarshalStableV2()
if err != nil {
return errors.Wrap(err, "could not marshal the object")
}
metaBytes, err := metaToBytes(metaFromObject(obj))
if err != nil {
return errors.Wrap(err, "could not marshal object meta")
}
if err := s.blobBucket.Set(addrBytes, objBytes); err != nil {
return errors.Wrap(err, "could no save object in BLOB storage")
}
if err := s.metaBucket.Set(addrBytes, metaBytes); err != nil {
return errors.Wrap(err, "could not save object in meta storage")
}
return nil
}
func (s *Storage) Delete(addr *object.Address) error {
addrBytes, err := addr.MarshalStableV2()
if err != nil {
return errors.Wrap(err, "could not marshal object address")
}
if err := s.blobBucket.Del(addrBytes); err != nil {
s.log.Warn("could not remove object from BLOB storage",
zap.Error(err),
)
}
if err := s.metaBucket.Del(addrBytes); err != nil {
return errors.Wrap(err, "could not remove object from meta storage")
}
return nil
}
func (s *Storage) Get(addr *object.Address) (*object.Object, error) {
addrBytes, err := addr.MarshalStableV2()
if err != nil {
return nil, errors.Wrap(err, "could not marshal object address")
}
objBytes, err := s.blobBucket.Get(addrBytes)
if err != nil {
return nil, errors.Wrap(err, "could not get object from BLOB storage")
}
return object.FromBytes(objBytes)
}
func (s *Storage) Head(addr *object.Address) (*ObjectMeta, error) {
addrBytes, err := addr.MarshalStableV2()
if err != nil {
return nil, errors.Wrap(err, "could not marshal object address")
}
metaBytes, err := s.metaBucket.Get(addrBytes)
if err != nil {
return nil, errors.Wrap(err, "could not get object from meta storage")
}
return metaFromBytes(metaBytes)
}
func (s *Storage) Iterate(filter FilterPipeline, handler func(*ObjectMeta) bool) error {
if filter == nil {
filter = NewFilter(&FilterParams{
Name: "SKIPPING_FILTER",
FilterFunc: func(context.Context, *ObjectMeta) *FilterResult {
return ResultPass()
},
})
}
return s.metaBucket.Iterate(func(_, v []byte) bool {
meta, err := metaFromBytes(v)
if err != nil {
s.log.Error("unmarshal meta bucket item failure",
zap.Error(err),
)
} else if filter.Pass(context.TODO(), meta).Code() == CodePass {
return !handler(meta)
}
return true
})
}

View file

@ -1,47 +0,0 @@
package localstore
import (
"context"
"github.com/nspcc-dev/neofs-api-go/refs"
metrics2 "github.com/nspcc-dev/neofs-node/pkg/services/metrics"
"github.com/pkg/errors"
)
func (l *localstore) Put(ctx context.Context, obj *Object) error {
var (
oa refs.Address
k, v []byte
err error
)
oa = *obj.Address()
k, err = oa.Hash()
if err != nil {
return errors.Wrap(err, "Localstore Put failed on StorageKey.marshal")
}
if v, err = obj.Marshal(); err != nil {
return errors.Wrap(err, "Localstore Put failed on blobValue")
}
if err = l.blobBucket.Set(k, v); err != nil {
return errors.Wrap(err, "Localstore Put failed on BlobBucket.Set")
}
if v, err = metaFromObject(ctx, obj).Marshal(); err != nil {
return errors.Wrap(err, "Localstore Put failed on metaValue")
}
if err = l.metaBucket.Set(k, v); err != nil {
return errors.Wrap(err, "Localstore Put failed on MetaBucket.Set")
}
l.col.UpdateContainer(
obj.SystemHeader.CID,
obj.SystemHeader.PayloadLength,
metrics2.AddSpace)
return nil
}

View file

@ -1,36 +0,0 @@
package localstore
import (
"context"
"github.com/nspcc-dev/neofs-api-go/object"
"github.com/pkg/errors"
)
func (l *localstore) PRead(ctx context.Context, key Address, rng object.Range) ([]byte, error) {
var (
err error
k, v []byte
obj Object
)
k, err = key.Hash()
if err != nil {
return nil, errors.Wrap(err, "Localstore Get failed on key.Marshal")
}
v, err = l.blobBucket.Get(k)
if err != nil {
return nil, errors.Wrap(err, "Localstore Get failed on blobBucket.Get")
}
if err := obj.Unmarshal(v); err != nil {
return nil, errors.Wrap(err, "Localstore Get failed on object.Unmarshal")
}
if rng.Offset+rng.Length > uint64(len(obj.Payload)) {
return nil, ErrOutOfRange
}
return obj.Payload[rng.Offset : rng.Offset+rng.Length], nil
}

View file

@ -0,0 +1,53 @@
package localstore
import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Storage represents NeoFS local object storage.
type Storage struct {
log *logger.Logger
metaBucket bucket.Bucket
blobBucket bucket.Bucket
}
// Option is an option of Storage constructor.
type Option func(*cfg)
type cfg struct {
logger *logger.Logger
}
func defaultCfg() *cfg {
return &cfg{
logger: zap.L(),
}
}
// New is a local object storage constructor.
func New(blob, meta bucket.Bucket, opts ...Option) *Storage {
cfg := defaultCfg()
for i := range opts {
opts[i](cfg)
}
return &Storage{
metaBucket: meta,
blobBucket: blob,
log: cfg.logger,
}
}
// WithLogger returns Storage option of used logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
if l != nil {
c.logger = l
}
}
}