forked from TrueCloudLab/restic
32d5ceba87
Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
243 lines
7.2 KiB
Go
243 lines
7.2 KiB
Go
package restorer
|
|
|
|
import (
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/restic"
|
|
)
|
|
|
|
// packCache is thread safe in-memory cache of pack files required to restore
|
|
// one or more files. The cache is meant to hold pack files that cannot be
|
|
// fully used right away. This happens when pack files contains blobs from
|
|
// "head" of some files and "middle" of other files. "Middle" blobs cannot be
|
|
// written to their files until after blobs from some other packs are written
|
|
// to the files first.
|
|
//
|
|
// While the cache is thread safe, implementation assumes (and enforces)
|
|
// that individual entries are used by one client at a time. Clients must
|
|
// #Close() entry's reader to make the entry available for use by other
|
|
// clients. This limitation can be relaxed in the future if necessary.
|
|
type packCache struct {
|
|
// guards access to cache internal data structures
|
|
lock sync.Mutex
|
|
|
|
// cache capacity
|
|
capacity int
|
|
reservedCapacity int
|
|
allocatedCapacity int
|
|
|
|
// pack records currently being used by active restore worker
|
|
reservedPacks map[restic.ID]*packCacheRecord
|
|
|
|
// unused allocated packs, can be deleted if necessary
|
|
cachedPacks map[restic.ID]*packCacheRecord
|
|
}
|
|
|
|
type packCacheRecord struct {
|
|
master *packCacheRecord
|
|
cache *packCache
|
|
|
|
id restic.ID // cached pack id
|
|
offset int64 // cached pack byte range
|
|
|
|
data []byte
|
|
}
|
|
|
|
type readerAtCloser interface {
|
|
io.Closer
|
|
io.ReaderAt
|
|
}
|
|
|
|
type bytesWriteSeeker struct {
|
|
pos int
|
|
data []byte
|
|
}
|
|
|
|
func (wr *bytesWriteSeeker) Write(p []byte) (n int, err error) {
|
|
if wr.pos+len(p) > len(wr.data) {
|
|
return -1, errors.Errorf("not enough space")
|
|
}
|
|
n = copy(wr.data[wr.pos:], p)
|
|
wr.pos += n
|
|
return n, nil
|
|
}
|
|
|
|
func (wr *bytesWriteSeeker) Seek(offset int64, whence int) (int64, error) {
|
|
if offset != 0 || whence != io.SeekStart {
|
|
return -1, errors.Errorf("unsupported seek request")
|
|
}
|
|
wr.pos = 0
|
|
return 0, nil
|
|
}
|
|
|
|
func newPackCache(capacity int) *packCache {
|
|
return &packCache{
|
|
capacity: capacity,
|
|
reservedPacks: make(map[restic.ID]*packCacheRecord),
|
|
cachedPacks: make(map[restic.ID]*packCacheRecord),
|
|
}
|
|
}
|
|
|
|
func (c *packCache) reserve(packID restic.ID, offset int64, length int) (record *packCacheRecord, err error) {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if offset < 0 || length <= 0 {
|
|
return nil, errors.Errorf("illegal pack cache allocation range %s {offset: %d, length: %d}", packID.Str(), offset, length)
|
|
}
|
|
|
|
if c.reservedCapacity+length > c.capacity {
|
|
return nil, errors.Errorf("not enough cache capacity: requested %d, available %d", length, c.capacity-c.reservedCapacity)
|
|
}
|
|
|
|
if _, ok := c.reservedPacks[packID]; ok {
|
|
return nil, errors.Errorf("pack is already reserved %s", packID.Str())
|
|
}
|
|
|
|
// the pack is available in the cache and currently unused
|
|
if pack, ok := c.cachedPacks[packID]; ok {
|
|
// check if cached pack includes requested byte range
|
|
// the range can shrink, but it never grows bigger unless there is a bug elsewhere
|
|
if pack.offset > offset || (pack.offset+int64(len(pack.data))) < (offset+int64(length)) {
|
|
return nil, errors.Errorf("cached range %d-%d is smaller than requested range %d-%d for pack %s", pack.offset, pack.offset+int64(len(pack.data)), length, offset+int64(length), packID.Str())
|
|
}
|
|
|
|
// move the pack to the used map
|
|
delete(c.cachedPacks, packID)
|
|
c.reservedPacks[packID] = pack
|
|
c.reservedCapacity += len(pack.data)
|
|
|
|
debug.Log("Using cached pack %s (%d bytes)", pack.id.Str(), len(pack.data))
|
|
|
|
if pack.offset != offset || len(pack.data) != length {
|
|
// restrict returned record to requested range
|
|
return &packCacheRecord{
|
|
cache: c,
|
|
master: pack,
|
|
offset: offset,
|
|
data: pack.data[int(offset-pack.offset) : int(offset-pack.offset)+length],
|
|
}, nil
|
|
}
|
|
|
|
return pack, nil
|
|
}
|
|
|
|
for c.allocatedCapacity+length > c.capacity {
|
|
// all cached packs will be needed at some point
|
|
// so it does not matter which one to purge
|
|
for _, cached := range c.cachedPacks {
|
|
delete(c.cachedPacks, cached.id)
|
|
c.allocatedCapacity -= len(cached.data)
|
|
debug.Log("dropped cached pack %s (%d bytes)", cached.id.Str(), len(cached.data))
|
|
break
|
|
}
|
|
}
|
|
|
|
pack := &packCacheRecord{
|
|
cache: c,
|
|
id: packID,
|
|
offset: offset,
|
|
}
|
|
c.reservedPacks[pack.id] = pack
|
|
c.allocatedCapacity += length
|
|
c.reservedCapacity += length
|
|
|
|
return pack, nil
|
|
}
|
|
|
|
// get returns reader of the specified cached pack. Uses provided load func
|
|
// to download pack content if necessary.
|
|
// The returned reader is only able to read pack within byte range specified
|
|
// by offset and length parameters, attempts to read outside that range will
|
|
// result in an error.
|
|
// The returned reader must be closed before the same packID can be requested
|
|
// from the cache again.
|
|
func (c *packCache) get(packID restic.ID, offset int64, length int, load func(offset int64, length int, wr io.WriteSeeker) error) (readerAtCloser, error) {
|
|
pack, err := c.reserve(packID, offset, length)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if pack.data == nil {
|
|
releasePack := func() {
|
|
delete(c.reservedPacks, pack.id)
|
|
c.reservedCapacity -= length
|
|
c.allocatedCapacity -= length
|
|
}
|
|
wr := &bytesWriteSeeker{data: make([]byte, length)}
|
|
err = load(offset, length, wr)
|
|
if err != nil {
|
|
releasePack()
|
|
return nil, err
|
|
}
|
|
if wr.pos != length {
|
|
releasePack()
|
|
return nil, errors.Errorf("invalid read size")
|
|
}
|
|
pack.data = wr.data
|
|
debug.Log("Downloaded and cached pack %s (%d bytes)", pack.id.Str(), len(pack.data))
|
|
}
|
|
|
|
return pack, nil
|
|
}
|
|
|
|
// releases the pack record back to the cache
|
|
func (c *packCache) release(pack *packCacheRecord) error {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if _, ok := c.reservedPacks[pack.id]; !ok {
|
|
return errors.Errorf("invalid pack release request")
|
|
}
|
|
|
|
delete(c.reservedPacks, pack.id)
|
|
c.cachedPacks[pack.id] = pack
|
|
c.reservedCapacity -= len(pack.data)
|
|
|
|
return nil
|
|
}
|
|
|
|
// remove removes specified pack from the cache and frees
|
|
// corresponding cache space. should be called after the pack
|
|
// was fully used up by the restorer.
|
|
func (c *packCache) remove(packID restic.ID) error {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if _, ok := c.reservedPacks[packID]; ok {
|
|
return errors.Errorf("invalid pack remove request, pack %s is reserved", packID.Str())
|
|
}
|
|
|
|
pack, ok := c.cachedPacks[packID]
|
|
if !ok {
|
|
return errors.Errorf("invalid pack remove request, pack %s is not cached", packID.Str())
|
|
}
|
|
|
|
delete(c.cachedPacks, pack.id)
|
|
c.allocatedCapacity -= len(pack.data)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReadAt reads len(b) bytes from the pack starting at byte offset off.
|
|
// It returns the number of bytes read and the error, if any.
|
|
func (r *packCacheRecord) ReadAt(b []byte, off int64) (n int, err error) {
|
|
if off < r.offset || off+int64(len(b)) > r.offset+int64(len(r.data)) {
|
|
return -1, errors.Errorf("read outside available range")
|
|
}
|
|
return copy(b, r.data[off-r.offset:]), nil
|
|
}
|
|
|
|
// Close closes the pack reader and releases corresponding cache record
|
|
// to the cache. Once closed, the record can be reused by subsequent
|
|
// requests for the same packID or it can be purged from the cache to make
|
|
// room for other packs
|
|
func (r *packCacheRecord) Close() (err error) {
|
|
if r.master != nil {
|
|
return r.cache.release(r.master)
|
|
}
|
|
return r.cache.release(r)
|
|
}
|