package getsvc

import (
	"context"
	"crypto/ecdsa"
	"crypto/rand"
	"errors"
	"fmt"
	"strconv"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
	clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	netmaptest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// testStorage is an in-memory local storage stub. All maps are keyed by the
// string-encoded object address.
type testStorage struct {
	// inhumed holds addresses reported as already removed.
	inhumed map[string]struct{}

	// virtual holds addresses answered with a split info error.
	virtual map[string]*objectSDK.SplitInfo

	// phy holds addresses answered with a stored object.
	phy map[string]*objectSDK.Object
}

// testTraverserGenerator builds traversers for a fixed container using a
// placement builder selected by epoch.
type testTraverserGenerator struct {
	c container.Container
	b map[uint64]placement.Builder
}

// testPlacementBuilder returns pre-configured node vectors keyed by the
// string-encoded object address.
type testPlacementBuilder struct {
	vectors map[string][][]netmap.NodeInfo
}

// testClientCache resolves node info to a test client by the stringified
// address group.
type testClientCache struct {
	clients map[string]*testClient
}

// testClient is a remote storage stub answering with canned results keyed by
// the string-encoded object address.
type testClient struct {
	results map[string]struct {
		obj *objectSDK.Object
		err error
	}
}

// testEpochReceiver is an epoch source that always reports its own value.
type testEpochReceiver uint64

// Epoch returns the receiver value as the current epoch and never fails.
func (e testEpochReceiver) Epoch() (uint64, error) {
	return uint64(e), nil
}

// newTestStorage returns a testStorage with all maps initialized and empty.
func newTestStorage() *testStorage {
	return &testStorage{
		inhumed: make(map[string]struct{}),
		virtual: make(map[string]*objectSDK.SplitInfo),
		phy:     make(map[string]*objectSDK.Object),
	}
}

// GenerateTraverser creates a traverser for the generator's container using
// the builder registered for epoch e and a success threshold of one. The
// object option is added only when obj is non-nil. The cnr argument is not
// used.
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
	opts := make([]placement.Option, 0, 4)
	opts = append(opts,
		placement.ForContainer(g.c),
		placement.UseBuilder(g.b[e]),
		placement.SuccessAfter(1),
	)

	if obj != nil {
		opts = append(opts, placement.ForObject(*obj))
	}

	return placement.NewTraverser(opts...)
}

// BuildPlacement returns the vectors registered for the address composed of
// cnr and (when non-nil) obj. It returns an error when nothing is registered
// for that address. The placement policy is ignored.
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
	var addr oid.Address
	addr.SetContainer(cnr)

	if obj != nil {
		addr.SetObject(*obj)
	}

	vs, ok := p.vectors[addr.EncodeToString()]
	if !ok {
		return nil, errors.New("vectors for address not found")
	}

	return vs, nil
}

// Get returns the client registered for the stringified address group of
// info, or an error when there is none.
func (c *testClientCache) Get(info client.NodeInfo) (remoteStorage, error) {
	v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
	if !ok {
		return nil, errors.New("could not construct client")
	}

	return v, nil
}

// newTestClient returns a testClient with an empty result table.
func newTestClient() *testClient {
	return &testClient{
		results: map[string]struct {
			obj *objectSDK.Object
			err error
		}{},
	}
}

// addResult registers the object/error pair returned for addr, replacing any
// previous entry for the same address.
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
	c.results[addr.EncodeToString()] = struct {
		obj *objectSDK.Object
		err error
	}{obj: obj, err: err}
}

// Get returns the canned result for address. An unregistered address yields
// an ObjectNotFound status; a registered error takes precedence over the
// registered object.
func (c *testClient) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
	v, ok := c.results[address.EncodeToString()]
	if !ok {
		return nil, new(apistatus.ObjectNotFound)
	}

	if v.err != nil {
		return nil, v.err
	}

	return v.obj, nil
}

// Head delegates to Get and therefore returns the same canned result.
func (c *testClient) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
	return c.Get(ctx, address, requestParams)
}

// Range fetches the canned object via Get and cuts its payload to rng.
func (c *testClient) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
	obj, err := c.Get(ctx, address, requestParams)
	if err != nil {
		return nil, err
	}

	return cutToRange(obj, rng), nil
}

// ForwardRequest is not supported by the stub and always returns an error.
func (c *testClient) ForwardRequest(ctx context.Context, info client.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
	return nil, errors.New("not implemented")
}

// Get delegates to Range with a nil range, i.e. without cutting the payload.
func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
	return s.Range(ctx, address, nil)
}

// Head delegates to Range with a nil range; isRaw is ignored.
func (s *testStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
	return s.Range(ctx, address, nil)
}

// Range looks the address up in the inhumed, virtual and physical tables, in
// that order, and returns respectively an ObjectAlreadyRemoved status, a
// split info error, or the stored object cut to rng. An address found in none
// of them yields an ObjectNotFound status.
func (s *testStorage) Range(_ context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
	var (
		ok    bool
		obj   *objectSDK.Object
		sAddr = address.EncodeToString()
	)

	if _, ok = s.inhumed[sAddr]; ok {
		return nil, new(apistatus.ObjectAlreadyRemoved)
	}

	if info, ok := s.virtual[sAddr]; ok {
		return nil, objectSDK.NewSplitInfoError(info)
	}

	if obj, ok = s.phy[sAddr]; ok {
		return cutToRange(obj, rng), nil
	}

	return nil, new(apistatus.ObjectNotFound)
}

