forked from TrueCloudLab/frostfs-node
Change-Id: I1b73e561a8daad67d0a8ffc0d293cbdd09aaab6b Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
154 lines
3.6 KiB
Go
154 lines
3.6 KiB
Go
package zombie
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
type quarantine struct {
|
|
// mtx protects current field.
|
|
mtx sync.Mutex
|
|
current int
|
|
trees []*fstree.FSTree
|
|
}
|
|
|
|
func createQuarantine(cmd *cobra.Command, engineInfo engine.Info) *quarantine {
|
|
var paths []string
|
|
for _, sh := range engineInfo.Shards {
|
|
var storagePaths []string
|
|
for _, st := range sh.BlobStorInfo.SubStorages {
|
|
storagePaths = append(storagePaths, st.Path)
|
|
}
|
|
if len(storagePaths) == 0 {
|
|
continue
|
|
}
|
|
paths = append(paths, filepath.Join(commonPath(storagePaths), "quarantine"))
|
|
}
|
|
q, err := newQuarantine(paths)
|
|
commonCmd.ExitOnErr(cmd, "create quarantine: %w", err)
|
|
return q
|
|
}
|
|
|
|
// commonPath returns the longest leading part shared by all paths that ends
// on a path-component boundary.
//
// It returns "" for an empty slice and the path itself for a single path.
// For several paths the byte-wise common prefix is computed first. That
// prefix is returned unchanged when every path either ends right there or
// continues with a path separator (e.g. "/a/blob" and "/a/blob/tree" give
// "/a/blob"). Otherwise the prefix stops in the middle of a component
// (e.g. "/a/fstree" and "/a/fsblob" share "/a/fs"), so it is cut back to and
// including its last separator ("/a/"), or to "" if it has none.
func commonPath(paths []string) string {
	if len(paths) == 0 {
		return ""
	}
	if len(paths) == 1 {
		return paths[0]
	}

	minLen := math.MaxInt
	for _, p := range paths {
		if len(p) < minLen {
			minLen = len(p)
		}
	}

	// n becomes the length of the byte-wise common prefix.
	first := paths[0]
	n := 0
scan:
	for ; n < minLen; n++ {
		for _, p := range paths[1:] {
			if p[n] != first[n] {
				break scan
			}
		}
	}
	prefix := first[:n]

	// The prefix names a whole directory only if no path continues the last
	// component past it.
	onBoundary := true
	for _, p := range paths {
		if len(p) > n && !os.IsPathSeparator(p[n]) {
			onBoundary = false
			break
		}
	}
	if onBoundary {
		return prefix
	}

	// Drop the partial trailing component. Separators are ASCII, so any
	// non-ASCII rune is rejected before narrowing it to a byte.
	cut := strings.LastIndexFunc(prefix, func(r rune) bool {
		return r < 0x80 && os.IsPathSeparator(uint8(r))
	})
	return prefix[:cut+1]
}
|
|
|
|
func newQuarantine(paths []string) (*quarantine, error) {
|
|
var q quarantine
|
|
for i := range paths {
|
|
f := fstree.New(
|
|
fstree.WithDepth(1),
|
|
fstree.WithDirNameLen(1),
|
|
fstree.WithPath(paths[i]),
|
|
fstree.WithPerm(os.ModePerm),
|
|
)
|
|
if err := f.Open(mode.ComponentReadWrite); err != nil {
|
|
return nil, fmt.Errorf("open fstree %s: %w", paths[i], err)
|
|
}
|
|
if err := f.Init(); err != nil {
|
|
return nil, fmt.Errorf("init fstree %s: %w", paths[i], err)
|
|
}
|
|
q.trees = append(q.trees, f)
|
|
}
|
|
return &q, nil
|
|
}
|
|
|
|
func (q *quarantine) Get(ctx context.Context, a oid.Address) (*objectSDK.Object, error) {
|
|
for i := range q.trees {
|
|
res, err := q.trees[i].Get(ctx, common.GetPrm{Address: a})
|
|
if err != nil {
|
|
continue
|
|
}
|
|
return res.Object, nil
|
|
}
|
|
return nil, &apistatus.ObjectNotFound{}
|
|
}
|
|
|
|
func (q *quarantine) Delete(ctx context.Context, a oid.Address) error {
|
|
for i := range q.trees {
|
|
_, err := q.trees[i].Delete(ctx, common.DeletePrm{Address: a})
|
|
if err != nil {
|
|
continue
|
|
}
|
|
return nil
|
|
}
|
|
return &apistatus.ObjectNotFound{}
|
|
}
|
|
|
|
func (q *quarantine) Put(ctx context.Context, obj *objectSDK.Object) error {
|
|
data, err := obj.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var prm common.PutPrm
|
|
prm.Address = objectcore.AddressOf(obj)
|
|
prm.Object = obj
|
|
prm.RawData = data
|
|
|
|
q.mtx.Lock()
|
|
current := q.current
|
|
q.current = (q.current + 1) % len(q.trees)
|
|
q.mtx.Unlock()
|
|
|
|
_, err = q.trees[current].Put(ctx, prm)
|
|
return err
|
|
}
|
|
|
|
func (q *quarantine) Iterate(ctx context.Context, f func(oid.Address) error) error {
|
|
var prm common.IteratePrm
|
|
prm.Handler = func(elem common.IterationElement) error {
|
|
return f(elem.Address)
|
|
}
|
|
for i := range q.trees {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
_, err := q.trees[i].Iterate(ctx, prm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|