swift: large file upload fixes
* Read metadata in file listing for 0 length files to fix syncs * Ignore non-existent files in isManifestFile to fix errors on copy * remove nsToSwiftFloatString - experiments with the swift program indicate that it puts a variable number of points after the decimal, so might as well use the one in the swift library. * Make sure segments get deleted properly when move from segmented to non segmented and vice versa * Use internal list routine to detect errors on listing * Remove the _segments container if possible * Remove manifest first when deleting
This commit is contained in:
parent
cc7b9af50e
commit
4a0a42c2f1
1 changed files with 130 additions and 81 deletions
211
swift/swift.go
211
swift/swift.go
|
@ -66,10 +66,11 @@ func init() {
|
|||
|
||||
// FsSwift represents a remote swift server
|
||||
type FsSwift struct {
|
||||
name string // name of this remote
|
||||
c swift.Connection // the connection to the swift server
|
||||
container string // the container we are working on
|
||||
root string // the path we are working on if any
|
||||
name string // name of this remote
|
||||
c swift.Connection // the connection to the swift server
|
||||
container string // the container we are working on
|
||||
segmentsContainer string // container to store the segments (if any) in
|
||||
root string // the path we are working on if any
|
||||
}
|
||||
|
||||
// FsObjectSwift describes a swift object
|
||||
|
@ -163,10 +164,11 @@ func NewFs(name, root string) (fs.Fs, error) {
|
|||
return nil, err
|
||||
}
|
||||
f := &FsSwift{
|
||||
name: name,
|
||||
c: *c,
|
||||
container: container,
|
||||
root: directory,
|
||||
name: name,
|
||||
c: *c,
|
||||
container: container,
|
||||
segmentsContainer: container + "_segments",
|
||||
root: directory,
|
||||
}
|
||||
if f.root != "" {
|
||||
f.root += "/"
|
||||
|
@ -216,21 +218,25 @@ func (f *FsSwift) NewFsObject(remote string) fs.Object {
|
|||
return f.newFsObjectWithInfo(remote, nil)
|
||||
}
|
||||
|
||||
// list the objects into the function supplied
|
||||
// listFn is called from list and listContainerRoot to handle an object
|
||||
type listFn func(string, *swift.Object) error
|
||||
|
||||
// listContainerRoot lists the objects into the function supplied from
|
||||
// the container and root supplied
|
||||
//
|
||||
// If directories is set it only sends directories
|
||||
func (f *FsSwift) list(directories bool, fn func(string, *swift.Object)) {
|
||||
func (f *FsSwift) listContainerRoot(container, root string, directories bool, fn listFn) error {
|
||||
// Options for ObjectsWalk
|
||||
opts := swift.ObjectsOpts{
|
||||
Prefix: f.root,
|
||||
Prefix: root,
|
||||
Limit: 256,
|
||||
}
|
||||
if directories {
|
||||
opts.Delimiter = '/'
|
||||
}
|
||||
rootLength := len(f.root)
|
||||
err := f.c.ObjectsWalk(f.container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
|
||||
objects, err := f.c.Objects(f.container, opts)
|
||||
rootLength := len(root)
|
||||
return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
|
||||
objects, err := f.c.Objects(container, opts)
|
||||
if err == nil {
|
||||
for i := range objects {
|
||||
object := &objects[i]
|
||||
|
@ -241,16 +247,26 @@ func (f *FsSwift) list(directories bool, fn func(string, *swift.Object)) {
|
|||
}
|
||||
object.Name = object.Name[:len(object.Name)-1]
|
||||
}
|
||||
if !strings.HasPrefix(object.Name, f.root) {
|
||||
if !strings.HasPrefix(object.Name, root) {
|
||||
fs.Log(f, "Odd name received %q", object.Name)
|
||||
continue
|
||||
}
|
||||
remote := object.Name[rootLength:]
|
||||
fn(remote, object)
|
||||
err = fn(remote, object)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return objects, err
|
||||
})
|
||||
}
|
||||
|
||||
// list the objects into the function supplied
|
||||
//
|
||||
// If directories is set it only sends directories
|
||||
func (f *FsSwift) list(directories bool, fn listFn) {
|
||||
err := f.listContainerRoot(f.container, f.root, directories, fn)
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
fs.ErrorLog(f, "Couldn't read container %q: %s", f.container, err)
|
||||
|
@ -269,10 +285,18 @@ func (f *FsSwift) List() fs.ObjectsChan {
|
|||
// List the objects
|
||||
go func() {
|
||||
defer close(out)
|
||||
f.list(false, func(remote string, object *swift.Object) {
|
||||
if fs := f.newFsObjectWithInfo(remote, object); fs != nil {
|
||||
out <- fs
|
||||
f.list(false, func(remote string, object *swift.Object) error {
|
||||
if o := f.newFsObjectWithInfo(remote, object); o != nil {
|
||||
// Do full metadata read on 0 size objects which might be manifest files
|
||||
if o.Size() == 0 {
|
||||
err := o.(*FsObjectSwift).readMetaData()
|
||||
if err != nil {
|
||||
fs.Debug(o, "Failed to read metadata: %v", err)
|
||||
}
|
||||
}
|
||||
out <- o
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
@ -304,12 +328,13 @@ func (f *FsSwift) ListDir() fs.DirChan {
|
|||
// List the directories in the path in the container
|
||||
go func() {
|
||||
defer close(out)
|
||||
f.list(true, func(remote string, object *swift.Object) {
|
||||
f.list(true, func(remote string, object *swift.Object) error {
|
||||
out <- &fs.Dir{
|
||||
Name: remote,
|
||||
Bytes: object.Bytes,
|
||||
Count: 0,
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
@ -394,7 +419,7 @@ func (o *FsObjectSwift) Md5sum() (string, error) {
|
|||
return "", err
|
||||
}
|
||||
if isManifest {
|
||||
fs.Debug(o, "Return empty md5 for swift manifest file. Md5 of manifest file calculate as md5 of md5 of it's parts, so it's not original md5")
|
||||
fs.Debug(o, "Returning empty Md5sum for swift manifest file")
|
||||
return "", nil
|
||||
}
|
||||
return strings.ToLower(o.info.Hash), nil
|
||||
|
@ -404,6 +429,9 @@ func (o *FsObjectSwift) Md5sum() (string, error) {
|
|||
func (o *FsObjectSwift) isManifestFile() (bool, error) {
|
||||
err := o.readMetaData()
|
||||
if err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
_, isManifestFile := (*o.headers)["X-Object-Manifest"]
|
||||
|
@ -491,76 +519,100 @@ func min(x, y int64) int64 {
|
|||
return y
|
||||
}
|
||||
|
||||
// nsToSwiftFloatString turns a number of ns into a floating point
|
||||
// string in seconds the same way as the "swift" tool
|
||||
func nsToSwiftFloatString(ns int64) string {
|
||||
if ns < 0 {
|
||||
return "-" + nsToSwiftFloatString(-ns)
|
||||
// removeSegments removes any old segments from o
|
||||
//
|
||||
// if except is passed in then segments with that prefix won't be deleted
|
||||
func (o *FsObjectSwift) removeSegments(except string) error {
|
||||
segmentsRoot := o.swift.root + o.remote + "/"
|
||||
err := o.swift.listContainerRoot(o.swift.segmentsContainer, segmentsRoot, false, func(remote string, object *swift.Object) error {
|
||||
if except != "" && strings.HasPrefix(remote, except) {
|
||||
// fs.Debug(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, o.swift.segmentsContainer)
|
||||
return nil
|
||||
}
|
||||
segmentPath := segmentsRoot + remote
|
||||
fs.Debug(o, "Removing segment file %q in container %q", segmentPath, o.swift.segmentsContainer)
|
||||
return o.swift.c.ObjectDelete(o.swift.segmentsContainer, segmentPath)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result := fmt.Sprintf("%010d", ns)
|
||||
split := len(result) - 9
|
||||
result, decimals := result[:split], result[split:split+2]
|
||||
if decimals != "" {
|
||||
result += "."
|
||||
result += decimals
|
||||
// remove the segments container if empty, ignore errors
|
||||
err = o.swift.c.ContainerDelete(o.swift.segmentsContainer)
|
||||
if err == nil {
|
||||
fs.Debug(o, "Removed empty container %q", o.swift.segmentsContainer)
|
||||
}
|
||||
return result
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateChunks updates the existing object using chunks to a separate
|
||||
// container. It returns a string which prefixes current segments.
|
||||
func (o *FsObjectSwift) updateChunks(in io.Reader, headers swift.Headers, size int64) (string, error) {
|
||||
// Create the segmentsContainer if it doesn't exist
|
||||
err := o.swift.c.ContainerCreate(o.swift.segmentsContainer, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
// Upload the chunks
|
||||
left := size
|
||||
i := 0
|
||||
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
|
||||
segmentsPath := fmt.Sprintf("%s%s/%s", o.swift.root, o.remote, uniquePrefix)
|
||||
for left > 0 {
|
||||
n := min(left, int64(chunkSize))
|
||||
segmentReader := io.LimitReader(in, n)
|
||||
segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
|
||||
fs.Debug(o, "Uploading segment file %q into %q", segmentPath, o.swift.segmentsContainer)
|
||||
_, err := o.swift.c.ObjectPut(o.swift.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
left -= n
|
||||
i++
|
||||
}
|
||||
// Upload the manifest
|
||||
headers["X-Object-Manifest"] = fmt.Sprintf("%s/%s", o.swift.segmentsContainer, segmentsPath)
|
||||
emptyReader := bytes.NewReader(nil)
|
||||
manifestName := o.swift.root + o.remote
|
||||
_, err = o.swift.c.ObjectPut(o.swift.container, manifestName, emptyReader, true, "", "", headers)
|
||||
return uniquePrefix + "/", err
|
||||
}
|
||||
|
||||
// Update the object with the contents of the io.Reader, modTime and size
|
||||
//
|
||||
// The new object may have been created if an error is returned
|
||||
func (o *FsObjectSwift) Update(in io.Reader, modTime time.Time, size int64) error {
|
||||
// Note whether this has a manifest before starting
|
||||
isManifest, err := o.isManifestFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the mtime
|
||||
m := swift.Metadata{}
|
||||
m.SetModTime(modTime)
|
||||
headers := m.ObjectHeaders()
|
||||
uniquePrefix := ""
|
||||
if size > int64(chunkSize) {
|
||||
segmentsContainerName := o.swift.container + "_segments"
|
||||
left := size
|
||||
i := 0
|
||||
nowFloat := nsToSwiftFloatString(time.Now().UnixNano())
|
||||
for left > 0 {
|
||||
n := min(left, int64(chunkSize))
|
||||
segmentReader := io.LimitReader(in, n)
|
||||
segmentPath := fmt.Sprintf("%s%s/%s/%d/%08d", o.swift.root, o.remote, nowFloat, size, i)
|
||||
_, err := o.swift.c.ObjectPut(segmentsContainerName, segmentPath, segmentReader, true, "", "", m.ObjectHeaders())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
left -= n
|
||||
i++
|
||||
}
|
||||
manifestHeaders := swift.Headers{"X-Object-Manifest": fmt.Sprintf("%s/%s%s/%s/%d", segmentsContainerName, o.swift.root, o.remote, nowFloat, size)}
|
||||
for k, v := range m.ObjectHeaders() {
|
||||
manifestHeaders[k] = v
|
||||
}
|
||||
emptyReader := bytes.NewReader(nil)
|
||||
manifestName := o.swift.root + o.remote
|
||||
_, err := o.swift.c.ObjectPut(o.swift.container, manifestName, emptyReader, true, "", "", manifestHeaders)
|
||||
uniquePrefix, err = o.updateChunks(in, headers, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// remove old segments
|
||||
segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote)
|
||||
segmentsFs, err := NewFs(o.swift.name, segmentsPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for o := range segmentsFs.List() {
|
||||
if !strings.HasPrefix(o.Remote(), nowFloat) {
|
||||
fs.Log(o, "Remove old file segment '%s'", o.Remote())
|
||||
err := o.Remove()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
headers["X-Object-Manifest"] = "" // remove manifest
|
||||
_, err := o.swift.c.ObjectPut(o.swift.container, o.swift.root+o.remote, in, true, "", "", m.ObjectHeaders())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If file was a manifest then remove old/all segments
|
||||
if isManifest {
|
||||
err = o.removeSegments(uniquePrefix)
|
||||
if err != nil {
|
||||
fs.Log(o, "Failed to remove old segments - carrying on with upload: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Read the metadata from the newly created object
|
||||
o.headers = nil // wipe old metadata
|
||||
return o.readMetaData()
|
||||
|
@ -572,22 +624,19 @@ func (o *FsObjectSwift) Remove() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Remove file/manifest first
|
||||
err = o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// ...then segments if required
|
||||
if isManifestFile {
|
||||
// remove segments
|
||||
segmentsContainerName := o.swift.container + "_segments"
|
||||
segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote)
|
||||
segmentsFs, err := NewFs(o.swift.name, segmentsPath)
|
||||
err = o.removeSegments("")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for o := range segmentsFs.List() {
|
||||
err := o.Remove()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check the interfaces are satisfied
|
||||
|
|
Loading…
Reference in a new issue