// cutToRange returns o unchanged when rng is nil. Otherwise it returns the
// result of o.CutPayload() with the payload set to the
// [offset, offset+length) slice of o's payload. No bounds checks are
// performed here.
func cutToRange(o *objectSDK.Object, rng *objectSDK.Range) *objectSDK.Object {
	if rng == nil {
		return o
	}

	from := rng.GetOffset()
	to := from + rng.GetLength()

	payload := o.Payload()

	o = o.CutPayload()
	o.SetPayload(payload[from:to])

	return o
}

// addPhy registers obj as the stored object for addr.
func (s *testStorage) addPhy(addr oid.Address, obj *objectSDK.Object) {
	s.phy[addr.EncodeToString()] = obj
}

// addVirtual registers info as the split info reported for addr.
func (s *testStorage) addVirtual(addr oid.Address, info *objectSDK.SplitInfo) {
	s.virtual[addr.EncodeToString()] = info
}

// inhume marks addr as already removed.
func (s *testStorage) inhume(addr oid.Address) {
	s.inhumed[addr.EncodeToString()] = struct{}{}
}

// generateObject creates an object with the container and object IDs taken
// from addr, the given payload (and matching payload size), the previous ID
// when prev is non-nil, and the given children.
func generateObject(addr oid.Address, prev *oid.ID, payload []byte, children ...oid.ID) *objectSDK.Object {
	obj := objectSDK.New()
	obj.SetContainerID(addr.Container())
	obj.SetID(addr.Object())
	obj.SetPayload(payload)
	obj.SetPayloadSize(uint64(len(payload)))
	if prev != nil {
		obj.SetPreviousID(*prev)
	}
	obj.SetChildren(children...)

	return obj
}

// writeHeaderError is the error returned by writeHeaderErrorObjectWriter.
type writeHeaderError struct{}

func (whe *writeHeaderError) Error() string {
	return "write header error"
}

// writeHeaderErrorObjectWriter fails on every header write and accepts every
// chunk.
type writeHeaderErrorObjectWriter struct{}

func (w *writeHeaderErrorObjectWriter) WriteHeader(_ context.Context, _ *objectSDK.Object) error {
	return &writeHeaderError{}
}

func (w *writeHeaderErrorObjectWriter) WriteChunk(_ context.Context, _ []byte) error {
	return nil
}

// writePayloadError is the error returned by writePayloadErrorObjectWriter.
type writePayloadError struct{}

func (wpe *writePayloadError) Error() string {
	return "write payload error"
}

// writePayloadErrorObjectWriter accepts every header and fails on every
// chunk write.
type writePayloadErrorObjectWriter struct{}

func (w *writePayloadErrorObjectWriter) WriteHeader(_ context.Context, _ *objectSDK.Object) error {
	return nil
}

func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte) error {
	return &writePayloadError{}
}

// testKeyStorage is a key storage stub that returns an empty private key.
type testKeyStorage struct{}

func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) {
	return &ecdsa.PrivateKey{}, nil
}

// TestGetLocalOnly checks Get, GetRange and Head with the local-only flag set
// against a service that has only a local storage configured.
func TestGetLocalOnly(t *testing.T) {
	ctx := context.Background()

	newSvc := func(storage *testStorage) *Service {
		return &Service{
			log:          test.NewLogger(t, true),
			localStorage: storage,
		}
	}

	newPrm := func(raw bool, w ObjectWriter) Prm {
		p := Prm{}
		p.SetObjectWriter(w)
		p.WithRawFlag(raw)
		p.common = new(util.CommonPrm).WithLocalOnly(true)

		return p
	}

	newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
		p := RangePrm{}
		p.SetChunkWriter(w)
		p.WithRawFlag(raw)
		p.common = new(util.CommonPrm).WithLocalOnly(true)

		r := objectSDK.NewRange()
		r.SetOffset(off)
		r.SetLength(ln)

		p.SetRange(r)

		return p
	}

	newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm {
		p := HeadPrm{}
		p.SetHeaderWriter(w)
		p.WithRawFlag(raw)
		p.common = new(util.CommonPrm).WithLocalOnly(true)

		return p
	}

	t.Run("OK", func(t *testing.T) {
		storage := newTestStorage()
		svc := newSvc(storage)

		w := NewSimpleObjectWriter()
		p := newPrm(false, w)

		payloadSz := uint64(10)
		payload := make([]byte, payloadSz)
		rand.Read(payload)

		addr := oidtest.Address()
		obj := generateObject(addr, nil, payload)

		storage.addPhy(addr, obj)

		p.WithAddress(addr)

		err := svc.Get(ctx, p)
		require.NoError(t, err)
		require.Equal(t, obj, w.Object())

		w = NewSimpleObjectWriter()

		rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3)
		rngPrm.WithAddress(addr)

		err = svc.GetRange(ctx, rngPrm)
		require.NoError(t, err)
		require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.Object().Payload())

		w = NewSimpleObjectWriter()

		headPrm := newHeadPrm(false, w)
		headPrm.WithAddress(addr)

		err = svc.Head(ctx, headPrm)
		require.NoError(t, err)
		require.Equal(t, obj.CutPayload(), w.Object())
	})

	t.Run("INHUMED", func(t *testing.T) {
		storage := newTestStorage()
		svc := newSvc(storage)

		p := newPrm(false, nil)

		addr := oidtest.Address()

		storage.inhume(addr)

		p.WithAddress(addr)

		err := svc.Get(ctx, p)
		require.True(t, clientSDK.IsErrObjectAlreadyRemoved(err))

		rngPrm := newRngPrm(false, nil, 0, 0)
		rngPrm.WithAddress(addr)

		err = svc.GetRange(ctx, rngPrm)
		require.True(t, clientSDK.IsErrObjectAlreadyRemoved(err))

		headPrm := newHeadPrm(false, nil)
		headPrm.WithAddress(addr)

		err = svc.Head(ctx, headPrm)
		require.True(t, clientSDK.IsErrObjectAlreadyRemoved(err))
	})

	t.Run("404", func(t *testing.T) {
		storage := newTestStorage()
		svc := newSvc(storage)

		p := newPrm(false, nil)

		addr := oidtest.Address()

		p.WithAddress(addr)

		err := svc.Get(ctx, p)
		require.True(t, clientSDK.IsErrObjectNotFound(err))

		rngPrm := newRngPrm(false, nil, 0, 0)
		rngPrm.WithAddress(addr)

		err = svc.GetRange(ctx, rngPrm)
		require.True(t, clientSDK.IsErrObjectNotFound(err))

		headPrm := newHeadPrm(false, nil)
		headPrm.WithAddress(addr)

		err = svc.Head(ctx, headPrm)
		require.True(t, clientSDK.IsErrObjectNotFound(err))
	})

	t.Run("VIRTUAL", func(t *testing.T) {
		storage := newTestStorage()
		svc := newSvc(storage)

		p := newPrm(true, nil)

		addr := oidtest.Address()

		splitInfo := objectSDK.NewSplitInfo()
		splitInfo.SetSplitID(objectSDK.NewSplitID())
		splitInfo.SetLink(oidtest.ID())
		splitInfo.SetLastPart(oidtest.ID())

		p.WithAddress(addr)

		storage.addVirtual(addr, splitInfo)

		err := svc.Get(ctx, p)

		errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

		require.True(t, errors.As(err, &errSplit))

		require.Equal(t, splitInfo, errSplit.SplitInfo())

		rngPrm := newRngPrm(true, nil, 0, 0)
		rngPrm.WithAddress(addr)

		// Exercise the range path with the range parameters built above;
		// previously this step repeated svc.Get(ctx, p) and left rngPrm unused.
		err = svc.GetRange(ctx, rngPrm)

		require.True(t, errors.As(err, &errSplit))

		headPrm := newHeadPrm(true, nil)
		headPrm.WithAddress(addr)

		err = svc.Head(ctx, headPrm)
		require.True(t, errors.As(err, &errSplit))
		require.Equal(t, splitInfo, errSplit.SplitInfo())
	})
}

