restic/internal/backend/frostfs/frostfs.go
Aleksey Kravchenko f9c3130c1b
Some checks failed
test / lint (pull_request) Failing after 3m19s
test / Cross Compile for subset 2/3 (pull_request) Failing after 4m57s
test / docker (pull_request) Failing after 6m11s
test / Linux Go 1.19.x (pull_request) Failing after 7m7s
test / Linux Go 1.20.x (pull_request) Failing after 8m48s
test / Cross Compile for subset 0/3 (pull_request) Successful in 9m52s
test / Cross Compile for subset 1/3 (pull_request) Successful in 10m37s
test / Linux Go 1.21.x (pull_request) Failing after 13m4s
test / Linux Go 1.22.x (pull_request) Failing after 13m3s
test / Linux (race) Go 1.22.x (pull_request) Failing after 15m33s
test / Windows Go 1.22.x (pull_request) Has been cancelled
test / macOS Go 1.22.x (pull_request) Has been cancelled
test / Analyze results (pull_request) Has been cancelled
[#XX] Add frostfs backend
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
2024-12-10 16:00:13 +03:00

331 lines
7.2 KiB
Go

package frostfs
import (
"context"
"fmt"
"hash"
"io"
"net/http"
"strings"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/location"
"github.com/restic/restic/internal/backend/util"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
)
type (
// Backend stores data on a neofs storage.
Backend struct {
client *pool.Pool
owner *user.ID
cnrID cid.ID
connections uint
}
// ObjInfo represents inner file info.
ObjInfo struct {
backend.FileInfo
address oid.Address
}
)
var _ backend.Backend = (*Backend)(nil)
const attrResticType = "restic-type"
func NewFactory() location.Factory {
return location.NewHTTPBackendFactory("frostfs", ParseConfig, location.NoPassword, Create, Open)
}
func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
return open(ctx, cfg)
}
func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (backend.Backend, error) {
return open(ctx, cfg)
}
func open(ctx context.Context, cfg Config) (backend.Backend, error) {
acc, err := getAccount(cfg)
if err != nil {
return nil, err
}
var owner user.ID
user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)
p, err := createPool(ctx, acc, cfg)
if err != nil {
return nil, err
}
containerID, err := getContainerID(ctx, p, owner, cfg.Container)
if err != nil {
return nil, fmt.Errorf("failed to resolve container id: %w", err)
}
debug.Log("container repo: %s", containerID.String())
return &Backend{
client: p,
owner: &owner,
cnrID: containerID,
connections: cfg.Connections,
}, nil
}
func (b *Backend) IsPermanentError(err error) bool {
return true
}
func (b *Backend) Location() string {
return b.cnrID.String()
}
func (b *Backend) Hasher() hash.Hash {
return nil // nil is valid value
}
func (b *Backend) Test(ctx context.Context, h backend.Handle) (bool, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(object.AttributeFileName, getName(h), object.MatchStringEqual)
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(b.cnrID)
prmSearch.SetFilters(filters)
res, err := b.client.SearchObjects(ctx, prmSearch)
if err != nil {
return false, fmt.Errorf("search objects: %w", err)
}
defer res.Close()
err = res.Iterate(func(id oid.ID) bool {
return true
})
if err != nil {
return false, fmt.Errorf("iterate objects: %w", err)
}
return true, nil
}
func (b *Backend) Remove(ctx context.Context, h backend.Handle) error {
objInfo, err := b.stat(ctx, h)
if err != nil {
return err
}
var prm pool.PrmObjectDelete
prm.SetAddress(objInfo.address)
return b.client.DeleteObject(ctx, prm)
}
func (b *Backend) Close() error {
b.client.Close()
return nil
}
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
name := getName(h)
obj := formRawObject(b.owner, b.cnrID, name, map[string]string{attrResticType: string(h.Type)})
var prm pool.PrmObjectPut
prm.SetHeader(*obj)
prm.SetPayload(rd)
_, err := b.client.PutObject(ctx, prm)
return err
}
func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return util.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
}
func (b *Backend) openReader(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
objInfo, err := b.stat(ctx, h)
if err != nil {
return nil, err
}
ln := uint64(length)
if ln == 0 {
ln = uint64(objInfo.Size - offset)
}
var prm pool.PrmObjectRange
prm.SetAddress(objInfo.address)
prm.SetOffset(uint64(offset))
prm.SetLength(ln)
res, err := b.client.ObjectRange(ctx, prm)
if err != nil {
return nil, err
}
return &res, nil
}
func (b *Backend) stat(ctx context.Context, h backend.Handle) (*ObjInfo, error) {
name := getName(h)
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(object.AttributeFileName, name, object.MatchStringEqual)
filters.AddFilter(attrResticType, string(h.Type), object.MatchStringEqual)
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(b.cnrID)
prmSearch.SetFilters(filters)
res, err := b.client.SearchObjects(ctx, prmSearch)
if err != nil {
return nil, fmt.Errorf("search objects: %w", err)
}
defer res.Close()
var objID oid.ID
var found bool
var inErr error
err = res.Iterate(func(id oid.ID) bool {
if found {
inErr = fmt.Errorf("found more than one object for file: '%s'", name)
return true
}
objID = id
found = true
return false
})
if err == nil {
err = inErr
}
if err != nil {
return nil, fmt.Errorf("iterate objects: %w", err)
}
if !found {
return nil, fmt.Errorf("not found file: '%s'", name)
}
addr := newAddress(b.cnrID, objID)
var prm pool.PrmObjectHead
prm.SetAddress(addr)
obj, err := b.client.HeadObject(ctx, prm)
if err != nil {
return nil, err
}
return &ObjInfo{
FileInfo: backend.FileInfo{
Name: name,
Size: int64(obj.PayloadSize()),
},
address: addr,
}, nil
}
func (b *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo, error) {
objInfo, err := b.stat(ctx, h)
if err != nil {
return backend.FileInfo{}, err
}
return objInfo.FileInfo, nil
}
func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(backend.FileInfo) error) error {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(attrResticType, string(t), object.MatchStringEqual)
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(b.cnrID)
prmSearch.SetFilters(filters)
res, err := b.client.SearchObjects(ctx, prmSearch)
if err != nil {
return fmt.Errorf("search objects: %w", err)
}
defer res.Close()
var addr oid.Address
addr.SetContainer(b.cnrID)
var inErr error
err = res.Iterate(func(id oid.ID) bool {
addr.SetObject(id)
var prm pool.PrmObjectHead
prm.SetAddress(addr)
obj, err := b.client.HeadObject(ctx, prm)
if err != nil {
inErr = fmt.Errorf("head object: %w", err)
return true
}
fileInfo := backend.FileInfo{
Size: int64(obj.PayloadSize()),
Name: getNameAttr(obj),
}
if err = fn(fileInfo); err != nil {
inErr = fmt.Errorf("handle fileInfo: %w", err)
return true
}
return false
})
if err == nil {
err = inErr
}
if err != nil {
return fmt.Errorf("iterate objects: %w", err)
}
return nil
}
func (b *Backend) Connections() uint {
return b.connections
}
func (b *Backend) HasAtomicReplace() bool {
return false
}
func (b *Backend) IsNotExist(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "not found")
}
func (b *Backend) Delete(ctx context.Context) error {
prm := pool.PrmContainerDelete{ContainerID: b.cnrID}
if err := b.client.DeleteContainer(ctx, prm); err != nil {
return fmt.Errorf("delete container: %w", err)
}
return nil
}
func getName(h backend.Handle) string {
name := h.Name
if h.Type == restic.ConfigFile {
name = "config"
}
return name
}