Compare commits

...

24 commits

Author SHA1 Message Date
3889e829e6 [#667] writecache: Add logs for report error func in tests
Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-09-14 17:02:54 +00:00
10570fc035 [#690] go.mod: Update contract and api-go
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-14 14:50:30 +03:00
c6af4a3ec8 [#679] engine: Do not increase error counter on meta mismatch
It was introduced in 69e1e6ca to help node determine faulty shards.
However, the situation is possible in a real-life scenario:
1. Object O is evacuated from shard A to B.
2. Shard A is unmounted because of lower-level errors.
3. We now have object in meta on A and in blobstor on B. Technically we
   have it in meta on shard B too, but we still got the error if B goes
   to a degraded mode.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-14 10:39:18 +03:00
58239d1b2c [#683] cli: Add context to policy parsing errors
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-12 09:47:21 +00:00
3c76884182 [#682] cli: Unify array of ranges type
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-12 10:42:34 +03:00
f435ab1b26 [#682] go.mod: Update sdk-go
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-12 10:42:34 +03:00
aa9f8dce3d [#677] client: Refactor PrmAnnounceSpace/EndpointInfo/NetworkInfo usage
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-08 09:42:28 +03:00
8a81af5a3b [#653] Add context parameter to Open functions
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2023-09-07 18:03:29 +03:00
a716db99db [#668] shard/test: Do not alter rootPath option
Supposedly, this was added to allow creating 2 different shards without
subtest. Now we use t.TempDir() everywhere, so this should not be a
problem.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
36759f8434 [#668] shard/test: Properly check event processing
See https://git.frostfs.info/TrueCloudLab/frostfs-node/actions/runs/1594/jobs/2

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
39879fa868 [#668] shard/test: Add dontRelease options
Most of the time we would like to close shard with minor exceptions.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
c661ba1312 [#668] shard/test: Use sane defaults in the test constructor
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
268adb79cb [#668] shard/test: Simplify shard construction
newCustomShard() has many parameters but only the first is obligatory.
`enableWriteCache` is left as-is, because it directly affects the
functionality.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
429f941cda [#668] shard/test: Release shard in t.Cleanup()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
382eb8a485 [#668] shard/test: Disable GC where it is not needed
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
42696016de [#668] shard: Close stopChannel in GC
It is done once, but now we could read it from multiple places.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
bdecfbc1be [#668] shard/test: Move tests to the main package
Semantic patch (also, duplicate definitions are removed):
```
@@
var e identifier
@@
-import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"

-shard.e
+e
```

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
aa23c6a83a [#668] shard/test: Remove subtest from TestCounters
Otherwise, individual tests cannot be run.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
da8f384324 [#668] shard/test: Fix typo in existence
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-07 07:39:39 +00:00
aeeb8193d2 [#676] node: Fix header source creation when checking eacl
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-09-06 17:06:54 +03:00
d9de9e2bbb [#675] client: Refactor PrmObjectDelete usage
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-09-06 08:05:47 +00:00
88d50e4c77 [#656] policer: Add "bad" testcase
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-09-06 08:04:59 +00:00
054e3ef3d3 [#674] pre-commit: Update shellcheck-py
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-04 15:34:24 +03:00
a54b4472de [#674] network: Close connections on address updates
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-04 15:34:24 +03:00
73 changed files with 469 additions and 430 deletions

View file

@ -26,7 +26,7 @@ repos:
exclude: ".key$" exclude: ".key$"
- repo: https://github.com/shellcheck-py/shellcheck-py - repo: https://github.com/shellcheck-py/shellcheck-py
rev: v0.9.0.2 rev: v0.9.0.5
hooks: hooks:
- id: shellcheck - id: shellcheck

View file

@ -8,6 +8,7 @@ import (
"io" "io"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -224,8 +225,8 @@ func SetEACL(ctx context.Context, prm SetEACLPrm) (res SetEACLRes, err error) {
// NetworkInfoPrm groups parameters of NetworkInfo operation. // NetworkInfoPrm groups parameters of NetworkInfo operation.
type NetworkInfoPrm struct { type NetworkInfoPrm struct {
commonPrm Client *client.Client
client.PrmNetworkInfo ClientParams client.PrmNetworkInfo
} }
// NetworkInfoRes groups the resulting values of NetworkInfo operation. // NetworkInfoRes groups the resulting values of NetworkInfo operation.
@ -242,15 +243,15 @@ func (x NetworkInfoRes) NetworkInfo() netmap.NetworkInfo {
// //
// Returns any error which prevented the operation from completing correctly in error return. // Returns any error which prevented the operation from completing correctly in error return.
func NetworkInfo(ctx context.Context, prm NetworkInfoPrm) (res NetworkInfoRes, err error) { func NetworkInfo(ctx context.Context, prm NetworkInfoPrm) (res NetworkInfoRes, err error) {
res.cliRes, err = prm.cli.NetworkInfo(ctx, prm.PrmNetworkInfo) res.cliRes, err = prm.Client.NetworkInfo(ctx, prm.ClientParams)
return return
} }
// NodeInfoPrm groups parameters of NodeInfo operation. // NodeInfoPrm groups parameters of NodeInfo operation.
type NodeInfoPrm struct { type NodeInfoPrm struct {
commonPrm Client *client.Client
client.PrmEndpointInfo ClientParams client.PrmEndpointInfo
} }
// NodeInfoRes groups the resulting values of NodeInfo operation. // NodeInfoRes groups the resulting values of NodeInfo operation.
@ -272,7 +273,7 @@ func (x NodeInfoRes) LatestVersion() version.Version {
// //
// Returns any error which prevented the operation from completing correctly in error return. // Returns any error which prevented the operation from completing correctly in error return.
func NodeInfo(ctx context.Context, prm NodeInfoPrm) (res NodeInfoRes, err error) { func NodeInfo(ctx context.Context, prm NodeInfoPrm) (res NodeInfoRes, err error) {
res.cliRes, err = prm.cli.EndpointInfo(ctx, prm.PrmEndpointInfo) res.cliRes, err = prm.Client.EndpointInfo(ctx, prm.ClientParams)
return return
} }
@ -507,20 +508,17 @@ func (x DeleteObjectRes) Tombstone() oid.ID {
// //
// Returns any error which prevented the operation from completing correctly in error return. // Returns any error which prevented the operation from completing correctly in error return.
func DeleteObject(ctx context.Context, prm DeleteObjectPrm) (*DeleteObjectRes, error) { func DeleteObject(ctx context.Context, prm DeleteObjectPrm) (*DeleteObjectRes, error) {
var delPrm client.PrmObjectDelete cnr := prm.objAddr.Container()
delPrm.FromContainer(prm.objAddr.Container()) obj := prm.objAddr.Object()
delPrm.ByID(prm.objAddr.Object())
if prm.sessionToken != nil { delPrm := client.PrmObjectDelete{
delPrm.WithinSession(*prm.sessionToken) XHeaders: prm.xHeaders,
ContainerID: &cnr,
ObjectID: &obj,
Session: prm.sessionToken,
BearerToken: prm.bearerToken,
} }
if prm.bearerToken != nil {
delPrm.WithBearerToken(*prm.bearerToken)
}
delPrm.WithXHeaders(prm.xHeaders...)
cliRes, err := prm.cli.ObjectDelete(ctx, delPrm) cliRes, err := prm.cli.ObjectDelete(ctx, delPrm)
if err != nil { if err != nil {
return nil, fmt.Errorf("remove object via client: %w", err) return nil, fmt.Errorf("remove object via client: %w", err)
@ -741,7 +739,7 @@ type HashPayloadRangesPrm struct {
tz bool tz bool
rngs []*objectSDK.Range rngs []objectSDK.Range
salt []byte salt []byte
} }
@ -752,7 +750,7 @@ func (x *HashPayloadRangesPrm) TZ() {
} }
// SetRanges sets a list of payload ranges to hash. // SetRanges sets a list of payload ranges to hash.
func (x *HashPayloadRangesPrm) SetRanges(rngs []*objectSDK.Range) { func (x *HashPayloadRangesPrm) SetRanges(rngs []objectSDK.Range) {
x.rngs = rngs x.rngs = rngs
} }
@ -776,39 +774,25 @@ func (x HashPayloadRangesRes) HashList() [][]byte {
// Returns any error which prevented the operation from completing correctly in error return. // Returns any error which prevented the operation from completing correctly in error return.
// Returns an error if number of received hashes differs with the number of requested ranges. // Returns an error if number of received hashes differs with the number of requested ranges.
func HashPayloadRanges(ctx context.Context, prm HashPayloadRangesPrm) (*HashPayloadRangesRes, error) { func HashPayloadRanges(ctx context.Context, prm HashPayloadRangesPrm) (*HashPayloadRangesRes, error) {
var cliPrm client.PrmObjectHash cs := checksum.SHA256
cliPrm.FromContainer(prm.objAddr.Container())
cliPrm.ByID(prm.objAddr.Object())
if prm.local {
cliPrm.MarkLocal()
}
cliPrm.UseSalt(prm.salt)
rngs := make([]uint64, 2*len(prm.rngs))
for i := range prm.rngs {
rngs[2*i] = prm.rngs[i].GetOffset()
rngs[2*i+1] = prm.rngs[i].GetLength()
}
cliPrm.SetRangeList(rngs...)
if prm.tz { if prm.tz {
cliPrm.TillichZemorAlgo() cs = checksum.TZ
} }
if prm.sessionToken != nil { cnr := prm.objAddr.Container()
cliPrm.WithinSession(*prm.sessionToken) obj := prm.objAddr.Object()
cliPrm := client.PrmObjectHash{
ContainerID: &cnr,
ObjectID: &obj,
Local: prm.local,
Salt: prm.salt,
Ranges: prm.rngs,
ChecksumType: cs,
Session: prm.sessionToken,
BearerToken: prm.bearerToken,
XHeaders: prm.xHeaders,
} }
if prm.bearerToken != nil {
cliPrm.WithBearerToken(*prm.bearerToken)
}
cliPrm.WithXHeaders(prm.xHeaders...)
res, err := prm.cli.ObjectHash(ctx, cliPrm) res, err := prm.cli.ObjectHash(ctx, cliPrm)
if err != nil { if err != nil {
return nil, fmt.Errorf("read payload hashes via client: %w", err) return nil, fmt.Errorf("read payload hashes via client: %w", err)

View file

@ -185,12 +185,12 @@ func parseContainerPolicy(cmd *cobra.Command, policyString string) (*netmap.Plac
return &result, nil return &result, nil
} }
if err = result.UnmarshalJSON([]byte(policyString)); err == nil { if err := result.UnmarshalJSON([]byte(policyString)); err == nil {
common.PrintVerbose(cmd, "Parsed JSON encoded policy") common.PrintVerbose(cmd, "Parsed JSON encoded policy")
return &result, nil return &result, nil
} }
return nil, errors.New("can't parse placement policy") return nil, fmt.Errorf("can't parse placement policy: %w", err)
} }
func parseAttributes(dst *container.Container, attributes []string) error { func parseAttributes(dst *container.Container, attributes []string) error {

View file

@ -16,8 +16,9 @@ var getEpochCmd = &cobra.Command{
p := key.GetOrGenerate(cmd) p := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, p, commonflags.RPC) cli := internalclient.GetSDKClientByFlag(cmd, p, commonflags.RPC)
var prm internalclient.NetworkInfoPrm prm := internalclient.NetworkInfoPrm{
prm.SetClient(cli) Client: cli,
}
res, err := internalclient.NetworkInfo(cmd.Context(), prm) res, err := internalclient.NetworkInfo(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err) commonCmd.ExitOnErr(cmd, "rpc error: %w", err)

View file

@ -20,8 +20,9 @@ var netInfoCmd = &cobra.Command{
p := key.GetOrGenerate(cmd) p := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, p, commonflags.RPC) cli := internalclient.GetSDKClientByFlag(cmd, p, commonflags.RPC)
var prm internalclient.NetworkInfoPrm prm := internalclient.NetworkInfoPrm{
prm.SetClient(cli) Client: cli,
}
res, err := internalclient.NetworkInfo(cmd.Context(), prm) res, err := internalclient.NetworkInfo(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err) commonCmd.ExitOnErr(cmd, "rpc error: %w", err)

View file

@ -22,8 +22,9 @@ var nodeInfoCmd = &cobra.Command{
p := key.GetOrGenerate(cmd) p := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, p, commonflags.RPC) cli := internalclient.GetSDKClientByFlag(cmd, p, commonflags.RPC)
var prm internalclient.NodeInfoPrm prm := internalclient.NodeInfoPrm{
prm.SetClient(cli) Client: cli,
}
res, err := internalclient.NodeInfo(cmd.Context(), prm) res, err := internalclient.NodeInfo(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err) commonCmd.ExitOnErr(cmd, "rpc error: %w", err)

View file

@ -84,7 +84,7 @@ func getObjectRange(cmd *cobra.Command, _ []string) {
raw, _ := cmd.Flags().GetBool(rawFlag) raw, _ := cmd.Flags().GetBool(rawFlag)
prm.SetRawFlag(raw) prm.SetRawFlag(raw)
prm.SetAddress(objAddr) prm.SetAddress(objAddr)
prm.SetRange(ranges[0]) prm.SetRange(&ranges[0])
prm.SetPayloadWriter(out) prm.SetPayloadWriter(out)
_, err = internalclient.PayloadRange(cmd.Context(), prm) _, err = internalclient.PayloadRange(cmd.Context(), prm)
@ -146,13 +146,13 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er
} }
} }
func getRangeList(cmd *cobra.Command) ([]*objectSDK.Range, error) { func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
v := cmd.Flag("range").Value.String() v := cmd.Flag("range").Value.String()
if len(v) == 0 { if len(v) == 0 {
return nil, nil return nil, nil
} }
vs := strings.Split(v, ",") vs := strings.Split(v, ",")
rs := make([]*objectSDK.Range, len(vs)) rs := make([]objectSDK.Range, len(vs))
for i := range vs { for i := range vs {
before, after, found := strings.Cut(vs[i], rangeSep) before, after, found := strings.Cut(vs[i], rangeSep)
if !found { if !found {
@ -176,7 +176,6 @@ func getRangeList(cmd *cobra.Command) ([]*objectSDK.Range, error) {
return nil, fmt.Errorf("invalid '%s' range: uint64 overflow", vs[i]) return nil, fmt.Errorf("invalid '%s' range: uint64 overflow", vs[i])
} }
rs[i] = objectSDK.NewRange()
rs[i].SetOffset(offset) rs[i].SetOffset(offset)
rs[i].SetLength(length) rs[i].SetLength(length)
} }

View file

@ -91,8 +91,9 @@ func createSession(cmd *cobra.Command, _ []string) {
// //
// Fills ID, lifetime and session key. // Fills ID, lifetime and session key.
func CreateSession(ctx context.Context, dst *session.Object, c *client.Client, lifetime uint64) error { func CreateSession(ctx context.Context, dst *session.Object, c *client.Client, lifetime uint64) error {
var netInfoPrm internalclient.NetworkInfoPrm netInfoPrm := internalclient.NetworkInfoPrm{
netInfoPrm.SetClient(c) Client: c,
}
ni, err := internalclient.NetworkInfo(ctx, netInfoPrm) ni, err := internalclient.NetworkInfo(ctx, netInfoPrm)
if err != nil { if err != nil {
@ -104,7 +105,7 @@ func CreateSession(ctx context.Context, dst *session.Object, c *client.Client, l
var sessionPrm internalclient.CreateSessionPrm var sessionPrm internalclient.CreateSessionPrm
sessionPrm.SetClient(c) sessionPrm.SetClient(c)
sessionPrm.SetExp(exp) sessionPrm.Expiration = exp
sessionRes, err := internalclient.CreateSession(ctx, sessionPrm) sessionRes, err := internalclient.CreateSession(ctx, sessionPrm)
if err != nil { if err != nil {

View file

@ -43,7 +43,7 @@ func openMeta(cmd *cobra.Command) *meta.DB {
}), }),
meta.WithEpochState(epochState{}), meta.WithEpochState(epochState{}),
) )
common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(true))) common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(cmd.Context(), true)))
return db return db
} }

View file

@ -901,7 +901,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
return c.localAddr return c.localAddr
} }
func initLocalStorage(c *cfg) { func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...) ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
@ -914,7 +914,7 @@ func initLocalStorage(c *cfg) {
var shardsAttached int var shardsAttached int
for _, optsWithMeta := range c.shardOpts() { for _, optsWithMeta := range c.shardOpts() {
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...) id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
if err != nil { if err != nil {
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err)) c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
} else { } else {
@ -931,7 +931,7 @@ func initLocalStorage(c *cfg) {
c.onShutdown(func() { c.onShutdown(func() {
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine) c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
err := ls.Close() err := ls.Close(context.Background())
if err != nil { if err != nil {
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure, c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
zap.String("error", err.Error()), zap.String("error", err.Error()),

View file

@ -383,9 +383,9 @@ func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error {
} }
func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error { func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error {
var cliPrm apiClient.PrmAnnounceSpace cliPrm := apiClient.PrmAnnounceSpace{
Announcements: r.buf,
cliPrm.SetValues(r.buf) }
_, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm) _, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm)
return err return err

View file

@ -91,10 +91,10 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) }) initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) })
initLocalStorage(c) initLocalStorage(ctx, c)
initAndLog(c, "storage engine", func(c *cfg) { initAndLog(c, "storage engine", func(c *cfg) {
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open()) fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open(ctx))
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx)) fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
}) })

10
go.mod
View file

@ -3,10 +3,10 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
go 1.20 go 1.20
require ( require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44 git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230627134746-36f3d39c406a git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230828082657-84e7e69f98ac git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/cheggaaa/pb v1.0.29 github.com/cheggaaa/pb v1.0.29
@ -20,7 +20,7 @@ require (
github.com/mr-tron/base58 v1.2.0 github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.9.0 github.com/multiformats/go-multiaddr v0.9.0
github.com/nats-io/nats.go v1.27.1 github.com/nats-io/nats.go v1.27.1
github.com/nspcc-dev/neo-go v0.101.2-0.20230601131642-a0117042e8fc github.com/nspcc-dev/neo-go v0.101.5-0.20230808195420-5fc61be5f6c5
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.7.5 github.com/panjf2000/ants/v2 v2.7.5
github.com/paulmach/orb v0.9.2 github.com/paulmach/orb v0.9.2
@ -97,7 +97,7 @@ require (
github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230808195420-5fc61be5f6c5 // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -53,6 +53,7 @@ const (
PolicerRoutineStopped = "routine stopped" PolicerRoutineStopped = "routine stopped"
PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" PolicerFailureAtObjectSelectForReplication = "failure at object select for replication"
PolicerPoolSubmission = "pool submission" PolicerPoolSubmission = "pool submission"
PolicerUnableToProcessObj = "unable to process object"
ReplicatorFinishWork = "finish work" ReplicatorFinishWork = "finish work"
ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage"
ReplicatorCouldNotReplicateObject = "could not replicate object" ReplicatorCouldNotReplicateObject = "could not replicate object"
@ -257,6 +258,7 @@ const (
ShardMetaObjectCounterRead = "meta: object counter read" ShardMetaObjectCounterRead = "meta: object counter read"
ShardMetaCantReadContainerList = "meta: can't read container list" ShardMetaCantReadContainerList = "meta: can't read container list"
ShardMetaCantReadContainerSize = "meta: can't read container size" ShardMetaCantReadContainerSize = "meta: can't read container size"
ShardMetaInfoPresentButObjectNotFound = "meta info was present, but the object is missing"
ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode" ShardMetabaseFailureSwitchingMode = "metabase failure, switching mode"
ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode" ShardCantMoveShardToReadonlySwitchMode = "can't move shard to readonly, switch mode"
ShardCouldNotUnmarshalObject = "could not unmarshal object" ShardCouldNotUnmarshalObject = "could not unmarshal object"

View file

@ -51,7 +51,7 @@ func TestCompression(t *testing.T) {
bs := New( bs := New(
WithCompressObjects(compress), WithCompressObjects(compress),
WithStorages(defaultStorages(dir, smallSizeLimit))) WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, bs.Open(false)) require.NoError(t, bs.Open(context.Background(), false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
return bs return bs
} }
@ -126,7 +126,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
Storage: fstree.New(fstree.WithPath(dir)), Storage: fstree.New(fstree.WithPath(dir)),
}, },
})) }))
require.NoError(t, bs.Open(false)) require.NoError(t, bs.Open(context.Background(), false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
return bs return bs
} }
@ -188,7 +188,7 @@ func TestConcurrentPut(t *testing.T) {
blobStor := New( blobStor := New(
WithStorages(defaultStorages(dir, smallSizeLimit))) WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, blobStor.Open(false)) require.NoError(t, blobStor.Open(context.Background(), false))
require.NoError(t, blobStor.Init()) require.NoError(t, blobStor.Init())
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
@ -268,7 +268,7 @@ func TestConcurrentDelete(t *testing.T) {
blobStor := New( blobStor := New(
WithStorages(defaultStorages(dir, smallSizeLimit))) WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, blobStor.Open(false)) require.NoError(t, blobStor.Open(context.Background(), false))
require.NoError(t, blobStor.Init()) require.NoError(t, blobStor.Init())
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {

View file

@ -1,6 +1,7 @@
package blobstor package blobstor
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
@ -9,10 +10,15 @@ import (
) )
// Open opens BlobStor. // Open opens BlobStor.
func (b *BlobStor) Open(readOnly bool) error { func (b *BlobStor) Open(ctx context.Context, readOnly bool) error {
b.log.Debug(logs.BlobstorOpening) b.log.Debug(logs.BlobstorOpening)
for i := range b.storage { for i := range b.storage {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err := b.storage[i].Storage.Open(readOnly) err := b.storage[i].Storage.Open(readOnly)
if err != nil { if err != nil {
return err return err

View file

@ -20,7 +20,7 @@ func TestExists(t *testing.T) {
b := New(WithStorages(storages)) b := New(WithStorages(storages))
require.NoError(t, b.Open(false)) require.NoError(t, b.Open(context.Background(), false))
require.NoError(t, b.Init()) require.NoError(t, b.Init())
objects := []*objectSDK.Object{ objects := []*objectSDK.Object{

View file

@ -26,7 +26,7 @@ func TestIterateObjects(t *testing.T) {
defer os.RemoveAll(p) defer os.RemoveAll(p)
// open Blobstor // open Blobstor
require.NoError(t, blobStor.Open(false)) require.NoError(t, blobStor.Open(context.Background(), false))
// initialize Blobstor // initialize Blobstor
require.NoError(t, blobStor.Init()) require.NoError(t, blobStor.Init())

View file

@ -1,6 +1,7 @@
package blobstor package blobstor
import ( import (
"context"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -21,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
err := b.Close() err := b.Close()
if err == nil { if err == nil {
if err = b.Open(m.ReadOnly()); err == nil { if err = b.Open(context.TODO(), m.ReadOnly()); err == nil {
err = b.Init() err = b.Init()
} }
} }

View file

@ -21,11 +21,11 @@ type shardInitError struct {
} }
// Open opens all StorageEngine's components. // Open opens all StorageEngine's components.
func (e *StorageEngine) Open() error { func (e *StorageEngine) Open(ctx context.Context) error {
return e.open() return e.open(ctx)
} }
func (e *StorageEngine) open() error { func (e *StorageEngine) open(ctx context.Context) error {
e.mtx.Lock() e.mtx.Lock()
defer e.mtx.Unlock() defer e.mtx.Unlock()
@ -36,7 +36,7 @@ func (e *StorageEngine) open() error {
wg.Add(1) wg.Add(1)
go func(id string, sh *shard.Shard) { go func(id string, sh *shard.Shard) {
defer wg.Done() defer wg.Done()
if err := sh.Open(); err != nil { if err := sh.Open(ctx); err != nil {
errCh <- shardInitError{ errCh <- shardInitError{
err: err, err: err,
id: id, id: id,
@ -148,10 +148,10 @@ var errClosed = errors.New("storage engine is closed")
// After the call, all the next ones will fail. // After the call, all the next ones will fail.
// //
// The method MUST only be called when the application exits. // The method MUST only be called when the application exits.
func (e *StorageEngine) Close() error { func (e *StorageEngine) Close(ctx context.Context) error {
close(e.closeCh) close(e.closeCh)
defer e.wg.Wait() defer e.wg.Wait()
return e.setBlockExecErr(errClosed) return e.setBlockExecErr(ctx, errClosed)
} }
// closes all shards. Never returns an error, shard errors are logged. // closes all shards. Never returns an error, shard errors are logged.
@ -197,7 +197,7 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error {
// - otherwise, resumes execution. If exec was blocked, calls open method. // - otherwise, resumes execution. If exec was blocked, calls open method.
// //
// Can be called concurrently with exec. In this case it waits for all executions to complete. // Can be called concurrently with exec. In this case it waits for all executions to complete.
func (e *StorageEngine) setBlockExecErr(err error) error { func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
e.blockExec.mtx.Lock() e.blockExec.mtx.Lock()
defer e.blockExec.mtx.Unlock() defer e.blockExec.mtx.Unlock()
@ -212,7 +212,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
if err == nil { if err == nil {
if prevErr != nil { // block -> ok if prevErr != nil { // block -> ok
return e.open() return e.open(ctx)
} }
} else if prevErr == nil { // ok -> block } else if prevErr == nil { // ok -> block
return e.close(errors.Is(err, errClosed)) return e.close(errors.Is(err, errClosed))
@ -235,7 +235,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution // Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
// for this. // for this.
func (e *StorageEngine) BlockExecution(err error) error { func (e *StorageEngine) BlockExecution(err error) error {
return e.setBlockExecErr(err) return e.setBlockExecErr(context.Background(), err)
} }
// ResumeExecution resumes the execution of any data-related operation. // ResumeExecution resumes the execution of any data-related operation.
@ -247,7 +247,7 @@ func (e *StorageEngine) BlockExecution(err error) error {
// //
// Must not be called concurrently with either Open or Init. // Must not be called concurrently with either Open or Init.
func (e *StorageEngine) ResumeExecution() error { func (e *StorageEngine) ResumeExecution() error {
return e.setBlockExecErr(nil) return e.setBlockExecErr(context.Background(), nil)
} }
type ReConfiguration struct { type ReConfiguration struct {
@ -334,14 +334,14 @@ loop:
} }
for _, newID := range shardsToAdd { for _, newID := range shardsToAdd {
sh, err := e.createShard(rcfg.shards[newID]) sh, err := e.createShard(ctx, rcfg.shards[newID])
if err != nil { if err != nil {
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err) return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
} }
idStr := sh.ID().String() idStr := sh.ID().String()
err = sh.Open() err = sh.Open(ctx)
if err == nil { if err == nil {
err = sh.Init(ctx) err = sh.Init(ctx)
} }

View file

@ -126,7 +126,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
var configID string var configID string
e := New() e := New()
_, err := e.AddShard(opts...) _, err := e.AddShard(context.Background(), opts...)
if errOnAdd { if errOnAdd {
require.Error(t, err) require.Error(t, err)
// This branch is only taken when we cannot update shard ID in the metabase. // This branch is only taken when we cannot update shard ID in the metabase.
@ -144,7 +144,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
configID = calculateShardID(e.shards[id].Shard.DumpInfo()) configID = calculateShardID(e.shards[id].Shard.DumpInfo())
e.mtx.RUnlock() e.mtx.RUnlock()
err = e.Open() err = e.Open(context.Background())
if err == nil { if err == nil {
require.Error(t, e.Init(context.Background())) require.Error(t, e.Init(context.Background()))
} }
@ -193,7 +193,7 @@ func TestExecBlocks(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// close // close
require.NoError(t, e.Close()) require.NoError(t, e.Close(context.Background()))
// try exec after close // try exec after close
_, err = Head(context.Background(), e, addr) _, err = Head(context.Background(), e, addr)
@ -209,13 +209,13 @@ func TestPersistentShardID(t *testing.T) {
te := newEngineWithErrorThreshold(t, dir, 1) te := newEngineWithErrorThreshold(t, dir, 1)
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close()) require.NoError(t, te.ng.Close(context.Background()))
newTe := newEngineWithErrorThreshold(t, dir, 1) newTe := newEngineWithErrorThreshold(t, dir, 1)
for i := 0; i < len(newTe.shards); i++ { for i := 0; i < len(newTe.shards); i++ {
require.Equal(t, te.shards[i].id, newTe.shards[i].id) require.Equal(t, te.shards[i].id, newTe.shards[i].id)
} }
require.NoError(t, newTe.ng.Close()) require.NoError(t, newTe.ng.Close(context.Background()))
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
@ -227,7 +227,7 @@ func TestPersistentShardID(t *testing.T) {
newTe = newEngineWithErrorThreshold(t, dir, 1) newTe = newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, te.shards[1].id, newTe.shards[0].id) require.Equal(t, te.shards[1].id, newTe.shards[0].id)
require.Equal(t, te.shards[0].id, newTe.shards[1].id) require.Equal(t, te.shards[0].id, newTe.shards[1].id)
require.NoError(t, newTe.ng.Close()) require.NoError(t, newTe.ng.Close(context.Background()))
} }
@ -313,7 +313,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
require.Equal(t, num, len(e.shards)) require.Equal(t, num, len(e.shards))
require.Equal(t, num, len(e.shardPools)) require.Equal(t, num, len(e.shardPools))
require.NoError(t, e.Open()) require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
return e, currShards return e, currShards

View file

@ -54,7 +54,7 @@ func TestDeleteBigObject(t *testing.T) {
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
e.log = test.NewLogger(t, true) e.log = test.NewLogger(t, true)
defer e.Close() defer e.Close(context.Background())
for i := range children { for i := range children {
require.NoError(t, Put(context.Background(), e, children[i])) require.NoError(t, Put(context.Background(), e, children[i]))

View file

@ -50,7 +50,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
e := testNewEngine(b).setInitializedShards(b, shards...).engine e := testNewEngine(b).setInitializedShards(b, shards...).engine
b.Cleanup(func() { b.Cleanup(func() {
_ = e.Close() _ = e.Close(context.Background())
_ = os.RemoveAll(b.Name()) _ = os.RemoveAll(b.Name())
}) })
@ -119,7 +119,7 @@ func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrap
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper { func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
opts := shardOpts(i) opts := shardOpts(i)
id, err := te.engine.AddShard(opts...) id, err := te.engine.AddShard(context.Background(), opts...)
require.NoError(t, err) require.NoError(t, err)
te.shardIDs = append(te.shardIDs, id) te.shardIDs = append(te.shardIDs, id)
} }
@ -130,7 +130,7 @@ func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, s
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
defaultOpts := testDefaultShardOptions(t, i) defaultOpts := testDefaultShardOptions(t, i)
opts := append(defaultOpts, shardOpts(i)...) opts := append(defaultOpts, shardOpts(i)...)
id, err := te.engine.AddShard(opts...) id, err := te.engine.AddShard(context.Background(), opts...)
require.NoError(t, err) require.NoError(t, err)
te.shardIDs = append(te.shardIDs, id) te.shardIDs = append(te.shardIDs, id)
} }
@ -190,7 +190,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...) shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
s := shard.New(shardOpts...) s := shard.New(shardOpts...)
require.NoError(t, s.Open()) require.NoError(t, s.Open(context.Background()))
require.NoError(t, s.Init(context.Background())) require.NoError(t, s.Init(context.Background()))
return s return s

View file

@ -68,7 +68,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
} }
}) })
e := te.engine e := te.engine
require.NoError(t, e.Open()) require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
for i, id := range te.shardIDs { for i, id := range te.shardIDs {
@ -192,7 +192,7 @@ func TestBlobstorFailback(t *testing.T) {
} }
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close()) require.NoError(t, te.ng.Close(context.Background()))
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
@ -217,7 +217,7 @@ func TestBlobstorFailback(t *testing.T) {
require.True(t, shard.IsErrOutOfRange(err)) require.True(t, shard.IsErrOutOfRange(err))
} }
checkShardState(t, te.ng, te.shards[0].id, 2, mode.ReadOnly) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }

View file

@ -44,7 +44,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
} }
}) })
e, ids := te.engine, te.shardIDs e, ids := te.engine, te.shardIDs
require.NoError(t, e.Open()) require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -100,9 +101,11 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
if it.Object == nil { if it.Object == nil {
return GetRes{}, it.OutError return GetRes{}, it.OutError
} }
if it.ShardWithMeta.Shard != nil { if it.ShardWithMeta.Shard != nil && it.MetaError != nil {
e.reportShardError(it.ShardWithMeta, "meta info was present, but object is missing", e.log.Warn(logs.ShardMetaInfoPresentButObjectNotFound,
it.MetaError, zap.Stringer("address", prm.addr)) zap.Stringer("shard_id", it.ShardWithMeta.ID()),
zap.String("error", it.MetaError.Error()),
zap.Stringer("address", prm.addr))
} }
} }

View file

@ -43,7 +43,7 @@ func TestHeadRaw(t *testing.T) {
s2 := testNewShard(t, 2) s2 := testNewShard(t, 2)
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
defer e.Close() defer e.Close(context.Background())
var putPrmLeft shard.PutPrm var putPrmLeft shard.PutPrm
putPrmLeft.SetObject(child) putPrmLeft.SetObject(child)

View file

@ -37,7 +37,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
t.Run("delete small object", func(t *testing.T) { t.Run("delete small object", func(t *testing.T) {
e := testNewEngine(t).setShardsNum(t, 1).engine e := testNewEngine(t).setShardsNum(t, 1).engine
defer e.Close() defer e.Close(context.Background())
err := Put(context.Background(), e, parent) err := Put(context.Background(), e, parent)
require.NoError(t, err) require.NoError(t, err)
@ -58,7 +58,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
s2 := testNewShard(t, 2) s2 := testNewShard(t, 2)
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
defer e.Close() defer e.Close(context.Background())
var putChild shard.PutPrm var putChild shard.PutPrm
putChild.SetObject(child) putChild.SetObject(child)

View file

@ -76,11 +76,11 @@ func TestListWithCursor(t *testing.T) {
meta.WithEpochState(epochState{}), meta.WithEpochState(epochState{}),
)} )}
}).engine }).engine
require.NoError(t, e.Open()) require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
e.Close() e.Close(context.Background())
}) })
expected := make([]object.AddressWithType, 0, tt.objectNum) expected := make([]object.AddressWithType, 0, tt.objectNum)

View file

@ -59,11 +59,11 @@ func TestLockUserScenario(t *testing.T) {
} }
}) })
e := testEngine.engine e := testEngine.engine
require.NoError(t, e.Open()) require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
_ = e.Close() _ = e.Close(context.Background())
}) })
lockerID := oidtest.ID() lockerID := oidtest.ID()
@ -167,11 +167,11 @@ func TestLockExpiration(t *testing.T) {
} }
}) })
e := testEngine.engine e := testEngine.engine
require.NoError(t, e.Open()) require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
_ = e.Close() _ = e.Close(context.Background())
}) })
const lockerExpiresAfter = 13 const lockerExpiresAfter = 13
@ -247,10 +247,10 @@ func TestLockForceRemoval(t *testing.T) {
shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithDeletedLockCallback(e.processDeletedLocks),
} }
}).engine }).engine
require.NoError(t, e.Open()) require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
_ = e.Close() _ = e.Close(context.Background())
}) })
cnr := cidtest.ID() cnr := cidtest.ID()

View file

@ -5,6 +5,7 @@ import (
"errors" "errors"
"strconv" "strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -113,9 +114,10 @@ func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error
if it.Object == nil { if it.Object == nil {
return RngRes{}, it.OutError return RngRes{}, it.OutError
} }
if it.ShardWithMeta.Shard != nil { if it.ShardWithMeta.Shard != nil && it.MetaError != nil {
e.reportShardError(it.ShardWithMeta, "meta info was present, but object is missing", e.log.Warn(logs.ShardMetaInfoPresentButObjectNotFound,
it.MetaError, zap.Stringer("shard_id", it.ShardWithMeta.ID()),
zap.String("error", it.MetaError.Error()),
zap.Stringer("address", prm.addr)) zap.Stringer("address", prm.addr))
} }
} }