// testNodeMatrix builds len(dim) vectors of node infos, the i-th one holding
// dim[i] nodes, together with the matching matrix of stringified address
// groups. Node (i, j) gets the endpoint /ip4/192.168.0.<i>/tcp/<60000+j>.
func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
	mNodes := make([][]netmap.NodeInfo, len(dim))
	mAddr := make([][]string, len(dim))

	for i := range dim {
		ns := make([]netmap.NodeInfo, dim[i])
		as := make([]string, dim[i])

		for j := 0; j < dim[i]; j++ {
			a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
				strconv.Itoa(i),
				strconv.Itoa(60000+j),
			)

			var ni netmap.NodeInfo
			ni.SetNetworkEndpoints(a)

			var na network.AddressGroup

			err := na.FromIterator(netmapcore.Node(ni))
			require.NoError(t, err)

			as[j] = network.StringifyGroup(na)

			ns[j] = ni
		}

		mNodes[i] = ns
		mAddr[i] = as
	}

	return mNodes, mAddr
}

// generateChain creates ln objects in container cnr, each referencing the
// previous one via its previous ID and carrying 10 random payload bytes.
// It returns the objects, their IDs and the concatenation of all payloads,
// all in chain order.
func generateChain(ln int, cnr cid.ID) ([]*objectSDK.Object, []oid.ID, []byte) {
	curID := oidtest.ID()
	var prevID *oid.ID

	var addr oid.Address
	addr.SetContainer(cnr)

	res := make([]*objectSDK.Object, 0, ln)
	ids := make([]oid.ID, 0, ln)
	payload := make([]byte, 0, ln*10)

	for i := 0; i < ln; i++ {
		ids = append(ids, curID)
		addr.SetObject(curID)

		payloadPart := make([]byte, 10)
		rand.Read(payloadPart)

		o := generateObject(addr, prevID, []byte{byte(i)})
		// The placeholder payload passed above is replaced by the random part.
		o.SetPayload(payloadPart)
		o.SetPayloadSize(uint64(len(payloadPart)))
		o.SetID(curID)

		payload = append(payload, payloadPart...)

		res = append(res, o)

		// Copy the ID so that prevID does not alias the reassigned curID.
		cpCurID := curID
		prevID = &cpCurID
		curID = oidtest.ID()
	}

	return res, ids, payload
}

// TestGetRemoteSmall checks Get, GetRange and Head without the local-only
// flag against a service with an empty local storage and two stubbed remote
// nodes.
func TestGetRemoteSmall(t *testing.T) {
	ctx := context.Background()

	var cnr container.Container
	cnr.SetPlacementPolicy(netmaptest.PlacementPolicy())

	var idCnr cid.ID
	container.CalculateID(&idCnr, cnr)

	newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
		const curEpoch = 13

		return &Service{
			log:          test.NewLogger(t, true),
			localStorage: newTestStorage(),
			traverserGenerator: &testTraverserGenerator{
				c: cnr,
				b: map[uint64]placement.Builder{
					curEpoch: b,
				},
			},
			epochSource:              testEpochReceiver(curEpoch),
			remoteStorageConstructor: c,
			keyStore:                 &testKeyStorage{},
		}
	}

	newPrm := func(raw bool, w ObjectWriter) Prm {
		p := Prm{}
		p.SetObjectWriter(w)
		p.WithRawFlag(raw)
		p.common = new(util.CommonPrm).WithLocalOnly(false)

		return p
	}

	newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
		p := RangePrm{}
		p.SetChunkWriter(w)
		p.WithRawFlag(raw)
		p.common = new(util.CommonPrm).WithLocalOnly(false)

		r := objectSDK.NewRange()
		r.SetOffset(off)
		r.SetLength(ln)

		p.SetRange(r)

		return p
	}

	newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm {
		p := HeadPrm{}
		p.SetHeaderWriter(w)
		p.WithRawFlag(raw)
		p.common = new(util.CommonPrm).WithLocalOnly(false)

		return p
	}

	t.Run("OK", func(t *testing.T) {
		addr := oidtest.Address()
		addr.SetContainer(idCnr)

		ns, as := testNodeMatrix(t, []int{2})

		builder := &testPlacementBuilder{
			vectors: map[string][][]netmap.NodeInfo{
				addr.EncodeToString(): ns,
			},
		}

		payloadSz := uint64(10)
		payload := make([]byte, payloadSz)
		rand.Read(payload)

		obj := generateObject(addr, nil, payload)

		c1 := newTestClient()
		c1.addResult(addr, obj, nil)

		c2 := newTestClient()
		c2.addResult(addr, nil, errors.New("any error"))

		svc := newSvc(builder, &testClientCache{
			clients: map[string]*testClient{
				as[0][0]: c1,
				as[0][1]: c2,
			},
		})

		w := NewSimpleObjectWriter()

		p := newPrm(false, w)
		p.WithAddress(addr)

		err := svc.Get(ctx, p)
		require.NoError(t, err)
		require.Equal(t, obj, w.Object())

		// Swap the client contents so the object is served by the other node.
		*c1, *c2 = *c2, *c1

		err = svc.Get(ctx, p)
		require.NoError(t, err)
		require.Equal(t, obj, w.Object())

		w = NewSimpleObjectWriter()
		rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3)
		rngPrm.WithAddress(addr)

		err = svc.GetRange(ctx, rngPrm)
		require.NoError(t, err)
		require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.Object().Payload())

		w = NewSimpleObjectWriter()
		headPrm := newHeadPrm(false, w)
		headPrm.WithAddress(addr)

		err = svc.Head(ctx, headPrm)
		require.NoError(t, err)
		require.Equal(t, obj.CutPayload(), w.Object())
	})

	t.Run("INHUMED", func(t *testing.T) {
		addr := oidtest.Address()
		addr.SetContainer(idCnr)

		ns, as := testNodeMatrix(t, []int{2})

		builder := &testPlacementBuilder{
			vectors: map[string][][]netmap.NodeInfo{
				addr.EncodeToString(): ns,
			},
		}

		c1 := newTestClient()
		c1.addResult(addr, nil, errors.New("any error"))

		c2 := newTestClient()
		c2.addResult(addr, nil, new(apistatus.ObjectAlreadyRemoved))

		svc := newSvc(builder, &testClientCache{
			clients: map[string]*testClient{
				as[0][0]: c1,
				as[0][1]: c2,
			},
		})

		p := newPrm(false, nil)
		p.WithAddress(addr)

		err := svc.Get(ctx, p)
		require.True(t, clientSDK.IsErrObjectAlreadyRemoved(err))

		rngPrm := newRngPrm(false, nil, 0, 0)
		rngPrm.WithAddress(addr)

		err = svc.GetRange(ctx, rngPrm)
		require.True(t, clientSDK.IsErrObjectAlreadyRemoved(err))

		headPrm := newHeadPrm(false, nil)
		headPrm.WithAddress(addr)

		err = svc.Head(ctx, headPrm)
		require.True(t, clientSDK.IsErrObjectAlreadyRemoved(err))
	})

	t.Run("404", func(t *testing.T) {
		addr := oidtest.Address()
		addr.SetContainer(idCnr)

		ns, as := testNodeMatrix(t, []int{2})

		builder := &testPlacementBuilder{
			vectors: map[string][][]netmap.NodeInfo{
				addr.EncodeToString(): ns,
			},
		}

		c1 := newTestClient()
		c1.addResult(addr, nil, errors.New("any error"))

		c2 := newTestClient()
		c2.addResult(addr, nil, errors.New("any error"))

		svc := newSvc(builder, &testClientCache{
			clients: map[string]*testClient{
				as[0][0]: c1,
				as[0][1]: c2,
			},
		})

		p := newPrm(false, nil)
		p.WithAddress(addr)

		err := svc.Get(ctx, p)
		require.True(t, clientSDK.IsErrObjectNotFound(err))

		rngPrm := newRngPrm(false, nil, 0, 0)
		rngPrm.WithAddress(addr)

		err = svc.GetRange(ctx, rngPrm)
		require.True(t, clientSDK.IsErrObjectNotFound(err))

		headPrm := newHeadPrm(false, nil)
		headPrm.WithAddress(addr)

		err = svc.Head(ctx, headPrm)
		require.True(t, clientSDK.IsErrObjectNotFound(err))
	})

	t.Run("VIRTUAL", func(t *testing.T) {
		// testHeadVirtual asserts that a non-raw Head of addr fails with a
		// split info error carrying i. It takes the calling subtest's t so
		// that a failure is reported on, and stops, the right test.
		testHeadVirtual := func(t *testing.T, svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
			t.Helper()

			headPrm := newHeadPrm(false, nil)
			headPrm.WithAddress(addr)

			errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

			err := svc.Head(ctx, headPrm)
			require.True(t, errors.As(err, &errSplit))
			require.Equal(t, i, errSplit.SplitInfo())
		}

		t.Run("linking", func(t *testing.T) {
			t.Run("get linking failure", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLink(oidtest.ID())

				var splitAddr oid.Address
				splitAddr.SetContainer(idCnr)
				idLink, _ := splitInfo.Link()
				splitAddr.SetObject(idLink)

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(splitAddr, nil, new(apistatus.ObjectNotFound))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(splitAddr, nil, new(apistatus.ObjectNotFound))

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():      ns,
						splitAddr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				p := newPrm(false, nil)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.True(t, clientSDK.IsErrObjectNotFound(err))

				rngPrm := newRngPrm(false, nil, 0, 0)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.True(t, clientSDK.IsErrObjectNotFound(err))
			})

			t.Run("get chain element failure", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)
				srcObj.SetPayloadSize(10)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLink(oidtest.ID())

				children, childIDs, _ := generateChain(2, idCnr)

				var linkAddr oid.Address
				linkAddr.SetContainer(idCnr)
				idLink, _ := splitInfo.Link()
				linkAddr.SetObject(idLink)

				linkingObj := generateObject(linkAddr, nil, nil, childIDs...)
				linkingObj.SetParentID(addr.Object())
				linkingObj.SetParent(srcObj)

				var child1Addr oid.Address
				child1Addr.SetContainer(idCnr)
				child1Addr.SetObject(childIDs[0])

				var child2Addr oid.Address
				child2Addr.SetContainer(idCnr)
				child2Addr.SetObject(childIDs[1])

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(linkAddr, nil, errors.New("any error"))
				c1.addResult(child1Addr, nil, errors.New("any error"))
				c1.addResult(child2Addr, nil, errors.New("any error"))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(linkAddr, linkingObj, nil)
				c2.addResult(child1Addr, children[0], nil)
				c2.addResult(child2Addr, nil, new(apistatus.ObjectNotFound))

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():       ns,
						linkAddr.EncodeToString():   ns,
						child1Addr.EncodeToString(): ns,
						child2Addr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				p := newPrm(false, NewSimpleObjectWriter())
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.True(t, clientSDK.IsErrObjectNotFound(err))

				rngPrm := newRngPrm(false, NewSimpleObjectWriter(), 0, 1)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.True(t, clientSDK.IsErrObjectNotFound(err))
			})

			t.Run("OK", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLink(oidtest.ID())

				children, childIDs, payload := generateChain(2, idCnr)
				srcObj.SetPayload(payload)
				srcObj.SetPayloadSize(uint64(len(payload)))
				children[len(children)-1].SetParent(srcObj)

				var linkAddr oid.Address
				linkAddr.SetContainer(idCnr)
				idLink, _ := splitInfo.Link()
				linkAddr.SetObject(idLink)

				linkingObj := generateObject(linkAddr, nil, nil, childIDs...)
				linkingObj.SetParentID(addr.Object())
				linkingObj.SetParent(srcObj)

				var child1Addr oid.Address
				child1Addr.SetContainer(idCnr)
				child1Addr.SetObject(childIDs[0])

				var child2Addr oid.Address
				child2Addr.SetContainer(idCnr)
				child2Addr.SetObject(childIDs[1])

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(linkAddr, nil, errors.New("any error"))
				c1.addResult(child1Addr, nil, errors.New("any error"))
				c1.addResult(child2Addr, nil, errors.New("any error"))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(linkAddr, linkingObj, nil)
				c2.addResult(child1Addr, children[0], nil)
				c2.addResult(child2Addr, children[1], nil)

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():       ns,
						linkAddr.EncodeToString():   ns,
						child1Addr.EncodeToString(): ns,
						child2Addr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				w := NewSimpleObjectWriter()

				p := newPrm(false, w)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.NoError(t, err)
				require.Equal(t, srcObj, w.Object())

				w = NewSimpleObjectWriter()
				payloadSz := srcObj.PayloadSize()

				off := payloadSz / 3
				ln := payloadSz / 3

				rngPrm := newRngPrm(false, w, off, ln)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.NoError(t, err)
				require.Equal(t, payload[off:off+ln], w.Object().Payload())
			})

			t.Run("write header/payload failure", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLink(oidtest.ID())

				children, childIDs, payload := generateChain(2, idCnr)
				srcObj.SetPayload(payload)
				srcObj.SetPayloadSize(uint64(len(payload)))
				children[len(children)-1].SetParent(srcObj)

				var linkAddr oid.Address
				linkAddr.SetContainer(idCnr)
				idLink, _ := splitInfo.Link()
				linkAddr.SetObject(idLink)

				linkingObj := generateObject(linkAddr, nil, nil, childIDs...)
				linkingObj.SetParentID(addr.Object())
				linkingObj.SetParent(srcObj)

				var child1Addr oid.Address
				child1Addr.SetContainer(idCnr)
				child1Addr.SetObject(childIDs[0])

				var child2Addr oid.Address
				child2Addr.SetContainer(idCnr)
				child2Addr.SetObject(childIDs[1])

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(linkAddr, nil, errors.New("any error"))
				c1.addResult(child1Addr, nil, errors.New("any error"))
				c1.addResult(child2Addr, nil, errors.New("any error"))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(linkAddr, linkingObj, nil)
				c2.addResult(child1Addr, children[0], nil)
				c2.addResult(child2Addr, children[1], nil)

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():       ns,
						linkAddr.EncodeToString():   ns,
						child1Addr.EncodeToString(): ns,
						child2Addr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				wh := &writeHeaderErrorObjectWriter{}

				p := newPrm(false, wh)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.ErrorAs(t, err, new(*writeHeaderError))

				wp := &writePayloadErrorObjectWriter{}

				p = newPrm(false, wp)
				p.WithAddress(addr)

				err = svc.Get(ctx, p)
				require.ErrorAs(t, err, new(*writePayloadError))
			})

			t.Run("linked object not a child of parent", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLink(oidtest.ID())

				children, childIDs, payload := generateChain(2, idCnr)
				srcObj.SetPayload(payload)
				srcObj.SetPayloadSize(uint64(len(payload)))
				children[len(children)-1].SetParent(srcObj)

				var linkAddr oid.Address
				linkAddr.SetContainer(idCnr)
				idLink, _ := splitInfo.Link()
				linkAddr.SetObject(idLink)

				wrongParentAddr := oidtest.Address()
				wrongParentID := oidtest.ID()
				wrongParentAddr.SetObject(wrongParentID)
				wrongParentAddr.SetContainer(idCnr)
				wrongParent := generateObject(wrongParentAddr, nil, nil)

				// The linking object points at a parent other than addr.
				linkingObj := generateObject(linkAddr, nil, nil, childIDs...)
				linkingObj.SetParentID(wrongParentID)
				linkingObj.SetParent(wrongParent)

				var child1Addr oid.Address
				child1Addr.SetContainer(idCnr)
				child1Addr.SetObject(childIDs[0])

				var child2Addr oid.Address
				child2Addr.SetContainer(idCnr)
				child2Addr.SetObject(childIDs[1])

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(linkAddr, nil, errors.New("any error"))
				c1.addResult(child1Addr, nil, errors.New("any error"))
				c1.addResult(child2Addr, nil, errors.New("any error"))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(linkAddr, linkingObj, nil)
				c2.addResult(child1Addr, children[0], nil)
				c2.addResult(child2Addr, children[1], nil)

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():       ns,
						linkAddr.EncodeToString():   ns,
						child1Addr.EncodeToString(): ns,
						child2Addr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				w := NewSimpleObjectWriter()

				p := newPrm(false, w)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.ErrorIs(t, err, errParentAddressDiffers)

				w = NewSimpleObjectWriter()
				payloadSz := srcObj.PayloadSize()

				off := payloadSz / 3
				ln := payloadSz / 3

				rngPrm := newRngPrm(false, w, off, ln)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.ErrorIs(t, err, errParentAddressDiffers)
			})

			t.Run("linked object with parent udefined", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLink(oidtest.ID())

				children, childIDs, payload := generateChain(2, idCnr)
				srcObj.SetPayload(payload)
				srcObj.SetPayloadSize(uint64(len(payload)))
				children[len(children)-1].SetParent(srcObj)

				var linkAddr oid.Address
				linkAddr.SetContainer(idCnr)
				idLink, _ := splitInfo.Link()
				linkAddr.SetObject(idLink)

				// No parent is set on the linking object in this case.
				linkingObj := generateObject(linkAddr, nil, nil, childIDs...)

				var child1Addr oid.Address
				child1Addr.SetContainer(idCnr)
				child1Addr.SetObject(childIDs[0])

				var child2Addr oid.Address
				child2Addr.SetContainer(idCnr)
				child2Addr.SetObject(childIDs[1])

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(linkAddr, nil, errors.New("any error"))
				c1.addResult(child1Addr, nil, errors.New("any error"))
				c1.addResult(child2Addr, nil, errors.New("any error"))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(linkAddr, linkingObj, nil)
				c2.addResult(child1Addr, children[0], nil)
				c2.addResult(child2Addr, children[1], nil)

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():       ns,
						linkAddr.EncodeToString():   ns,
						child1Addr.EncodeToString(): ns,
						child2Addr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				w := NewSimpleObjectWriter()

				p := newPrm(false, w)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.Error(t, err)
				require.ErrorIs(t, err, errChildWithEmptyParent)

				w = NewSimpleObjectWriter()
				payloadSz := srcObj.PayloadSize()

				off := payloadSz / 3
				ln := payloadSz / 3

				rngPrm := newRngPrm(false, w, off, ln)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.Error(t, err)
				require.ErrorIs(t, err, errChildWithEmptyParent)
			})

			t.Run("out of range", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLink(oidtest.ID())

				children, childIDs, payload := generateChain(2, idCnr)
				srcObj.SetPayload(payload)
				srcObj.SetPayloadSize(uint64(len(payload)))
				children[len(children)-1].SetParent(srcObj)

				var linkAddr oid.Address
				linkAddr.SetContainer(idCnr)
				idLink, _ := splitInfo.Link()
				linkAddr.SetObject(idLink)

				linkingObj := generateObject(linkAddr, nil, nil, childIDs...)
				linkingObj.SetParentID(addr.Object())
				linkingObj.SetParent(srcObj)

				var child1Addr oid.Address
				child1Addr.SetContainer(idCnr)
				child1Addr.SetObject(childIDs[0])

				var child2Addr oid.Address
				child2Addr.SetContainer(idCnr)
				child2Addr.SetObject(childIDs[1])

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(linkAddr, nil, errors.New("any error"))
				c1.addResult(child1Addr, nil, errors.New("any error"))
				c1.addResult(child2Addr, nil, errors.New("any error"))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(linkAddr, linkingObj, nil)
				c2.addResult(child1Addr, children[0], nil)
				c2.addResult(child2Addr, children[1], nil)

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():       ns,
						linkAddr.EncodeToString():   ns,
						child1Addr.EncodeToString(): ns,
						child2Addr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				w := NewSimpleObjectWriter()

				// The requested range starts at the end of the payload.
				p := newRngPrm(false, w, uint64(len(payload)), uint64(len(payload)))
				p.WithAddress(addr)

				err := svc.GetRange(ctx, p)
				require.ErrorAs(t, err, new(*apistatus.ObjectOutOfRange))
			})
		})

		t.Run("right child", func(t *testing.T) {
			t.Run("get right child failure", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLastPart(oidtest.ID())

				var splitAddr oid.Address
				splitAddr.SetContainer(idCnr)
				idLast, _ := splitInfo.LastPart()
				splitAddr.SetObject(idLast)

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(splitAddr, nil, new(apistatus.ObjectNotFound))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(splitAddr, nil, new(apistatus.ObjectNotFound))

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():      ns,
						splitAddr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				p := newPrm(false, nil)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.True(t, clientSDK.IsErrObjectNotFound(err))

				rngPrm := newRngPrm(false, nil, 0, 0)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.True(t, clientSDK.IsErrObjectNotFound(err))
			})

			t.Run("get chain element failure", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)
				srcObj.SetPayloadSize(11)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLastPart(oidtest.ID())

				children, _, _ := generateChain(2, idCnr)

				var rightAddr oid.Address
				rightAddr.SetContainer(idCnr)
				idLast, _ := splitInfo.LastPart()
				rightAddr.SetObject(idLast)

				rightObj := children[len(children)-1]

				rightObj.SetParentID(addr.Object())
				rightObj.SetParent(srcObj)

				preRightAddr := object.AddressOf(children[len(children)-2])

				// Neither client has a result for preRightAddr, so requests for
				// it are answered with ObjectNotFound by testClient.Get.
				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))
				c1.addResult(rightAddr, nil, errors.New("any error"))

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
				c2.addResult(rightAddr, rightObj, nil)

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{
						addr.EncodeToString():         ns,
						rightAddr.EncodeToString():    ns,
						preRightAddr.EncodeToString(): ns,
					},
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				p := newPrm(false, NewSimpleObjectWriter())
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.True(t, clientSDK.IsErrObjectNotFound(err))

				rngPrm := newRngPrm(false, nil, 0, 1)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.True(t, clientSDK.IsErrObjectNotFound(err))
			})

			t.Run("child has different parent", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLastPart(oidtest.ID())

				children, _, payload := generateChain(2, idCnr)
				srcObj.SetPayloadSize(uint64(len(payload)))
				srcObj.SetPayload(payload)

				wrongParentAddr := oidtest.Address()
				wrongParentID := oidtest.ID()
				wrongParentAddr.SetObject(wrongParentID)
				wrongParentAddr.SetContainer(idCnr)
				wrongParent := generateObject(wrongParentAddr, nil, nil)

				rightObj := children[len(children)-1]

				idLast, _ := splitInfo.LastPart()
				rightObj.SetID(idLast)
				rightObj.SetParentID(addr.Object())
				rightObj.SetParent(srcObj)

				// The first chain element points at a parent other than addr.
				firstObj := children[0]
				firstObj.SetParent(wrongParent)
				firstObj.SetParentID(wrongParentID)

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))

				for i := range children {
					c1.addResult(object.AddressOf(children[i]), nil, errors.New("any error"))
				}

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))

				for i := range children {
					c2.addResult(object.AddressOf(children[i]), children[i], nil)
				}

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{},
				}

				builder.vectors[addr.EncodeToString()] = ns

				for i := range children {
					builder.vectors[object.AddressOf(children[i]).EncodeToString()] = ns
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				w := NewSimpleObjectWriter()

				p := newPrm(false, w)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.ErrorIs(t, err, errParentAddressDiffers)

				w = NewSimpleObjectWriter()
				payloadSz := srcObj.PayloadSize()

				off := payloadSz / 3
				ln := payloadSz / 3

				rngPrm := newRngPrm(false, w, off, ln)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.ErrorIs(t, err, errParentAddressDiffers)
			})

			t.Run("OK", func(t *testing.T) {
				addr := oidtest.Address()
				addr.SetContainer(idCnr)
				addr.SetObject(oidtest.ID())

				srcObj := generateObject(addr, nil, nil)

				ns, as := testNodeMatrix(t, []int{2})

				splitInfo := objectSDK.NewSplitInfo()
				splitInfo.SetLastPart(oidtest.ID())

				children, _, payload := generateChain(2, idCnr)
				srcObj.SetPayloadSize(uint64(len(payload)))
				srcObj.SetPayload(payload)

				rightObj := children[len(children)-1]

				idLast, _ := splitInfo.LastPart()
				rightObj.SetID(idLast)
				rightObj.SetParentID(addr.Object())
				rightObj.SetParent(srcObj)

				c1 := newTestClient()
				c1.addResult(addr, nil, errors.New("any error"))

				for i := range children {
					c1.addResult(object.AddressOf(children[i]), nil, errors.New("any error"))
				}

				c2 := newTestClient()
				c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))

				for i := range children {
					c2.addResult(object.AddressOf(children[i]), children[i], nil)
				}

				builder := &testPlacementBuilder{
					vectors: map[string][][]netmap.NodeInfo{},
				}

				builder.vectors[addr.EncodeToString()] = ns

				for i := range children {
					builder.vectors[object.AddressOf(children[i]).EncodeToString()] = ns
				}

				svc := newSvc(builder, &testClientCache{
					clients: map[string]*testClient{
						as[0][0]: c1,
						as[0][1]: c2,
					},
				})

				testHeadVirtual(t, svc, addr, splitInfo)

				w := NewSimpleObjectWriter()

				p := newPrm(false, w)
				p.WithAddress(addr)

				err := svc.Get(ctx, p)
				require.NoError(t, err)
				require.Equal(t, srcObj, w.Object())

				w = NewSimpleObjectWriter()
				payloadSz := srcObj.PayloadSize()

				off := payloadSz / 3
				ln := payloadSz / 3

				rngPrm := newRngPrm(false, w, off, ln)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.NoError(t, err)
				require.Equal(t, payload[off:off+ln], w.Object().Payload())

				// A one-byte range close to the end of the payload.
				w = NewSimpleObjectWriter()
				off = payloadSz - 2
				ln = 1

				rngPrm = newRngPrm(false, w, off, ln)
				rngPrm.WithAddress(addr)

				err = svc.GetRange(ctx, rngPrm)
				require.NoError(t, err)
				require.Equal(t, payload[off:off+ln], w.Object().Payload())
			})
		})

		t.Run("corrupted source object", func(t *testing.T) {
			addr := oidtest.Address()
			addr.SetContainer(idCnr)
			addr.SetObject(oidtest.ID())

			ns, as := testNodeMatrix(t, []int{2})

			// The split info carries neither a link nor a last part.
			splitInfo := objectSDK.NewSplitInfo()

			c1 := newTestClient()
			c1.addResult(addr, nil, errors.New("any error"))

			c2 := newTestClient()
			c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))

			builder := &testPlacementBuilder{
				vectors: map[string][][]netmap.NodeInfo{},
			}

			builder.vectors[addr.EncodeToString()] = ns

			svc := newSvc(builder, &testClientCache{
				clients: map[string]*testClient{
					as[0][0]: c1,
					as[0][1]: c2,
				},
			})

			testHeadVirtual(t, svc, addr, splitInfo)

			w := NewSimpleObjectWriter()

			p := newPrm(false, w)
			p.WithAddress(addr)

			err := svc.Get(ctx, p)
			require.ErrorAs(t, err, new(*objectSDK.SplitInfoError))
		})
	})
}

