forked from TrueCloudLab/frostfs-s3-gw
[#413] Use tree service to put object
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
ab5c44ac14
commit
36f3c43af5
4 changed files with 201 additions and 46 deletions
|
@ -171,18 +171,14 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
own := n.Owner(ctx)
|
||||
|
||||
versioningEnabled := n.isVersioningEnabled(ctx, p.BktInfo)
|
||||
versions, err := n.headVersions(ctx, p.BktInfo, p.Object)
|
||||
if err != nil && !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) {
|
||||
return nil, err
|
||||
}
|
||||
idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled)
|
||||
newVersion := &NodeVersion{IsUnversioned: !versioningEnabled}
|
||||
|
||||
r := p.Reader
|
||||
if r != nil {
|
||||
if len(p.Header[api.ContentType]) == 0 {
|
||||
if contentType := MimeByFileName(p.Object); len(contentType) == 0 {
|
||||
d := newDetector(r)
|
||||
if contentType, err = d.Detect(); err == nil {
|
||||
if contentType, err := d.Detect(); err == nil {
|
||||
p.Header[api.ContentType] = contentType
|
||||
}
|
||||
r = d.MultiReader()
|
||||
|
@ -206,17 +202,16 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||
}
|
||||
|
||||
if p.Header[VersionsDeleteMarkAttr] == DelMarkFullObject {
|
||||
if last := versions.getLast(); last != nil {
|
||||
n.objCache.Delete(last.Address())
|
||||
}
|
||||
}
|
||||
|
||||
id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newVersion.OID = id
|
||||
if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, p.Object, newVersion); err != nil {
|
||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||
}
|
||||
|
||||
currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute))
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't get creation epoch",
|
||||
|
@ -242,23 +237,6 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
|
||||
n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID)
|
||||
|
||||
for _, id := range idsToDeleteArr {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, id); err != nil {
|
||||
n.log.Warn("couldn't delete object",
|
||||
zap.Stringer("version id", id),
|
||||
zap.Error(err))
|
||||
}
|
||||
if !versioningEnabled {
|
||||
if objVersion := versions.getVersion(id); objVersion != nil {
|
||||
if err = n.DeleteObjectTagging(ctx, p.BktInfo, objVersion); err != nil {
|
||||
n.log.Warn("couldn't delete object tagging",
|
||||
zap.Stringer("version id", id),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &data.ObjectInfo{
|
||||
ID: *id,
|
||||
CID: p.BktInfo.CID,
|
||||
|
|
|
@ -75,14 +75,6 @@ func (n *layer) DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo
|
|||
}
|
||||
|
||||
func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) {
|
||||
versions, err := n.headSystemVersions(ctx, p.BktInfo, p.ObjName)
|
||||
if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
idsToDeleteArr := updateCRDT2PSetHeaders(p.Metadata, versions, false) // false means "last write wins"
|
||||
// note that updateCRDT2PSetHeaders modifies p.Metadata and must be called further processing
|
||||
|
||||
prm := PrmObjectCreate{
|
||||
Container: p.BktInfo.CID,
|
||||
Creator: p.BktInfo.Owner,
|
||||
|
@ -121,6 +113,11 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject
|
|||
return nil, err
|
||||
}
|
||||
|
||||
newVersion := &BaseNodeVersion{OID: id}
|
||||
if err = n.treeService.AddSystemVersion(ctx, &p.BktInfo.CID, p.ObjName, newVersion); err != nil {
|
||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||
}
|
||||
|
||||
currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute))
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't get creation epoch",
|
||||
|
@ -129,15 +126,6 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject
|
|||
zap.Error(err))
|
||||
}
|
||||
|
||||
for _, id := range idsToDeleteArr {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, id); err != nil {
|
||||
n.log.Warn("couldn't delete system object",
|
||||
zap.Stringer("version id", id),
|
||||
zap.String("name", misc.SanitizeString(p.ObjName)),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
headers := make(map[string]string, len(p.Metadata))
|
||||
for _, attr := range prm.Attributes {
|
||||
headers[attr[0]] = attr[1]
|
||||
|
|
|
@ -29,6 +29,29 @@ type TreeService interface {
|
|||
PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error)
|
||||
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in NeoFS
|
||||
DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error)
|
||||
|
||||
GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*NodeVersion, error)
|
||||
|
||||
//GetUnversioned(context.Context, *cid.ID, string) (*NodeVersion, error)
|
||||
|
||||
AddVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *NodeVersion) error
|
||||
|
||||
RemoveVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
|
||||
|
||||
AddSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *BaseNodeVersion) error
|
||||
|
||||
RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
|
||||
}
|
||||
|
||||
type NodeVersion struct {
|
||||
BaseNodeVersion
|
||||
IsDeleteMarker bool
|
||||
IsUnversioned bool
|
||||
}
|
||||
|
||||
type BaseNodeVersion struct {
|
||||
ID uint64
|
||||
OID *oid.ID
|
||||
}
|
||||
|
||||
// ErrNodeNotFound is returned from Tree service in case of not found error.
|
||||
|
|
|
@ -36,6 +36,7 @@ const (
|
|||
oidKv = "OID"
|
||||
fileNameKV = "FileName"
|
||||
systemNameKV = "SystemName"
|
||||
isUnversionedKV = "IsUnversioned"
|
||||
|
||||
settingsFileName = "bucket-settings"
|
||||
notifConfFileName = "bucket-notifications"
|
||||
|
@ -44,6 +45,11 @@ const (
|
|||
// bucketSystemObjectsTreeID -- ID of a tree with system objects for bucket
|
||||
// i.e. bucket settings with versioning and lock configuration, cors, notifications
|
||||
bucketSystemObjectsTreeID = "system-bucket"
|
||||
|
||||
versionTree = "version"
|
||||
systemTree = "system"
|
||||
|
||||
separator = "/"
|
||||
)
|
||||
|
||||
// NewTreeClient creates instance of TreeClient using provided address and create grpc connection.
|
||||
|
@ -201,6 +207,47 @@ func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetVersions(ctx context.Context, cnrID *cid.ID, filepath string) ([]*layer.NodeVersion, error) {
|
||||
return c.getVersions(ctx, cnrID, versionTree, filepath, false)
|
||||
}
|
||||
|
||||
func (c *TreeClient) getUnversioned(ctx context.Context, cnrID *cid.ID, treeID, filepath string) (*layer.NodeVersion, error) {
|
||||
nodes, err := c.getVersions(ctx, cnrID, treeID, filepath, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(nodes) > 1 {
|
||||
return nil, fmt.Errorf("found more than one unversioned node")
|
||||
}
|
||||
|
||||
if len(nodes) != 1 {
|
||||
return nil, layer.ErrNotFound
|
||||
}
|
||||
|
||||
return nodes[0], nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) AddVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *layer.NodeVersion) error {
|
||||
return c.addVersion(ctx, cnrID, versionTree, filepath, version)
|
||||
}
|
||||
|
||||
func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *layer.BaseNodeVersion) error {
|
||||
newVersion := &layer.NodeVersion{
|
||||
BaseNodeVersion: *version,
|
||||
IsUnversioned: true,
|
||||
}
|
||||
return c.addVersion(ctx, cnrID, systemTree, filepath, newVersion)
|
||||
}
|
||||
|
||||
func (c *TreeClient) RemoveVersion(ctx context.Context, cnrID *cid.ID, id uint64) error {
|
||||
return c.removeVersion(ctx, cnrID, versionTree, id)
|
||||
}
|
||||
|
||||
func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id uint64) error {
|
||||
return c.removeVersion(ctx, cnrID, systemTree, id)
|
||||
}
|
||||
|
||||
func (c *TreeClient) Close() error {
|
||||
if c.conn != nil {
|
||||
return c.conn.Close()
|
||||
|
@ -209,6 +256,110 @@ func (c *TreeClient) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, filepath string, version *layer.NodeVersion) error {
|
||||
path := strings.Split(filepath, separator)
|
||||
meta := map[string]string{
|
||||
oidKV: version.OID.EncodeToString(),
|
||||
fileNameKV: path[len(path)-1],
|
||||
}
|
||||
|
||||
if version.IsUnversioned {
|
||||
meta[isUnversionedKV] = "true"
|
||||
|
||||
node, err := c.getUnversioned(ctx, cnrID, treeID, filepath)
|
||||
if err == nil {
|
||||
parentID, err := c.getParent(ctx, cnrID, treeID, node.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.moveNode(ctx, cnrID, treeID, version.ID, parentID, meta)
|
||||
}
|
||||
|
||||
if !errors.Is(err, layer.ErrNotFound) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return c.addNodeByPath(ctx, cnrID, treeID, path[:len(path)-1], meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) removeVersion(ctx context.Context, cnrID *cid.ID, treeID string, id uint64) error {
|
||||
request := &tree.RemoveRequest{
|
||||
Body: &tree.RemoveRequest_Body{
|
||||
ContainerId: []byte(cnrID.EncodeToString()),
|
||||
TreeId: treeID,
|
||||
NodeId: id,
|
||||
},
|
||||
}
|
||||
|
||||
_, err := c.service.Remove(ctx, request)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, filepath string, onlyUnversioned bool) ([]*layer.NodeVersion, error) {
|
||||
keysToReturn := []string{versioningEnabledKV, lockConfigurationKV}
|
||||
path := strings.Split(filepath, separator)
|
||||
nodes, err := c.getNodes(ctx, cnrID, treeID, fileNameKV, path, keysToReturn)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, fmt.Errorf("couldn't get nodes: %w", err)
|
||||
}
|
||||
|
||||
result := make([]*layer.NodeVersion, 0, len(nodes))
|
||||
for _, node := range nodes {
|
||||
treeNode := newNode(node)
|
||||
|
||||
objIDStr, ok := treeNode.Get(oidKV)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
var objId oid.ID
|
||||
if err = objId.DecodeString(objIDStr); err != nil {
|
||||
return nil, fmt.Errorf("invalid object id '%s': %w", objIDStr, err)
|
||||
}
|
||||
|
||||
_, isUnversioned := treeNode.Get(isUnversionedKV)
|
||||
if onlyUnversioned && !isUnversioned {
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, &layer.NodeVersion{
|
||||
BaseNodeVersion: layer.BaseNodeVersion{
|
||||
ID: node.NodeId,
|
||||
OID: &objId,
|
||||
},
|
||||
IsUnversioned: isUnversioned,
|
||||
})
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) getParent(ctx context.Context, cnrID *cid.ID, treeID string, id uint64) (uint64, error) {
|
||||
request := &tree.GetSubTreeRequest{
|
||||
Body: &tree.GetSubTreeRequest_Body{
|
||||
ContainerId: []byte(cnrID.EncodeToString()),
|
||||
TreeId: treeID,
|
||||
RootId: id,
|
||||
},
|
||||
}
|
||||
|
||||
cli, err := c.service.GetSubTree(ctx, request)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get sub tree client: %w", err)
|
||||
}
|
||||
|
||||
resp, err := cli.Recv()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get sub tree: %w", err)
|
||||
}
|
||||
|
||||
return resp.GetBody().GetParentId(), nil
|
||||
}
|
||||
|
||||
func metaFromSettings(settings *data.BucketSettings) map[string]string {
|
||||
results := make(map[string]string, 3)
|
||||
|
||||
|
@ -264,6 +415,21 @@ func (c *TreeClient) addNode(ctx context.Context, cnrID *cid.ID, treeID string,
|
|||
return resp.GetBody().GetNodeId(), nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) addNodeByPath(ctx context.Context, cnrID *cid.ID, treeID string, path []string, meta map[string]string) error {
|
||||
request := &tree.AddByPathRequest{
|
||||
Body: &tree.AddByPathRequest_Body{
|
||||
ContainerId: []byte(cnrID.EncodeToString()),
|
||||
TreeId: treeID,
|
||||
Path: path,
|
||||
Meta: metaToKV(meta),
|
||||
PathAttribute: fileNameKV,
|
||||
},
|
||||
}
|
||||
|
||||
_, err := c.service.AddByPath(ctx, request)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TreeClient) moveNode(ctx context.Context, cnrID *cid.ID, treeID string, nodeID, parentID uint64, meta map[string]string) error {
|
||||
request := &tree.MoveRequest{
|
||||
Body: &tree.MoveRequest_Body{
|
||||
|
|
Loading…
Reference in a new issue