restic/internal/backend/frostfs/frostfs.go
Aleksey Kravchenko 1b28213636
Some checks failed
/ DCO (pull_request) Successful in 2m26s
/ Builds (pull_request) Successful in 2m51s
/ Lint (pull_request) Successful in 3m6s
/ Tests (pull_request) Failing after 2m27s
[#2] Add forgejo CI jobs
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
2024-12-23 15:14:17 +03:00

301 lines
6.5 KiB
Go

package frostfs
import (
"context"
"fmt"
"hash"
"io"
"net/http"
"strings"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/location"
"github.com/restic/restic/internal/backend/util"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
)
// Backend is the restic backend implementation of this package. It keeps the
// connection pool together with the container and owner identifiers that the
// methods below put into their requests.
type Backend struct {
	client      *pool.Pool // pool every object and container request goes through
	owner       *user.ID   // owner ID handed to formRawObject when saving
	cnrID       cid.ID     // container that is searched, read from and written to
	connections uint       // value reported by Connections
}

// ObjInfo couples the restic-facing file info of a stored object with the
// address under which the object can be reached.
type ObjInfo struct {
	backend.FileInfo
	address oid.Address // used for range reads and deletion
}
// Compile-time assertion that *Backend satisfies backend.Backend.
var _ backend.Backend = (*Backend)(nil)
// attrResticType is the object attribute key that carries the restic file
// type; it is set on save and used as a search filter on lookup and listing.
const attrResticType = "restic-type"
// NewFactory returns the location.Factory for this backend, built by
// location.NewHTTPBackendFactory with the name "frostfs", ParseConfig,
// location.NoPassword, and the Create and Open constructors.
func NewFactory() location.Factory {
return location.NewHTTPBackendFactory("frostfs", ParseConfig, location.NoPassword, Create, Open)
}
// Open returns a Backend for the repository described by cfg.
// The http.RoundTripper argument is ignored; all work is delegated to open.
func Open(ctx context.Context, cfg Config, _ http.RoundTripper) (backend.Backend, error) {
return open(ctx, cfg)
}
// Create returns a Backend for the repository described by cfg.
// The http.RoundTripper argument is ignored; all work is delegated to open.
//
// NOTE(review): this takes exactly the same path as Open and does not create
// anything itself; presumably the container must already exist — confirm.
func Create(ctx context.Context, cfg Config, _ http.RoundTripper) (backend.Backend, error) {
return open(ctx, cfg)
}
// open builds a Backend for the repository described by cfg.
//
// It loads the account with getAccount, derives the owner ID from the
// account's public key, creates the connection pool with createPool and
// resolves cfg.Container to a container ID with getContainerID.
//
// It returns an error if any of these steps fails. When container resolution
// fails, the pool created in the previous step is closed before returning so
// that it is not leaked.
func open(ctx context.Context, cfg Config) (backend.Backend, error) {
	acc, err := getAccount(cfg)
	if err != nil {
		return nil, err
	}

	var owner user.ID
	user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)

	p, err := createPool(ctx, acc, cfg)
	if err != nil {
		return nil, err
	}

	containerID, err := getContainerID(ctx, p, owner, cfg.Container)
	if err != nil {
		// The pool has not been handed to anyone yet, so it must be released
		// here. Assumes createPool returns a ready-to-use pool, as the
		// getContainerID call above already relies on it.
		p.Close()
		return nil, fmt.Errorf("failed to resolve container id: %w", err)
	}

	debug.Log("container repo: %s", containerID.String())

	return &Backend{
		client:      p,
		owner:       &owner,
		cnrID:       containerID,
		connections: cfg.Connections,
	}, nil
}
// IsPermanentError reports every error as permanent; the argument is ignored.
//
// NOTE(review): presumably this prevents callers from retrying transient
// failures as well — confirm that this is intended.
func (b *Backend) IsPermanentError(_ error) bool {
return true
}
// Hasher always returns nil, i.e. this backend supplies no hash.Hash.
func (b *Backend) Hasher() hash.Hash {
return nil // nil is valid value
}
// Remove deletes the object stored for h.
//
// The object is first located through stat; any lookup error (including the
// object not being found) is returned unchanged and nothing is deleted.
func (b *Backend) Remove(ctx context.Context, h backend.Handle) error {
	target, err := b.stat(ctx, h)
	if err != nil {
		return err
	}

	var delPrm pool.PrmObjectDelete
	delPrm.SetAddress(target.address)

	return b.client.DeleteObject(ctx, delPrm)
}
// Close calls Close on the underlying pool. It always returns nil.
func (b *Backend) Close() error {
b.client.Close()
return nil
}
// Save uploads the content of rd as a new object for h.
//
// The object header is produced by formRawObject from the backend's owner and
// container, the name derived from h, and an attrResticType attribute holding
// the handle's type. The error of the put request, if any, is returned as is.
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
	attrs := map[string]string{attrResticType: string(h.Type)}
	hdr := formRawObject(b.owner, b.cnrID, getName(h), attrs)

	var putPrm pool.PrmObjectPut
	putPrm.SetHeader(*hdr)
	putPrm.SetPayload(rd)

	if _, err := b.client.PutObject(ctx, putPrm); err != nil {
		return err
	}
	return nil
}
// Load hands the requested range of h to fn. It delegates to util.DefaultLoad
// and supplies b.openReader as the function that opens the underlying reader.
func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return util.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
}
// openReader returns a reader over a byte range of the object stored for h.
//
// length is the number of bytes to read starting at offset; a length of 0
// means "everything from offset up to the end of the object", which is
// computed from the size reported by stat.
//
// It returns an error if length or offset is negative, if the object cannot
// be located, if a read-to-end request starts beyond the end of the object,
// or if the range request itself fails. The range checks exist because the
// values are converted to uint64 for the request and would otherwise silently
// wrap around to enormous offsets/lengths.
func (b *Backend) openReader(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
	if length < 0 || offset < 0 {
		return nil, fmt.Errorf("invalid range: offset %d, length %d", offset, length)
	}

	objInfo, err := b.stat(ctx, h)
	if err != nil {
		return nil, err
	}

	ln := uint64(length)
	if ln == 0 {
		// Size - offset is computed in int64; guard before converting so a
		// negative difference does not become a huge unsigned length.
		if offset > objInfo.Size {
			return nil, fmt.Errorf("offset %d is beyond object size %d", offset, objInfo.Size)
		}
		ln = uint64(objInfo.Size - offset)
	}

	var prm pool.PrmObjectRange
	prm.SetAddress(objInfo.address)
	prm.SetOffset(uint64(offset))
	prm.SetLength(ln)

	res, err := b.client.ObjectRange(ctx, prm)
	if err != nil {
		return nil, err
	}

	return &res, nil
}
// stat locates the single object stored for h and returns its size and
// address.
//
// The object is searched in the backend's container by file name (derived
// from h via getName) and by the attrResticType attribute. Exactly one match
// is expected: zero matches yield a "not found file" error and more than one
// match yields a "found more than one object" error. The payload size is
// taken from a subsequent head request on the matched object.
func (b *Backend) stat(ctx context.Context, h backend.Handle) (*ObjInfo, error) {
	name := getName(h)

	query := object.NewSearchFilters()
	query.AddRootFilter()
	query.AddFilter(object.AttributeFileName, name, object.MatchStringEqual)
	query.AddFilter(attrResticType, string(h.Type), object.MatchStringEqual)

	var searchPrm pool.PrmObjectSearch
	searchPrm.SetContainerID(b.cnrID)
	searchPrm.SetFilters(query)

	hits, err := b.client.SearchObjects(ctx, searchPrm)
	if err != nil {
		return nil, fmt.Errorf("search objects: %w", err)
	}
	defer hits.Close()

	var (
		match   oid.ID
		matches int
		dupErr  error
	)
	iterErr := hits.Iterate(func(id oid.ID) bool {
		matches++
		if matches > 1 {
			// A second hit means the name is ambiguous; stop iterating.
			dupErr = fmt.Errorf("found more than one object for file: '%s'", name)
			return true
		}
		match = id
		return false
	})

	// An error from the iteration itself takes precedence over the duplicate
	// error recorded by the callback.
	switch {
	case iterErr != nil:
		return nil, fmt.Errorf("iterate objects: %w", iterErr)
	case dupErr != nil:
		return nil, fmt.Errorf("iterate objects: %w", dupErr)
	case matches == 0:
		return nil, fmt.Errorf("not found file: '%s'", name)
	}

	location := newAddress(b.cnrID, match)

	var headPrm pool.PrmObjectHead
	headPrm.SetAddress(location)

	hdr, err := b.client.HeadObject(ctx, headPrm)
	if err != nil {
		return nil, err
	}

	info := &ObjInfo{address: location}
	info.Name = name
	info.Size = int64(hdr.PayloadSize())
	return info, nil
}
// Stat returns the name and size of the object stored for h. On failure it
// returns a zero backend.FileInfo together with the error from stat.
func (b *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo, error) {
	var none backend.FileInfo

	info, err := b.stat(ctx, h)
	if err != nil {
		return none, err
	}
	return info.FileInfo, nil
}
// List calls fn once for every object in the backend's container whose
// attrResticType attribute equals t.
//
// For each search hit the object header is fetched to obtain the payload size
// and the name (via getNameAttr). Iteration stops at the first failing head
// request or the first error returned by fn; that error is returned wrapped.
func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(backend.FileInfo) error) error {
	query := object.NewSearchFilters()
	query.AddRootFilter()
	query.AddFilter(attrResticType, string(t), object.MatchStringEqual)

	var searchPrm pool.PrmObjectSearch
	searchPrm.SetContainerID(b.cnrID)
	searchPrm.SetFilters(query)

	hits, err := b.client.SearchObjects(ctx, searchPrm)
	if err != nil {
		return fmt.Errorf("search objects: %w", err)
	}
	defer hits.Close()

	// The container part is fixed; only the object ID changes per hit.
	var target oid.Address
	target.SetContainer(b.cnrID)

	var visitErr error
	visit := func(id oid.ID) bool {
		target.SetObject(id)

		var headPrm pool.PrmObjectHead
		headPrm.SetAddress(target)

		hdr, headErr := b.client.HeadObject(ctx, headPrm)
		if headErr != nil {
			visitErr = fmt.Errorf("head object: %w", headErr)
			return true
		}

		entry := backend.FileInfo{
			Size: int64(hdr.PayloadSize()),
			Name: getNameAttr(hdr),
		}
		if cbErr := fn(entry); cbErr != nil {
			visitErr = fmt.Errorf("handle fileInfo: %w", cbErr)
			return true
		}
		return false
	}

	// An error from the iteration itself takes precedence over one recorded
	// by the visitor.
	if iterErr := hits.Iterate(visit); iterErr != nil {
		return fmt.Errorf("iterate objects: %w", iterErr)
	}
	if visitErr != nil {
		return fmt.Errorf("iterate objects: %w", visitErr)
	}
	return nil
}
// Connections returns the connections value the backend was opened with
// (taken from the configuration's Connections field).
func (b *Backend) Connections() uint {
return b.connections
}
// HasAtomicReplace always reports false.
//
// NOTE(review): Save puts a new object without looking for an existing one of
// the same name, which is presumably why replace is not advertised — confirm.
func (b *Backend) HasAtomicReplace() bool {
return false
}
// IsNotExist reports whether err denotes a missing object. The decision is
// purely textual: a non-nil error whose message contains "not found" counts
// as "does not exist"; a nil error never does.
func (b *Backend) IsNotExist(err error) bool {
	return err != nil && strings.Contains(err.Error(), "not found")
}
// Delete issues a container delete request for the backend's container.
// A failure is returned wrapped with the "delete container" prefix.
func (b *Backend) Delete(ctx context.Context) error {
	err := b.client.DeleteContainer(ctx, pool.PrmContainerDelete{ContainerID: b.cnrID})
	if err == nil {
		return nil
	}
	return fmt.Errorf("delete container: %w", err)
}
// getName returns the file name under which h is stored: the fixed name
// "config" for handles of type restic.ConfigFile, and h.Name for all others.
func getName(h backend.Handle) string {
	if h.Type == restic.ConfigFile {
		return "config"
	}
	return h.Name
}