View file

@ -1,6 +1,7 @@
package engine package engine
import ( import (
"context"
"fmt" "fmt"
"sync/atomic" "sync/atomic"
@ -77,8 +78,8 @@ func (m *metricsWithID) DeleteShardMetrics() {
// //
// Returns any error encountered that did not allow adding a shard. // Returns any error encountered that did not allow adding a shard.
// Otherwise returns the ID of the added shard. // Otherwise returns the ID of the added shard.
func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
sh, err := e.createShard(opts) sh, err := e.createShard(ctx, opts)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create a shard: %w", err) return nil, fmt.Errorf("could not create a shard: %w", err)
} }
@ -95,7 +96,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
return sh.ID(), nil return sh.ID(), nil
} }
func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
id, err := generateShardID() id, err := generateShardID()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err) return nil, fmt.Errorf("could not generate shard ID: %w", err)
@ -111,7 +112,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
shard.WithReportErrorFunc(e.reportShardErrorBackground), shard.WithReportErrorFunc(e.reportShardErrorBackground),
)...) )...)
if err := sh.UpdateID(); err != nil { if err := sh.UpdateID(ctx); err != nil {
return nil, fmt.Errorf("could not update shard ID: %w", err) return nil, fmt.Errorf("could not update shard ID: %w", err)
} }

