9dff07200c
There is a need to process expired `LOCK` objects similar to `TOMBSTONE` ones: we collect them on `Shard`, notify all other shards about expiration so they could unlock the objects, and only after that mark lockers as garbage. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
175 lines
5 KiB
Go
175 lines
5 KiB
Go
package meta
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
|
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
|
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
// bucket name for locked objects.
|
|
var bucketNameLocked = []byte(invalidBase58String + "Locked")
|
|
|
|
// suffix for container buckets with objects of type LOCK.
|
|
const bucketNameSuffixLockers = invalidBase58String + "LOCKER"
|
|
|
|
// returns name of the bucket with objects of type LOCK for specified container.
|
|
func bucketNameLockers(idCnr cid.ID) []byte {
|
|
return []byte(idCnr.String() + bucketNameSuffixLockers)
|
|
}
|
|
|
|
// Lock marks objects as locked with another object. All objects are from the
|
|
// specified container.
|
|
//
|
|
// Allows locking regular objects only (otherwise returns apistatus.LockNonRegularObject).
|
|
//
|
|
// Locked list should be unique. Panics if it is empty.
|
|
func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
|
if len(locked) == 0 {
|
|
panic("empty locked list")
|
|
}
|
|
|
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
// check if all objects are regular
|
|
bucketKeysLocked := make([][]byte, len(locked))
|
|
|
|
for i := range locked {
|
|
bucketKeysLocked[i] = objectKey(&locked[i])
|
|
}
|
|
|
|
if firstIrregularObjectType(tx, cnr, bucketKeysLocked...) != object.TypeRegular {
|
|
return apistatus.LockNonRegularObject{}
|
|
}
|
|
|
|
bucketLocked, err := tx.CreateBucketIfNotExists(bucketNameLocked)
|
|
if err != nil {
|
|
return fmt.Errorf("create global bucket for locked objects: %w", err)
|
|
}
|
|
|
|
bucketLockedContainer, err := bucketLocked.CreateBucketIfNotExists([]byte(cnr.String()))
|
|
if err != nil {
|
|
return fmt.Errorf("create container bucket for locked objects %v: %w", cnr, err)
|
|
}
|
|
|
|
keyLocker := objectKey(&locker)
|
|
var exLockers [][]byte
|
|
var updLockers []byte
|
|
|
|
loop:
|
|
for i := range bucketKeysLocked {
|
|
// decode list of already existing lockers
|
|
exLockers, err = decodeList(bucketLockedContainer.Get(bucketKeysLocked[i]))
|
|
if err != nil {
|
|
return fmt.Errorf("decode list of object lockers: %w", err)
|
|
}
|
|
|
|
for i := range exLockers {
|
|
if bytes.Equal(exLockers[i], keyLocker) {
|
|
continue loop
|
|
}
|
|
}
|
|
|
|
// update the list of lockers
|
|
updLockers, err = encodeList(append(exLockers, keyLocker))
|
|
if err != nil {
|
|
// maybe continue for the best effort?
|
|
return fmt.Errorf("encode list of object lockers: %w", err)
|
|
}
|
|
|
|
// write updated list of lockers
|
|
err = bucketLockedContainer.Put(bucketKeysLocked[i], updLockers)
|
|
if err != nil {
|
|
return fmt.Errorf("update list of object lockers: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// FreeLockedBy unlocks all objects in DB which are locked by lockers.
|
|
func (db *DB) FreeLockedBy(lockers []*addressSDK.Address) error {
|
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
var err error
|
|
|
|
for _, addr := range lockers {
|
|
err = freePotentialLocks(tx, *addr.ContainerID(), *addr.ObjectID())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return err
|
|
})
|
|
}
|
|
|
|
// checks if specified object is locked in the specified container.
|
|
func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
|
if bucketLocked != nil {
|
|
bucketLockedContainer := bucketLocked.Bucket([]byte(idCnr.String()))
|
|
if bucketLockedContainer != nil {
|
|
return bucketLockedContainer.Get(objectKey(&idObj)) != nil
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// releases all records about the objects locked by the locker.
|
|
//
|
|
// Operation is very resource-intensive, which is caused by the admissibility
|
|
// of multiple locks. Also, if we knew what objects are locked, it would be
|
|
// possible to speed up the execution.
|
|
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
|
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
|
if bucketLocked != nil {
|
|
bucketLockedContainer := bucketLocked.Bucket([]byte(idCnr.String()))
|
|
if bucketLockedContainer != nil {
|
|
keyLocker := objectKey(&locker)
|
|
return bucketLockedContainer.ForEach(func(k, v []byte) error {
|
|
keyLockers, err := decodeList(v)
|
|
if err != nil {
|
|
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
|
}
|
|
|
|
for i := range keyLockers {
|
|
if bytes.Equal(keyLockers[i], keyLocker) {
|
|
if len(keyLockers) == 1 {
|
|
// locker was all alone
|
|
err = bucketLockedContainer.Delete(k)
|
|
if err != nil {
|
|
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
|
}
|
|
} else {
|
|
// exclude locker
|
|
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
|
|
|
v, err = encodeList(append(keyLockers[:i], keyLockers[i+1:]...))
|
|
if err != nil {
|
|
return fmt.Errorf("encode updated list of lockers: %w", err)
|
|
}
|
|
|
|
// update the record
|
|
err = bucketLockedContainer.Put(k, v)
|
|
if err != nil {
|
|
return fmt.Errorf("update list of lockers: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|