func TestGetFromPastEpoch(t *testing.T) {
	ctx := context.Background()

	var cnr container.Container
	cnr.SetPlacementPolicy(netmaptest.PlacementPolicy())

	var idCnr cid.ID
	container.CalculateID(&idCnr, cnr)

	addr := oidtest.Address()
addr.SetContainer(idCnr) payloadSz := uint64(10) payload := make([]byte, payloadSz) _, _ = rand.Read(payload) obj := generateObject(addr, nil, payload) ns, as := testNodeMatrix(t, []int{2, 2}) c11 := newTestClient() c11.addResult(addr, nil, errors.New("any error")) c12 := newTestClient() c12.addResult(addr, nil, errors.New("any error")) c21 := newTestClient() c21.addResult(addr, nil, errors.New("any error")) c22 := newTestClient() c22.addResult(addr, obj, nil) const curEpoch = 13 svc := &Service{ log: test.NewLogger(t, true), localStorage: newTestStorage(), epochSource: testEpochReceiver(curEpoch), traverserGenerator: &testTraverserGenerator{ c: cnr, b: map[uint64]placement.Builder{ curEpoch: &testPlacementBuilder{ vectors: map[string][][]netmap.NodeInfo{ addr.EncodeToString(): ns[:1], }, }, curEpoch - 1: &testPlacementBuilder{ vectors: map[string][][]netmap.NodeInfo{ addr.EncodeToString(): ns[1:], }, }, }, }, remoteStorageConstructor: &testClientCache{ clients: map[string]*testClient{ as[0][0]: c11, as[0][1]: c12, as[1][0]: c21, as[1][1]: c22, }, }, keyStore: &testKeyStorage{}, } w := NewSimpleObjectWriter() commonPrm := new(util.CommonPrm) p := Prm{} p.SetObjectWriter(w) p.SetCommonParameters(commonPrm) p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, clientSDK.IsErrObjectNotFound(err)) commonPrm.SetNetmapLookupDepth(1) err = svc.Get(ctx, p) require.NoError(t, err) require.Equal(t, obj, w.Object()) rp := RangePrm{} rp.SetChunkWriter(w) commonPrm.SetNetmapLookupDepth(0) rp.SetCommonParameters(commonPrm) rp.WithAddress(addr) off, ln := payloadSz/3, payloadSz/3 r := objectSDK.NewRange() r.SetOffset(off) r.SetLength(ln) rp.SetRange(r) err = svc.GetRange(ctx, rp) require.True(t, clientSDK.IsErrObjectNotFound(err)) w = NewSimpleObjectWriter() rp.SetChunkWriter(w) commonPrm.SetNetmapLookupDepth(1) err = svc.GetRange(ctx, rp) require.NoError(t, err) require.Equal(t, payload[off:off+ln], w.Object().Payload()) hp := HeadPrm{} hp.SetHeaderWriter(w) 
commonPrm.SetNetmapLookupDepth(0) hp.SetCommonParameters(commonPrm) hp.WithAddress(addr) err = svc.Head(ctx, hp) require.True(t, clientSDK.IsErrObjectNotFound(err)) w = NewSimpleObjectWriter() hp.SetHeaderWriter(w) commonPrm.SetNetmapLookupDepth(1) err = svc.Head(ctx, hp) require.NoError(t, err) require.Equal(t, obj.CutPayload(), w.Object()) }