View file

@ -1,6 +1,7 @@
package engine package engine
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -12,7 +13,7 @@ func TestRemoveShard(t *testing.T) {
te := testNewEngine(t).setShardsNum(t, numOfShards) te := testNewEngine(t).setShardsNum(t, numOfShards)
e, ids := te.engine, te.shardIDs e, ids := te.engine, te.shardIDs
t.Cleanup(func() { t.Cleanup(func() {
e.Close() e.Close(context.Background())
}) })
require.Equal(t, numOfShards, len(e.shardPools)) require.Equal(t, numOfShards, len(e.shardPools))

View file

@ -1,6 +1,7 @@
package storagetest package storagetest
import ( import (
"context"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -9,7 +10,7 @@ import (
// Component represents single storage component. // Component represents single storage component.
type Component interface { type Component interface {
Open(bool) error Open(context.Context, bool) error
SetMode(mode.Mode) error SetMode(mode.Mode) error
Init() error Init() error
Close() error Close() error
@ -57,18 +58,18 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
t.Run("RW", func(t *testing.T) { t.Run("RW", func(t *testing.T) {
// Use-case: irrecoverable error on some components, close everything. // Use-case: irrecoverable error on some components, close everything.
s := cons(t) s := cons(t)
require.NoError(t, s.Open(false)) require.NoError(t, s.Open(context.Background(), false))
require.NoError(t, s.Close()) require.NoError(t, s.Close())
}) })
t.Run("RO", func(t *testing.T) { t.Run("RO", func(t *testing.T) {
// Use-case: irrecoverable error on some components, close everything. // Use-case: irrecoverable error on some components, close everything.
// Open in read-only must be done after the db is here. // Open in read-only must be done after the db is here.
s := cons(t) s := cons(t)
require.NoError(t, s.Open(false)) require.NoError(t, s.Open(context.Background(), false))
require.NoError(t, s.Init()) require.NoError(t, s.Init())
require.NoError(t, s.Close()) require.NoError(t, s.Close())
require.NoError(t, s.Open(true)) require.NoError(t, s.Open(context.Background(), true))
require.NoError(t, s.Close()) require.NoError(t, s.Close())
}) })
} }
@ -77,7 +78,7 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
func TestCloseTwice(t *testing.T, cons Constructor) { func TestCloseTwice(t *testing.T, cons Constructor) {
// Use-case: move to maintenance mode twice, first time failed. // Use-case: move to maintenance mode twice, first time failed.
s := cons(t) s := cons(t)
require.NoError(t, s.Open(false)) require.NoError(t, s.Open(context.Background(), false))
require.NoError(t, s.Init()) require.NoError(t, s.Init())
require.NoError(t, s.Close()) require.NoError(t, s.Close())
require.NoError(t, s.Close()) // already closed, no-op require.NoError(t, s.Close()) // already closed, no-op
@ -89,12 +90,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
// Use-case: metabase `Init` failed, // Use-case: metabase `Init` failed,
// call `SetMode` on all not-yet-initialized components. // call `SetMode` on all not-yet-initialized components.
s := cons(t) s := cons(t)
require.NoError(t, s.Open(false)) require.NoError(t, s.Open(context.Background(), false))
require.NoError(t, s.SetMode(m)) require.NoError(t, s.SetMode(m))
t.Run("after open in RO", func(t *testing.T) { t.Run("after open in RO", func(t *testing.T) {
require.NoError(t, s.Close()) require.NoError(t, s.Close())
require.NoError(t, s.Open(true)) require.NoError(t, s.Open(context.Background(), true))
require.NoError(t, s.SetMode(m)) require.NoError(t, s.SetMode(m))
}) })
@ -103,7 +104,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
t.Run("after init", func(t *testing.T) { t.Run("after init", func(t *testing.T) {
s := cons(t) s := cons(t)
// Use-case: notmal node operation. // Use-case: notmal node operation.
require.NoError(t, s.Open(false)) require.NoError(t, s.Open(context.Background(), false))
require.NoError(t, s.Init()) require.NoError(t, s.Init())
require.NoError(t, s.SetMode(m)) require.NoError(t, s.SetMode(m))
require.NoError(t, s.Close()) require.NoError(t, s.Close())
@ -113,7 +114,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) { func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
// Use-case: normal node operation. // Use-case: normal node operation.
s := cons(t) s := cons(t)
require.NoError(t, s.Open(false)) require.NoError(t, s.Open(context.Background(), false))
require.NoError(t, s.Init()) require.NoError(t, s.Init())
require.NoError(t, s.SetMode(from)) require.NoError(t, s.SetMode(from))
require.NoError(t, s.SetMode(to)) require.NoError(t, s.SetMode(to))

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"path/filepath" "path/filepath"
@ -21,7 +22,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode") var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
// Open boltDB instance for metabase. // Open boltDB instance for metabase.
func (db *DB) Open(readOnly bool) error { func (db *DB) Open(_ context.Context, readOnly bool) error {
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
if err != nil { if err != nil {
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err) return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)

View file

@ -1,6 +1,7 @@
package meta_test package meta_test
import ( import (
"context"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -49,7 +50,7 @@ func newDB(t testing.TB, opts ...meta.Option) *meta.DB {
}, opts...)..., }, opts...)...,
) )
require.NoError(t, bdb.Open(false)) require.NoError(t, bdb.Open(context.Background(), false))
require.NoError(t, bdb.Init()) require.NoError(t, bdb.Init())
t.Cleanup(func() { t.Cleanup(func() {

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"context"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -27,9 +28,9 @@ func (db *DB) SetMode(m mode.Mode) error {
case m.NoMetabase(): case m.NoMetabase():
db.boltDB = nil db.boltDB = nil
case m.ReadOnly(): case m.ReadOnly():
err = db.Open(true) err = db.Open(context.TODO(), true)
default: default:
err = db.Open(false) err = db.Open(context.TODO(), false)
} }
if err == nil && !m.NoMetabase() && !m.ReadOnly() { if err == nil && !m.NoMetabase() && !m.ReadOnly() {
err = db.Init() err = db.Init()

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"context"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
@ -42,13 +43,13 @@ func TestVersion(t *testing.T) {
} }
t.Run("simple", func(t *testing.T) { t.Run("simple", func(t *testing.T) {
db := newDB(t) db := newDB(t)
require.NoError(t, db.Open(false)) require.NoError(t, db.Open(context.Background(), false))
require.NoError(t, db.Init()) require.NoError(t, db.Init())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())
t.Run("reopen", func(t *testing.T) { t.Run("reopen", func(t *testing.T) {
require.NoError(t, db.Open(false)) require.NoError(t, db.Open(context.Background(), false))
require.NoError(t, db.Init()) require.NoError(t, db.Init())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())
@ -56,29 +57,29 @@ func TestVersion(t *testing.T) {
}) })
t.Run("old data", func(t *testing.T) { t.Run("old data", func(t *testing.T) {
db := newDB(t) db := newDB(t)
require.NoError(t, db.Open(false)) require.NoError(t, db.Open(context.Background(), false))
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4})) require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
require.NoError(t, db.Close()) require.NoError(t, db.Close())
require.NoError(t, db.Open(false)) require.NoError(t, db.Open(context.Background(), false))
require.NoError(t, db.Init()) require.NoError(t, db.Init())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())
}) })
t.Run("invalid version", func(t *testing.T) { t.Run("invalid version", func(t *testing.T) {
db := newDB(t) db := newDB(t)
require.NoError(t, db.Open(false)) require.NoError(t, db.Open(context.Background(), false))
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
return updateVersion(tx, version+1) return updateVersion(tx, version+1)
})) }))
require.NoError(t, db.Close()) require.NoError(t, db.Close())
require.NoError(t, db.Open(false)) require.NoError(t, db.Open(context.Background(), false))
require.Error(t, db.Init()) require.Error(t, db.Init())
require.NoError(t, db.Close()) require.NoError(t, db.Close())
t.Run("reset", func(t *testing.T) { t.Run("reset", func(t *testing.T) {
require.NoError(t, db.Open(false)) require.NoError(t, db.Open(context.Background(), false))
require.NoError(t, db.Reset()) require.NoError(t, db.Reset())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())

View file

@ -26,7 +26,7 @@ func BenchmarkCreate(b *testing.B) {
f := NewBoltForest( f := NewBoltForest(
WithPath(filepath.Join(tmpDir, "test.db")), WithPath(filepath.Join(tmpDir, "test.db")),
WithMaxBatchSize(runtime.GOMAXPROCS(0))) WithMaxBatchSize(runtime.GOMAXPROCS(0)))
require.NoError(b, f.Open(false)) require.NoError(b, f.Open(context.Background(), false))
require.NoError(b, f.Init()) require.NoError(b, f.Init())
b.Cleanup(func() { b.Cleanup(func() {
require.NoError(b, f.Close()) require.NoError(b, f.Close())

View file

@ -99,7 +99,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
err := t.Close() err := t.Close()
if err == nil && !m.NoMetabase() { if err == nil && !m.NoMetabase() {
if err = t.Open(m.ReadOnly()); err == nil { if err = t.Open(context.TODO(), m.ReadOnly()); err == nil {
err = t.Init() err = t.Init()
} }
} }
@ -111,7 +111,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
t.metrics.SetMode(m) t.metrics.SetMode(m)
return nil return nil
} }
func (t *boltForest) Open(readOnly bool) error { func (t *boltForest) Open(_ context.Context, readOnly bool) error {
err := util.MkdirAllX(filepath.Dir(t.path), t.perm) err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
if err != nil { if err != nil {
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err)) return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))

View file

@ -110,7 +110,7 @@ func (f *memoryForest) Init() error {
return nil return nil
} }
func (f *memoryForest) Open(bool) error { func (f *memoryForest) Open(context.Context, bool) error {
return nil return nil
} }
func (f *memoryForest) SetMode(mode.Mode) error { func (f *memoryForest) SetMode(mode.Mode) error {

View file

@ -24,7 +24,7 @@ var providers = []struct {
}{ }{
{"inmemory", func(t testing.TB, _ ...Option) Forest { {"inmemory", func(t testing.TB, _ ...Option) Forest {
f := NewMemoryForest() f := NewMemoryForest()
require.NoError(t, f.Open(false)) require.NoError(t, f.Open(context.Background(), false))
require.NoError(t, f.Init()) require.NoError(t, f.Init())
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, f.Close()) require.NoError(t, f.Close())
@ -37,7 +37,7 @@ var providers = []struct {
append([]Option{ append([]Option{
WithPath(filepath.Join(t.TempDir(), "test.db")), WithPath(filepath.Join(t.TempDir(), "test.db")),
WithMaxBatchSize(1)}, opts...)...) WithMaxBatchSize(1)}, opts...)...)
require.NoError(t, f.Open(false)) require.NoError(t, f.Open(context.Background(), false))
require.NoError(t, f.Init()) require.NoError(t, f.Init())
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, f.Close()) require.NoError(t, f.Close())

View file

@ -58,7 +58,7 @@ type ForestStorage interface {
// DumpInfo returns information about the pilorama. // DumpInfo returns information about the pilorama.
DumpInfo() Info DumpInfo() Info
Init() error Init() error
Open(bool) error Open(context.Context, bool) error
Close() error Close() error
SetMode(m mode.Mode) error SetMode(m mode.Mode) error
SetParentID(id string) SetParentID(id string)

View file

@ -41,8 +41,10 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error {
} }
// Open opens all Shard's components. // Open opens all Shard's components.
func (s *Shard) Open() error { func (s *Shard) Open(ctx context.Context) error {
components := []interface{ Open(bool) error }{ components := []interface {
Open(context.Context, bool) error
}{
s.blobStor, s.metaBase, s.blobStor, s.metaBase,
} }
@ -55,12 +57,12 @@ func (s *Shard) Open() error {
} }
for i, component := range components { for i, component := range components {
if err := component.Open(false); err != nil { if err := component.Open(ctx, false); err != nil {
if component == s.metaBase { if component == s.metaBase {
// We must first open all other components to avoid // We must first open all other components to avoid
// opening non-existent DB in read-only mode. // opening non-existent DB in read-only mode.
for j := i + 1; j < len(components); j++ { for j := i + 1; j < len(components); j++ {
if err := components[j].Open(false); err != nil { if err := components[j].Open(ctx, false); err != nil {
// Other components must be opened, fail. // Other components must be opened, fail.
return fmt.Errorf("could not open %T: %w", components[j], err) return fmt.Errorf("could not open %T: %w", components[j], err)
} }

View file

@ -32,12 +32,6 @@ import (
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
type epochState struct{}
func (s epochState) CurrentEpoch() uint64 {
return 0
}
type objAddr struct { type objAddr struct {
obj *objectSDK.Object obj *objectSDK.Object
addr oid.Address addr oid.Address
@ -93,7 +87,7 @@ func TestShardOpen(t *testing.T) {
allowedMode.Store(int64(os.O_RDWR)) allowedMode.Store(int64(os.O_RDWR))
sh := newShard() sh := newShard()
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadWrite, sh.GetMode()) require.Equal(t, mode.ReadWrite, sh.GetMode())
require.NoError(t, sh.Close()) require.NoError(t, sh.Close())
@ -102,7 +96,7 @@ func TestShardOpen(t *testing.T) {
allowedMode.Store(int64(os.O_RDONLY)) allowedMode.Store(int64(os.O_RDONLY))
sh = newShard() sh = newShard()
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadOnly, sh.GetMode()) require.Equal(t, mode.ReadOnly, sh.GetMode())
require.Error(t, sh.SetMode(mode.ReadWrite)) require.Error(t, sh.SetMode(mode.ReadWrite))
@ -113,7 +107,7 @@ func TestShardOpen(t *testing.T) {
allowedMode.Store(math.MaxInt64) allowedMode.Store(math.MaxInt64)
sh = newShard() sh = newShard()
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.DegradedReadOnly, sh.GetMode()) require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
require.NoError(t, sh.Close()) require.NoError(t, sh.Close())
@ -141,7 +135,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
WithBlobStorOptions(blobOpts...), WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{}))) WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
obj := objecttest.Object() obj := objecttest.Object()
@ -164,7 +158,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
WithRefillMetabase(true)) WithRefillMetabase(true))
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
var getPrm GetPrm var getPrm GetPrm
@ -203,7 +197,7 @@ func TestRefillMetabase(t *testing.T) {
) )
// open Blobstor // open Blobstor
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
// initialize Blobstor // initialize Blobstor
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
@ -371,7 +365,7 @@ func TestRefillMetabase(t *testing.T) {
) )
// open Blobstor // open Blobstor
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
// initialize Blobstor // initialize Blobstor
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -29,15 +28,14 @@ func TestShard_Delete(t *testing.T) {
func testShardDelete(t *testing.T, hasWriteCache bool) { func testShardDelete(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
cnr := cidtest.ID() cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr) obj := testutil.GenerateObjectWithCID(cnr)
testutil.AddAttribute(obj, "foo", "bar") testutil.AddAttribute(obj, "foo", "bar")
var putPrm shard.PutPrm var putPrm PutPrm
var getPrm shard.GetPrm var getPrm GetPrm
t.Run("big object", func(t *testing.T) { t.Run("big object", func(t *testing.T) {
testutil.AddPayload(obj, 1<<20) testutil.AddPayload(obj, 1<<20)
@ -45,7 +43,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
putPrm.SetObject(obj) putPrm.SetObject(obj)
getPrm.SetAddress(object.AddressOf(obj)) getPrm.SetAddress(object.AddressOf(obj))
var delPrm shard.DeletePrm var delPrm DeletePrm
delPrm.SetAddresses(object.AddressOf(obj)) delPrm.SetAddresses(object.AddressOf(obj))
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)
@ -71,7 +69,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
putPrm.SetObject(obj) putPrm.SetObject(obj)
getPrm.SetAddress(object.AddressOf(obj)) getPrm.SetAddress(object.AddressOf(obj))
var delPrm shard.DeletePrm var delPrm DeletePrm
delPrm.SetAddresses(object.AddressOf(obj)) delPrm.SetAddresses(object.AddressOf(obj))
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)

View file

@ -119,6 +119,8 @@ type gcCfg struct {
expiredCollectorBatchSize int expiredCollectorBatchSize int
metrics GCMectrics metrics GCMectrics
testHookRemover func(ctx context.Context) gcRunResult
} }
func defaultGCCfg() gcCfg { func defaultGCCfg() gcCfg {
@ -158,33 +160,37 @@ func (gc *gc) listenEvents(ctx context.Context) {
return return
} }
v, ok := gc.mEventHandler[event.typ()] gc.handleEvent(ctx, event)
if !ok { }
continue }
}
v.cancelFunc() func (gc *gc) handleEvent(ctx context.Context, event Event) {
v.prevGroup.Wait() v, ok := gc.mEventHandler[event.typ()]
if !ok {
return
}
var runCtx context.Context v.cancelFunc()
runCtx, v.cancelFunc = context.WithCancel(ctx) v.prevGroup.Wait()
v.prevGroup.Add(len(v.handlers)) var runCtx context.Context
runCtx, v.cancelFunc = context.WithCancel(ctx)
for i := range v.handlers { v.prevGroup.Add(len(v.handlers))
h := v.handlers[i]
err := gc.workerPool.Submit(func() { for i := range v.handlers {
defer v.prevGroup.Done() h := v.handlers[i]
h(runCtx, event)
})
if err != nil {
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
zap.String("error", err.Error()),
)
v.prevGroup.Done() err := gc.workerPool.Submit(func() {
} defer v.prevGroup.Done()
h(runCtx, event)
})
if err != nil {
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
zap.String("error", err.Error()),
)
v.prevGroup.Done()
} }
} }
} }
@ -209,7 +215,12 @@ func (gc *gc) tickRemover(ctx context.Context) {
case <-timer.C: case <-timer.C:
startedAt := time.Now() startedAt := time.Now()
result := gc.remover(ctx) var result gcRunResult
if gc.testHookRemover != nil {
result = gc.testHookRemover(ctx)
} else {
result = gc.remover(ctx)
}
timer.Reset(gc.removerInterval) timer.Reset(gc.removerInterval)
gc.metrics.AddRunDuration(time.Since(startedAt), result.success) gc.metrics.AddRunDuration(time.Since(startedAt), result.success)
@ -220,7 +231,7 @@ func (gc *gc) tickRemover(ctx context.Context) {
func (gc *gc) stop() { func (gc *gc) stop() {
gc.onceStop.Do(func() { gc.onceStop.Do(func() {
gc.stopChannel <- struct{}{} close(gc.stopChannel)
}) })
gc.log.Info(logs.ShardWaitingForGCWorkersToStop) gc.log.Info(logs.ShardWaitingForGCWorkersToStop)

View file

@ -75,8 +75,8 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
} }
sh = New(opts...) sh = New(opts...)
sh.gcCfg.testHookRemover = func(context.Context) gcRunResult { return gcRunResult{} }
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
@ -116,13 +116,13 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm) storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
require.NoError(t, err, "failed to get storage ID") require.NoError(t, err, "failed to get storage ID")
//check existance in blobstore //check existence in blobstore
var bsExisted common.ExistsPrm var bsExisted common.ExistsPrm
bsExisted.Address = addr bsExisted.Address = addr
bsExisted.StorageID = storageID.StorageID() bsExisted.StorageID = storageID.StorageID()
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted) exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existance") require.NoError(t, err, "failed to check blobstore existence")
require.True(t, exRes.Exists, "invalid blobstore existance result") require.True(t, exRes.Exists, "invalid blobstore existence result")
//drop from blobstor //drop from blobstor
var bsDeletePrm common.DeletePrm var bsDeletePrm common.DeletePrm
@ -131,10 +131,10 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm) _, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
require.NoError(t, err, "failed to delete from blobstore") require.NoError(t, err, "failed to delete from blobstore")
//check existance in blobstore //check existence in blobstore
exRes, err = sh.blobStor.Exists(context.Background(), bsExisted) exRes, err = sh.blobStor.Exists(context.Background(), bsExisted)
require.NoError(t, err, "failed to check blobstore existance") require.NoError(t, err, "failed to check blobstore existence")
require.False(t, exRes.Exists, "invalid blobstore existance result") require.False(t, exRes.Exists, "invalid blobstore existence result")
//get should return object not found //get should return object not found
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)

View file

@ -1,17 +1,15 @@
package shard_test package shard
import ( import (
"context" "context"
"errors" "errors"
"testing" "testing"
"time"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -27,13 +25,11 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
Value: 100, Value: 100,
} }
wcOpts := writecacheconfig.Options{ sh := newCustomShard(t, false, shardOptions{
Type: writecacheconfig.TypeBBolt, metaOptions: []meta.Option{meta.WithEpochState(epoch)},
} additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
sh := newCustomShard(t, t.TempDir(), false, wcOpts, nil, []meta.Option{meta.WithEpochState(epoch)}) return util.NewPseudoWorkerPool() // synchronous event processing
})},
t.Cleanup(func() {
releaseShard(sh, t)
}) })
cnr := cidtest.ID() cnr := cidtest.ID()
@ -55,7 +51,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
lock.SetAttributes(lockExpirationAttr) lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID() lockID, _ := lock.ID()
var putPrm shard.PutPrm var putPrm PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)
@ -69,14 +65,12 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
epoch.Value = 105 epoch.Value = 105
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value) sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
var getPrm shard.GetPrm var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(obj)) getPrm.SetAddress(objectCore.AddressOf(obj))
require.Eventually(t, func() bool { _, err = sh.Get(context.Background(), getPrm)
_, err = sh.Get(context.Background(), getPrm) require.True(t, client.IsErrObjectNotFound(err), "expired object must be deleted")
return client.IsErrObjectNotFound(err)
}, 3*time.Second, 1*time.Second, "expired object must be deleted")
} }
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
@ -127,13 +121,11 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
linkID, _ := link.ID() linkID, _ := link.ID()
wcOpts := writecacheconfig.Options{ sh := newCustomShard(t, false, shardOptions{
Type: writecacheconfig.TypeBBolt, metaOptions: []meta.Option{meta.WithEpochState(epoch)},
} additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
sh := newCustomShard(t, t.TempDir(), false, wcOpts, nil, []meta.Option{meta.WithEpochState(epoch)}) return util.NewPseudoWorkerPool() // synchronous event processing
})},
t.Cleanup(func() {
releaseShard(sh, t)
}) })
lock := testutil.GenerateObjectWithCID(cnr) lock := testutil.GenerateObjectWithCID(cnr)
@ -141,7 +133,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
lock.SetAttributes(lockExpirationAttr) lock.SetAttributes(lockExpirationAttr)
lockID, _ := lock.ID() lockID, _ := lock.ID()
var putPrm shard.PutPrm var putPrm PutPrm
for _, child := range children { for _, child := range children {
putPrm.SetObject(child) putPrm.SetObject(child)
@ -160,7 +152,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
_, err = sh.Put(context.Background(), putPrm) _, err = sh.Put(context.Background(), putPrm)
require.NoError(t, err) require.NoError(t, err)
var getPrm shard.GetPrm var getPrm GetPrm
getPrm.SetAddress(objectCore.AddressOf(parent)) getPrm.SetAddress(objectCore.AddressOf(parent))
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
@ -168,10 +160,8 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
require.True(t, errors.As(err, &splitInfoError), "split info must be provided") require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
epoch.Value = 105 epoch.Value = 105
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value) sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
require.Eventually(t, func() bool { _, err = sh.Get(context.Background(), getPrm)
_, err = sh.Get(context.Background(), getPrm) require.True(t, client.IsErrObjectNotFound(err), "expired complex object must be deleted on epoch after lock expires")
return client.IsErrObjectNotFound(err)
}, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
} }

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"bytes" "bytes"
@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -33,10 +32,9 @@ func TestShard_Get(t *testing.T) {
func testShardGet(t *testing.T, hasWriteCache bool) { func testShardGet(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
var putPrm shard.PutPrm var putPrm PutPrm
var getPrm shard.GetPrm var getPrm GetPrm
t.Run("small object", func(t *testing.T) { t.Run("small object", func(t *testing.T) {
obj := testutil.GenerateObject() obj := testutil.GenerateObject()
@ -116,7 +114,7 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
}) })
} }
func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (shard.GetRes, error) { func testGet(t *testing.T, sh *Shard, getPrm GetPrm, hasWriteCache bool) (GetRes, error) {
res, err := sh.Get(context.Background(), getPrm) res, err := sh.Get(context.Background(), getPrm)
if hasWriteCache { if hasWriteCache {
require.Eventually(t, func() bool { require.Eventually(t, func() bool {

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -8,7 +8,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -31,10 +30,9 @@ func TestShard_Head(t *testing.T) {
func testShardHead(t *testing.T, hasWriteCache bool) { func testShardHead(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
var putPrm shard.PutPrm var putPrm PutPrm
var headPrm shard.HeadPrm var headPrm HeadPrm
t.Run("regular object", func(t *testing.T) { t.Run("regular object", func(t *testing.T) {
obj := testutil.GenerateObject() obj := testutil.GenerateObject()
@ -87,7 +85,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
}) })
} }
func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (shard.HeadRes, error) { func testHead(t *testing.T, sh *Shard, headPrm HeadPrm, hasWriteCache bool) (HeadRes, error) {
res, err := sh.Head(context.Background(), headPrm) res, err := sh.Head(context.Background(), headPrm)
if hasWriteCache { if hasWriteCache {
require.Eventually(t, func() bool { require.Eventually(t, func() bool {

View file

@ -1,6 +1,8 @@
package shard package shard
import ( import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
"go.uber.org/zap" "go.uber.org/zap"
@ -27,8 +29,8 @@ func (s *Shard) ID() *ID {
} }
// UpdateID reads shard ID saved in the metabase and updates it if it is missing. // UpdateID reads shard ID saved in the metabase and updates it if it is missing.
func (s *Shard) UpdateID() (err error) { func (s *Shard) UpdateID(ctx context.Context) (err error) {
if err = s.metaBase.Open(false); err != nil { if err = s.metaBase.Open(ctx, false); err != nil {
return err return err
} }
defer func() { defer func() {

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -6,7 +6,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -28,7 +27,6 @@ func TestShard_Inhume(t *testing.T) {
func testShardInhume(t *testing.T, hasWriteCache bool) { func testShardInhume(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer releaseShard(sh, t)
cnr := cidtest.ID() cnr := cidtest.ID()
@ -37,13 +35,13 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {
ts := testutil.GenerateObjectWithCID(cnr) ts := testutil.GenerateObjectWithCID(cnr)
var putPrm shard.PutPrm var putPrm PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
var inhPrm shard.InhumePrm var inhPrm InhumePrm
inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj)) inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj))
var getPrm shard.GetPrm var getPrm GetPrm
getPrm.SetAddress(object.AddressOf(obj)) getPrm.SetAddress(object.AddressOf(obj))
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -19,19 +18,17 @@ func TestShard_List(t *testing.T) {
t.Run("without write cache", func(t *testing.T) { t.Run("without write cache", func(t *testing.T) {
t.Parallel() t.Parallel()
sh := newShard(t, false) sh := newShard(t, false)
defer releaseShard(sh, t)
testShardList(t, sh) testShardList(t, sh)
}) })
t.Run("with write cache", func(t *testing.T) { t.Run("with write cache", func(t *testing.T) {
t.Parallel() t.Parallel()
shWC := newShard(t, true) shWC := newShard(t, true)
defer releaseShard(shWC, t)
testShardList(t, shWC) testShardList(t, shWC)
}) })
} }
func testShardList(t *testing.T, sh *shard.Shard) { func testShardList(t *testing.T, sh *Shard) {
const C = 10 const C = 10
const N = 5 const N = 5
@ -59,7 +56,7 @@ func testShardList(t *testing.T, sh *shard.Shard) {
objs[object.AddressOf(obj).EncodeToString()] = 0 objs[object.AddressOf(obj).EncodeToString()] = 0
mtx.Unlock() mtx.Unlock()
var putPrm shard.PutPrm var putPrm PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -11,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -26,13 +25,13 @@ import (
func TestShard_Lock(t *testing.T) { func TestShard_Lock(t *testing.T) {
t.Parallel() t.Parallel()
var sh *shard.Shard var sh *Shard
rootPath := t.TempDir() rootPath := t.TempDir()
opts := []shard.Option{ opts := []Option{
shard.WithID(shard.NewIDFromBytes([]byte{})), WithID(NewIDFromBytes([]byte{})),
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}), WithLogger(&logger.Logger{Logger: zap.NewNop()}),
shard.WithBlobStorOptions( WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{ blobstor.WithStorages([]blobstor.SubStorage{
{ {
Storage: blobovniczatree.NewBlobovniczaTree( Storage: blobovniczatree.NewBlobovniczaTree(
@ -49,17 +48,17 @@ func TestShard_Lock(t *testing.T) {
}, },
}), }),
), ),
shard.WithMetaBaseOptions( WithMetaBaseOptions(
meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}), meta.WithEpochState(epochState{}),
), ),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) { WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses) sh.HandleDeletedLocks(addresses)
}), }),
} }
sh = shard.New(opts...) sh = New(opts...)
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
@ -76,7 +75,7 @@ func TestShard_Lock(t *testing.T) {
// put the object // put the object
var putPrm shard.PutPrm var putPrm PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)
@ -94,7 +93,7 @@ func TestShard_Lock(t *testing.T) {
t.Run("inhuming locked objects", func(t *testing.T) { t.Run("inhuming locked objects", func(t *testing.T) {
ts := testutil.GenerateObjectWithCID(cnr) ts := testutil.GenerateObjectWithCID(cnr)
var inhumePrm shard.InhumePrm var inhumePrm InhumePrm
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj)) inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj))
var objLockedErr *apistatus.ObjectLocked var objLockedErr *apistatus.ObjectLocked
@ -110,7 +109,7 @@ func TestShard_Lock(t *testing.T) {
t.Run("inhuming lock objects", func(t *testing.T) { t.Run("inhuming lock objects", func(t *testing.T) {
ts := testutil.GenerateObjectWithCID(cnr) ts := testutil.GenerateObjectWithCID(cnr)
var inhumePrm shard.InhumePrm var inhumePrm InhumePrm
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock)) inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock))
_, err = sh.Inhume(context.Background(), inhumePrm) _, err = sh.Inhume(context.Background(), inhumePrm)
@ -122,7 +121,7 @@ func TestShard_Lock(t *testing.T) {
}) })
t.Run("force objects inhuming", func(t *testing.T) { t.Run("force objects inhuming", func(t *testing.T) {
var inhumePrm shard.InhumePrm var inhumePrm InhumePrm
inhumePrm.MarkAsGarbage(objectcore.AddressOf(lock)) inhumePrm.MarkAsGarbage(objectcore.AddressOf(lock))
inhumePrm.ForceRemoval() inhumePrm.ForceRemoval()
@ -132,7 +131,7 @@ func TestShard_Lock(t *testing.T) {
// it should be possible to remove // it should be possible to remove
// lock object now // lock object now
inhumePrm = shard.InhumePrm{} inhumePrm = InhumePrm{}
inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj)) inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
_, err = sh.Inhume(context.Background(), inhumePrm) _, err = sh.Inhume(context.Background(), inhumePrm)
@ -140,7 +139,7 @@ func TestShard_Lock(t *testing.T) {
// check that object has been removed // check that object has been removed
var getPrm shard.GetPrm var getPrm GetPrm
getPrm.SetAddress(objectcore.AddressOf(obj)) getPrm.SetAddress(objectcore.AddressOf(obj))
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
@ -160,7 +159,7 @@ func TestShard_IsLocked(t *testing.T) {
// put the object // put the object
var putPrm shard.PutPrm var putPrm PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -126,9 +125,6 @@ func (m *metricsStore) DeleteShardMetrics() {
m.errCounter = 0 m.errCounter = 0
} }
const physical = "phy"
const logical = "logic"
func TestCounters(t *testing.T) { func TestCounters(t *testing.T) {
t.Parallel() t.Parallel()
@ -163,24 +159,22 @@ func TestCounters(t *testing.T) {
totalPayload += oSize totalPayload += oSize
} }
t.Run("put", func(t *testing.T) { var prm PutPrm
var prm shard.PutPrm
for i := 0; i < objNumber; i++ { for i := 0; i < objNumber; i++ {
prm.SetObject(oo[i]) prm.SetObject(oo[i])
_, err := sh.Put(context.Background(), prm) _, err := sh.Put(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
} }
require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(physical))
require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical)) require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical))
require.Equal(t, expectedSizes, mm.containerSizes()) require.Equal(t, expectedSizes, mm.containerSizes())
require.Equal(t, totalPayload, mm.payloadSize()) require.Equal(t, totalPayload, mm.payloadSize())
})
t.Run("inhume_GC", func(t *testing.T) { t.Run("inhume_GC", func(t *testing.T) {
var prm shard.InhumePrm var prm InhumePrm
inhumedNumber := objNumber / 4 inhumedNumber := objNumber / 4
for i := 0; i < inhumedNumber; i++ { for i := 0; i < inhumedNumber; i++ {
@ -199,7 +193,7 @@ func TestCounters(t *testing.T) {
}) })
t.Run("inhume_TS", func(t *testing.T) { t.Run("inhume_TS", func(t *testing.T) {
var prm shard.InhumePrm var prm InhumePrm
ts := objectcore.AddressOf(testutil.GenerateObject()) ts := objectcore.AddressOf(testutil.GenerateObject())
phy := mm.getObjectCounter(physical) phy := mm.getObjectCounter(physical)
@ -220,7 +214,7 @@ func TestCounters(t *testing.T) {
}) })
t.Run("Delete", func(t *testing.T) { t.Run("Delete", func(t *testing.T) {
var prm shard.DeletePrm var prm DeletePrm
phy := mm.getObjectCounter(physical) phy := mm.getObjectCounter(physical)
logic := mm.getObjectCounter(logical) logic := mm.getObjectCounter(logical)
@ -246,7 +240,7 @@ func TestCounters(t *testing.T) {
}) })
} }
func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) { func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
blobOpts := []blobstor.Option{ blobOpts := []blobstor.Option{
blobstor.WithStorages([]blobstor.SubStorage{ blobstor.WithStorages([]blobstor.SubStorage{
{ {
@ -266,16 +260,16 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
cnrSize: make(map[string]int64), cnrSize: make(map[string]int64),
} }
sh := shard.New( sh := New(
shard.WithID(shard.NewIDFromBytes([]byte{})), WithID(NewIDFromBytes([]byte{})),
shard.WithBlobStorOptions(blobOpts...), WithBlobStorOptions(blobOpts...),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(path, "pilorama"))), WithPiloramaOptions(pilorama.WithPath(filepath.Join(path, "pilorama"))),
shard.WithMetaBaseOptions( WithMetaBaseOptions(
meta.WithPath(filepath.Join(path, "meta")), meta.WithPath(filepath.Join(path, "meta")),
meta.WithEpochState(epochState{})), meta.WithEpochState(epochState{})),
shard.WithMetricsWriter(mm), WithMetricsWriter(mm),
) )
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -11,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
@ -77,25 +76,27 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
}, },
} }
sh := newCustomShard(t, t.TempDir(), hasWriteCache, wcOpts, sh := newCustomShard(t, hasWriteCache, shardOptions{
[]blobstor.Option{blobstor.WithStorages([]blobstor.SubStorage{ wcOpts: wcOpts,
{ bsOpts: []blobstor.Option{
Storage: blobovniczatree.NewBlobovniczaTree( blobstor.WithStorages([]blobstor.SubStorage{
blobovniczatree.WithLogger(test.NewLogger(t, true)), {
blobovniczatree.WithRootPath(filepath.Join(t.TempDir(), "blob", "blobovnicza")), Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithBlobovniczaShallowDepth(1), blobovniczatree.WithLogger(test.NewLogger(t, true)),
blobovniczatree.WithBlobovniczaShallowWidth(1)), blobovniczatree.WithRootPath(filepath.Join(t.TempDir(), "blob", "blobovnicza")),
Policy: func(_ *objectSDK.Object, data []byte) bool { blobovniczatree.WithBlobovniczaShallowDepth(1),
return len(data) <= smallObjectSize blobovniczatree.WithBlobovniczaShallowWidth(1)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return len(data) <= smallObjectSize
},
}, },
}, {
{ Storage: fstree.New(
Storage: fstree.New( fstree.WithPath(filepath.Join(t.TempDir(), "blob"))),
fstree.WithPath(filepath.Join(t.TempDir(), "blob"))), },
}, }),
})}, },
nil) })
defer releaseShard(sh, t)
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -106,13 +107,13 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
addr := object.AddressOf(obj) addr := object.AddressOf(obj)
payload := slice.Copy(obj.Payload()) payload := slice.Copy(obj.Payload())
var putPrm shard.PutPrm var putPrm PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err) require.NoError(t, err)
var rngPrm shard.RngPrm var rngPrm RngPrm
rngPrm.SetAddress(addr) rngPrm.SetAddress(addr)
rngPrm.SetRange(tc.rng.GetOffset(), tc.rng.GetLength()) rngPrm.SetRange(tc.rng.GetOffset(), tc.rng.GetLength())

View file

@ -52,7 +52,7 @@ func TestShardReload(t *testing.T) {
pilorama.WithPath(filepath.Join(p, "pilorama")))} pilorama.WithPath(filepath.Join(p, "pilorama")))}
sh := New(opts...) sh := New(opts...)
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
objects := make([]objAddr, 5) objects := make([]objAddr, 5)

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -11,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
@ -31,39 +30,50 @@ func (s epochState) CurrentEpoch() uint64 {
return s.Value return s.Value
} }
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard { type shardOptions struct {
return newCustomShard(t, t.TempDir(), enableWriteCache, rootPath string
writecacheconfig.Options{Type: writecacheconfig.TypeBBolt}, dontRelease bool
nil, wcOpts writecacheconfig.Options
nil) bsOpts []blobstor.Option
metaOptions []meta.Option
additionalShardOptions []Option
} }
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts writecacheconfig.Options, bsOpts []blobstor.Option, metaOptions []meta.Option) *shard.Shard { func newShard(t testing.TB, enableWriteCache bool) *Shard {
var sh *shard.Shard return newCustomShard(t, enableWriteCache, shardOptions{})
if enableWriteCache { }
rootPath = filepath.Join(rootPath, "wc")
switch wcOpts.Type { func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard {
case writecacheconfig.TypeBBolt: if o.rootPath == "" {
wcOpts.BBoltOptions = append( o.rootPath = t.TempDir()
[]writecachebbolt.Option{writecachebbolt.WithPath(filepath.Join(rootPath, "wcache"))}, }
wcOpts.BBoltOptions...) if enableWriteCache && o.wcOpts.Type == 0 {
case writecacheconfig.TypeBadger: o.wcOpts.Type = writecacheconfig.TypeBBolt
wcOpts.BadgerOptions = append(
[]writecachebadger.Option{writecachebadger.WithPath(filepath.Join(rootPath, "wcache"))},
wcOpts.BadgerOptions...)
}
} else {
rootPath = filepath.Join(rootPath, "nowc")
} }
if bsOpts == nil { var sh *Shard
bsOpts = []blobstor.Option{ if enableWriteCache {
switch o.wcOpts.Type {
case writecacheconfig.TypeBBolt:
o.wcOpts.BBoltOptions = append(
[]writecachebbolt.Option{writecachebbolt.WithPath(filepath.Join(o.rootPath, "wcache"))},
o.wcOpts.BBoltOptions...)
case writecacheconfig.TypeBadger:
o.wcOpts.BadgerOptions = append(
[]writecachebadger.Option{writecachebadger.WithPath(filepath.Join(o.rootPath, "wcache"))},
o.wcOpts.BadgerOptions...)
}
}
if o.bsOpts == nil {
o.bsOpts = []blobstor.Option{
blobstor.WithLogger(test.NewLogger(t, true)), blobstor.WithLogger(test.NewLogger(t, true)),
blobstor.WithStorages([]blobstor.SubStorage{ blobstor.WithStorages([]blobstor.SubStorage{
{ {
Storage: blobovniczatree.NewBlobovniczaTree( Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithLogger(test.NewLogger(t, true)), blobovniczatree.WithLogger(test.NewLogger(t, true)),
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")), blobovniczatree.WithRootPath(filepath.Join(o.rootPath, "blob", "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowDepth(1), blobovniczatree.WithBlobovniczaShallowDepth(1),
blobovniczatree.WithBlobovniczaShallowWidth(1)), blobovniczatree.WithBlobovniczaShallowWidth(1)),
Policy: func(_ *objectSDK.Object, data []byte) bool { Policy: func(_ *objectSDK.Object, data []byte) bool {
@ -72,46 +82,51 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
}, },
{ {
Storage: fstree.New( Storage: fstree.New(
fstree.WithPath(filepath.Join(rootPath, "blob"))), fstree.WithPath(filepath.Join(o.rootPath, "blob"))),
}, },
}), }),
} }
} }
opts := []shard.Option{ opts := []Option{
shard.WithID(shard.NewIDFromBytes([]byte{})), WithID(NewIDFromBytes([]byte{})),
shard.WithLogger(test.NewLogger(t, true)), WithLogger(test.NewLogger(t, true)),
shard.WithBlobStorOptions(bsOpts...), WithBlobStorOptions(o.bsOpts...),
shard.WithMetaBaseOptions( WithMetaBaseOptions(
append([]meta.Option{ append([]meta.Option{
meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{})}, meta.WithPath(filepath.Join(o.rootPath, "meta")), meta.WithEpochState(epochState{})},
metaOptions...)..., o.metaOptions...)...,
), ),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))), WithPiloramaOptions(pilorama.WithPath(filepath.Join(o.rootPath, "pilorama"))),
shard.WithWriteCache(enableWriteCache), WithWriteCache(enableWriteCache),
shard.WithWriteCacheOptions(wcOpts), WithWriteCacheOptions(o.wcOpts),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) { WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
sh.HandleDeletedLocks(addresses) sh.HandleDeletedLocks(addresses)
}), }),
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) { WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
sh.HandleExpiredLocks(ctx, epoch, a) sh.HandleExpiredLocks(ctx, epoch, a)
}), }),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz) pool, err := ants.NewPool(sz)
require.NoError(t, err) require.NoError(t, err)
return pool return pool
}), }),
shard.WithGCRemoverSleepInterval(100 * time.Millisecond), WithGCRemoverSleepInterval(100 * time.Millisecond),
} }
opts = append(opts, o.additionalShardOptions...)
sh = shard.New(opts...) sh = New(opts...)
require.NoError(t, sh.Open()) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
if !o.dontRelease {
t.Cleanup(func() { releaseShard(sh, t) })
}
return sh return sh
} }
func releaseShard(s *shard.Shard, t testing.TB) { func releaseShard(s *Shard, t testing.TB) {
require.NoError(t, s.Close()) require.NoError(t, s.Close())
} }

View file

@ -1,4 +1,4 @@
package shard_test package shard
import ( import (
"context" "context"
@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config" writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
@ -44,13 +43,13 @@ func TestWriteCacheObjectLoss(t *testing.T) {
}, },
} }
sh := newCustomShard(t, dir, true, wcOpts, nil, nil) sh := newCustomShard(t, true, shardOptions{dontRelease: true, rootPath: dir, wcOpts: wcOpts})
var errG errgroup.Group var errG errgroup.Group
for i := range objects { for i := range objects {
obj := objects[i] obj := objects[i]
errG.Go(func() error { errG.Go(func() error {
var putPrm shard.PutPrm var putPrm PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)
return err return err
@ -59,10 +58,9 @@ func TestWriteCacheObjectLoss(t *testing.T) {
require.NoError(t, errG.Wait()) require.NoError(t, errG.Wait())
require.NoError(t, sh.Close()) require.NoError(t, sh.Close())
sh = newCustomShard(t, dir, true, wcOpts, nil, nil) sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts})
defer releaseShard(sh, t)
var getPrm shard.GetPrm var getPrm GetPrm
for i := range objects { for i := range objects {
getPrm.SetAddress(object.AddressOf(objects[i])) getPrm.SetAddress(object.AddressOf(objects[i]))

View file

@ -82,7 +82,7 @@ func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
} }
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) { func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
require.NoError(b, cache.Open(false), "opening") require.NoError(b, cache.Open(context.Background(), false), "opening")
require.NoError(b, cache.Init(), "initializing") require.NoError(b, cache.Init(), "initializing")
b.Cleanup(func() { b.Cleanup(func() {
require.NoError(b, cache.Close(), "closing") require.NoError(b, cache.Close(), "closing")

View file

@ -38,7 +38,7 @@ type Cache interface {
Flush(context.Context, bool) error Flush(context.Context, bool) error
Init() error Init() error
Open(readOnly bool) error Open(ctx context.Context, readOnly bool) error
Close() error Close() error
} }

View file

@ -1,6 +1,7 @@
package writecachebadger package writecachebadger
import ( import (
"context"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -83,7 +84,7 @@ func (c *cache) DumpInfo() writecache.Info {
} }
// Open opens and initializes database. Reads object counters from the ObjectCounters instance. // Open opens and initializes database. Reads object counters from the ObjectCounters instance.
func (c *cache) Open(readOnly bool) error { func (c *cache) Open(_ context.Context, readOnly bool) error {
err := c.openStore(readOnly) err := c.openStore(readOnly)
if err != nil { if err != nil {
return metaerr.Wrap(err) return metaerr.Wrap(err)

View file

@ -14,9 +14,12 @@ import (
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap"
) )
func TestFlush(t *testing.T) { func TestFlush(t *testing.T) {
testlogger := test.NewLogger(t, true)
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache { createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New( return New(
append([]Option{ append([]Option{
@ -30,8 +33,9 @@ func TestFlush(t *testing.T) {
errCountOpt := func() (Option, *atomic.Uint32) { errCountOpt := func() (Option, *atomic.Uint32) {
cnt := &atomic.Uint32{} cnt := &atomic.Uint32{}
return WithReportErrorFunc(func(string, error) { return WithReportErrorFunc(func(msg string, err error) {
cnt.Add(1) cnt.Add(1)
testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err))
}), cnt }), cnt
} }

View file

@ -1,6 +1,7 @@
package writecachebbolt package writecachebbolt
import ( import (
"context"
"os" "os"
"sync" "sync"
@ -97,7 +98,7 @@ func (c *cache) DumpInfo() writecache.Info {
} }
// Open opens and initializes database. Reads object counters from the ObjectCounters instance. // Open opens and initializes database. Reads object counters from the ObjectCounters instance.
func (c *cache) Open(readOnly bool) error { func (c *cache) Open(_ context.Context, readOnly bool) error {
err := c.openStore(readOnly) err := c.openStore(readOnly)
if err != nil { if err != nil {
return metaerr.Wrap(err) return metaerr.Wrap(err)

View file

@ -17,13 +17,16 @@ import (
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/zap"
) )
func TestFlush(t *testing.T) { func TestFlush(t *testing.T) {
testlogger := test.NewLogger(t, true)
createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache { createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs writecache.MainStorage, opts ...Option) writecache.Cache {
return New( return New(
append([]Option{ append([]Option{
WithLogger(test.NewLogger(t, true)), WithLogger(testlogger),
WithPath(filepath.Join(t.TempDir(), "writecache")), WithPath(filepath.Join(t.TempDir(), "writecache")),
WithSmallObjectSize(smallSize), WithSmallObjectSize(smallSize),
WithMetabase(mb), WithMetabase(mb),
@ -33,8 +36,9 @@ func TestFlush(t *testing.T) {
errCountOpt := func() (Option, *atomic.Uint32) { errCountOpt := func() (Option, *atomic.Uint32) {
cnt := &atomic.Uint32{} cnt := &atomic.Uint32{}
return WithReportErrorFunc(func(string, error) { return WithReportErrorFunc(func(msg string, err error) {
cnt.Add(1) cnt.Add(1)
testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err))
}), cnt }), cnt
} }

View file

@ -110,7 +110,7 @@ func newCache[Option any](
mb := meta.New( mb := meta.New(
meta.WithPath(filepath.Join(dir, "meta")), meta.WithPath(filepath.Join(dir, "meta")),
meta.WithEpochState(dummyEpoch{})) meta.WithEpochState(dummyEpoch{}))
require.NoError(t, mb.Open(false)) require.NoError(t, mb.Open(context.Background(), false))
require.NoError(t, mb.Init()) require.NoError(t, mb.Init())
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
@ -121,12 +121,12 @@ func newCache[Option any](
fstree.WithDirNameLen(1)), fstree.WithDirNameLen(1)),
}, },
})) }))
require.NoError(t, bs.Open(false)) require.NoError(t, bs.Open(context.Background(), false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
wc := createCacheFn(t, smallSize, mb, bs, opts...) wc := createCacheFn(t, smallSize, mb, bs, opts...)
t.Cleanup(func() { require.NoError(t, wc.Close()) }) t.Cleanup(func() { require.NoError(t, wc.Close()) })
require.NoError(t, wc.Open(false)) require.NoError(t, wc.Open(context.Background(), false))
require.NoError(t, wc.Init()) require.NoError(t, wc.Init())
// First set mode for metabase and blobstor to prevent background flushes. // First set mode for metabase and blobstor to prevent background flushes.

View file

@ -130,6 +130,7 @@ loop:
continue loop continue loop
} }
} }
x.clients[a].invalidate()
delete(x.clients, a) delete(x.clients, a)
} }

View file

@ -137,6 +137,8 @@ func (h *cfg) readObjectHeadersFromRequestXHeaderSource(m requestXHeaderSource,
dst.objectHeaders = headersFromObject(objectSDK.NewFromV2(oV2), h.cnr, h.obj) dst.objectHeaders = headersFromObject(objectSDK.NewFromV2(oV2), h.cnr, h.obj)
} }
case *objectV2.PutSingleRequest:
dst.objectHeaders = headersFromObject(objectSDK.NewFromV2(req.GetBody().GetObject()), h.cnr, h.obj)
case *objectV2.SearchRequest: case *objectV2.SearchRequest:
cnrV2 := req.GetBody().GetContainerID() cnrV2 := req.GetBody().GetContainerID()
var cnr cid.ID var cnr cid.ID

View file

@ -3,6 +3,7 @@ package policer
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
@ -16,48 +17,33 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) { func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error {
addr := addrWithType.Address addr := addrWithType.Address
idCnr := addr.Container() idCnr := addr.Container()
idObj := addr.Object() idObj := addr.Object()
cnr, err := p.cnrSrc.Get(idCnr) cnr, err := p.cnrSrc.Get(idCnr)
if err != nil { if err != nil {
p.log.Error(logs.PolicerCouldNotGetContainer,
zap.Stringer("cid", idCnr),
zap.String("error", err.Error()),
)
if client.IsErrContainerNotFound(err) { if client.IsErrContainerNotFound(err) {
existed, err := containercore.WasRemoved(p.cnrSrc, idCnr) existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr)
if err != nil { if errWasRemoved != nil {
p.log.Error(logs.PolicerCouldNotConfirmContainerRemoval, return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
zap.Stringer("cid", idCnr),
zap.Stringer("oid", idObj),
zap.String("error", err.Error()))
} else if existed { } else if existed {
err := p.buryFn(ctx, addrWithType.Address) err := p.buryFn(ctx, addrWithType.Address)
if err != nil { if err != nil {
p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer, return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
zap.Stringer("cid", idCnr),
zap.Stringer("oid", idObj),
zap.String("error", err.Error()))
} }
} }
} }
return return fmt.Errorf("%s: %w", logs.PolicerCouldNotGetContainer, err)
} }
policy := cnr.Value.PlacementPolicy() policy := cnr.Value.PlacementPolicy()
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy) nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
if err != nil { if err != nil {
p.log.Error(logs.PolicerCouldNotBuildPlacementVectorForObject, return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
zap.Stringer("cid", idCnr),
zap.String("error", err.Error()),
)
return
} }
c := &placementRequirements{} c := &placementRequirements{}
@ -73,7 +59,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
for i := range nn { for i := range nn {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return ctx.Err()
default: default:
} }
@ -87,6 +73,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
p.cbRedundantCopy(ctx, addr) p.cbRedundantCopy(ctx, addr)
} }
return nil
} }
type placementRequirements struct { type placementRequirements struct {

View file

@ -12,6 +12,7 @@ import (
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -245,7 +246,8 @@ func TestProcessObject(t *testing.T) {
Type: ti.objType, Type: ti.objType,
} }
p.processObject(context.Background(), addrWithType) err := p.processObject(context.Background(), addrWithType)
require.NoError(t, err)
sort.Ints(gotReplicateTo) sort.Ints(gotReplicateTo)
require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant) require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant)
@ -254,6 +256,35 @@ func TestProcessObject(t *testing.T) {
} }
} }
func TestProcessObjectError(t *testing.T) {
addr := oidtest.Address()
// Container source
cnr := &container.Container{}
cnr.Value.Init()
source := containerSrc{
get: func(id cid.ID) (*container.Container, error) {
return nil, new(apistatus.ContainerNotFound)
},
deletionInfo: func(id cid.ID) (*container.DelInfo, error) {
return nil, new(apistatus.ContainerNotFound)
},
}
buryFn := func(ctx context.Context, a oid.Address) error {
t.Errorf("unexpected object buried: %v", a)
return nil
}
p := New(
WithContainerSource(source),
WithBuryFunc(buryFn),
)
addrWithType := objectcore.AddressWithType{
Address: addr,
}
require.True(t, client.IsErrContainerNotFound(p.processObject(context.Background(), addrWithType)))
}
func TestIteratorContract(t *testing.T) { func TestIteratorContract(t *testing.T) {
addr := oidtest.Address() addr := oidtest.Address()
objs := []objectcore.AddressWithType{{ objs := []objectcore.AddressWithType{{

View file

@ -52,7 +52,12 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
} }
if p.objsInWork.add(addr.Address) { if p.objsInWork.add(addr.Address) {
p.processObject(ctx, addr) err := p.processObject(ctx, addr)
if err != nil {
p.log.Error(logs.PolicerUnableToProcessObj,
zap.Stringer("object", addr.Address),
zap.String("error", err.Error()))
}
p.cache.Add(addr.Address, time.Now()) p.cache.Add(addr.Address, time.Now())
p.objsInWork.remove(addr.Address) p.objsInWork.remove(addr.Address)
} }