Do not use Swift server side copy for manifests to handle >5G files
Signed-off-by: Sylvain Baubeau <sbaubeau@redhat.com>
This commit is contained in:
parent
000dec3c6f
commit
52d28ec81a
1 changed files with 76 additions and 27 deletions
|
@ -20,7 +20,10 @@ package swift
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/sha1"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -237,6 +240,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
paddingReader io.Reader
|
paddingReader io.Reader
|
||||||
currentLength int64
|
currentLength int64
|
||||||
cursor int64
|
cursor int64
|
||||||
|
segmentPath string
|
||||||
)
|
)
|
||||||
|
|
||||||
partNumber := 1
|
partNumber := 1
|
||||||
|
@ -244,7 +248,7 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
zeroBuf := make([]byte, d.ChunkSize)
|
zeroBuf := make([]byte, d.ChunkSize)
|
||||||
|
|
||||||
getSegment := func() string {
|
getSegment := func() string {
|
||||||
return fmt.Sprintf("%s/%016d", d.swiftSegmentPath(path), partNumber)
|
return fmt.Sprintf("%s/%016d", segmentPath, partNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
max := func(a int64, b int64) int64 {
|
max := func(a int64, b int64) int64 {
|
||||||
|
@ -254,24 +258,36 @@ func (d *driver) WriteStream(ctx context.Context, path string, offset int64, rea
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
info, _, err := d.Conn.Object(d.Container, d.swiftPath(path))
|
createManifest := true
|
||||||
if err != nil {
|
info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path))
|
||||||
if err == swift.ObjectNotFound {
|
if err == nil {
|
||||||
// Create a object manifest
|
manifest, ok := headers["X-Object-Manifest"]
|
||||||
manifest, err := d.createManifest(path)
|
if !ok {
|
||||||
if err != nil {
|
if segmentPath, err = d.swiftSegmentPath(path); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
manifest.Close()
|
if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegment()); err != nil {
|
||||||
} else if err == swift.ContainerNotFound {
|
|
||||||
return 0, storagedriver.PathNotFoundError{Path: path}
|
|
||||||
} else {
|
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
segments = append(segments, info)
|
||||||
} else {
|
} else {
|
||||||
// The manifest already exists. Get all the segments
|
_, segmentPath = parseManifest(manifest)
|
||||||
|
if segments, err = d.getAllSegments(segmentPath); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
createManifest = false
|
||||||
|
}
|
||||||
currentLength = info.Bytes
|
currentLength = info.Bytes
|
||||||
if segments, err = d.getAllSegments(path); err != nil {
|
} else if err == swift.ObjectNotFound {
|
||||||
|
if segmentPath, err = d.swiftSegmentPath(path); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if createManifest {
|
||||||
|
if err := d.createManifest(path, d.Container+"/"+segmentPath); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -468,8 +484,18 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
||||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||||
// object.
|
// object.
|
||||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||||
err := d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
|
_, headers, err := d.Conn.Object(d.Container, d.swiftPath(sourcePath))
|
||||||
if err == swift.ContainerNotFound || err == swift.ObjectNotFound {
|
if err == nil {
|
||||||
|
if manifest, ok := headers["X-Object-Manifest"]; ok {
|
||||||
|
if err = d.createManifest(destPath, manifest); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = d.Conn.ObjectDelete(d.Container, d.swiftPath(sourcePath))
|
||||||
|
} else {
|
||||||
|
err = d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err == swift.ObjectNotFound {
|
||||||
return storagedriver.PathNotFoundError{Path: sourcePath}
|
return storagedriver.PathNotFoundError{Path: sourcePath}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -509,9 +535,8 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
||||||
if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
|
if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
|
||||||
manifest, ok := headers["X-Object-Manifest"]
|
manifest, ok := headers["X-Object-Manifest"]
|
||||||
if ok {
|
if ok {
|
||||||
components := strings.SplitN(manifest, "/", 2)
|
segContainer, prefix := parseManifest(manifest)
|
||||||
segContainer := components[0]
|
segments, err := d.getAllSegments(prefix)
|
||||||
segments, err := d.getAllSegments(components[1])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -566,8 +591,14 @@ func (d *driver) swiftPath(path string) string {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
|
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) swiftSegmentPath(path string) string {
|
func (d *driver) swiftSegmentPath(path string) (string, error) {
|
||||||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments"+path, "/"), "/")
|
checksum := sha1.New()
|
||||||
|
random := make([]byte, 32)
|
||||||
|
if _, err := rand.Read(random); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
|
||||||
|
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) getContentType() string {
|
func (d *driver) getContentType() string {
|
||||||
|
@ -575,21 +606,30 @@ func (d *driver) getContentType() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
|
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
|
||||||
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: d.swiftSegmentPath(path)})
|
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
|
||||||
if err == swift.ContainerNotFound {
|
if err == swift.ContainerNotFound {
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
return segments, err
|
return segments, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) createManifest(path string) (*swift.ObjectCreateFile, error) {
|
func (d *driver) createManifest(path string, segments string) error {
|
||||||
headers := make(swift.Headers)
|
headers := make(swift.Headers)
|
||||||
headers["X-Object-Manifest"] = d.Container + "/" + d.swiftSegmentPath(path)
|
headers["X-Object-Manifest"] = segments
|
||||||
file, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", d.getContentType(), headers)
|
manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", d.getContentType(), headers)
|
||||||
if err == swift.ContainerNotFound {
|
if err != nil {
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
if err == swift.ObjectNotFound {
|
||||||
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
}
|
||||||
return file, err
|
return err
|
||||||
|
}
|
||||||
|
if err := manifest.Close(); err != nil {
|
||||||
|
if err == swift.ObjectNotFound {
|
||||||
|
return storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func detectBulkDelete(authURL string) (bulkDelete bool) {
|
func detectBulkDelete(authURL string) (bulkDelete bool) {
|
||||||
|
@ -604,3 +644,12 @@ func detectBulkDelete(authURL string) (bulkDelete bool) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseManifest(manifest string) (container string, prefix string) {
|
||||||
|
components := strings.SplitN(manifest, "/", 2)
|
||||||
|
container = components[0]
|
||||||
|
if len(components) > 1 {
|
||||||
|
prefix = components[1]
|
||||||
|
}
|
||||||
|
return container, prefix
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue