detect outdated container listings during Stat() and getAllSegments()
Signed-off-by: Stefan Majewsky <stefan.majewsky@sap.com>
This commit is contained in:
parent
2a3d48fb82
commit
84aa48b56c
1 changed files with 64 additions and 20 deletions
|
@ -335,7 +335,7 @@ func (d *driver) Writer(ctx context.Context, path string, append bool) (storaged
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, segmentPath(segmentsPath, len(segments))); err != nil {
|
if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegmentPath(segmentsPath, len(segments))); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
segments = []swift.Object{info}
|
segments = []swift.Object{info}
|
||||||
|
@ -376,23 +376,26 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo,
|
||||||
fi.IsDir = true
|
fi.IsDir = true
|
||||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||||
} else if obj.Name == swiftPath {
|
} else if obj.Name == swiftPath {
|
||||||
// On Swift 1.12, the 'bytes' field is always 0
|
// The file exists. But on Swift 1.12, the 'bytes' field is always 0 so
|
||||||
// so we need to do a second HEAD request
|
// we need to do a separate HEAD request.
|
||||||
info, _, err := d.Conn.Object(d.Container, swiftPath)
|
break
|
||||||
if err != nil {
|
|
||||||
if err == swift.ObjectNotFound {
|
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
fi.IsDir = false
|
|
||||||
fi.Size = info.Bytes
|
|
||||||
fi.ModTime = info.LastModified
|
|
||||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
//Don't trust an empty `objects` slice. A container listing can be
|
||||||
|
//outdated. For files, we can make a HEAD request on the object which
|
||||||
|
//reports existence (at least) much more reliably.
|
||||||
|
info, _, err := d.Conn.Object(d.Container, swiftPath)
|
||||||
|
if err != nil {
|
||||||
|
if err == swift.ObjectNotFound {
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
fi.IsDir = false
|
||||||
|
fi.Size = info.Bytes
|
||||||
|
fi.ModTime = info.LastModified
|
||||||
|
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list of the objects that are direct descendants of the given path.
|
// List returns a list of the objects that are direct descendants of the given path.
|
||||||
|
@ -589,11 +592,52 @@ func (d *driver) swiftSegmentPath(path string) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
|
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
|
||||||
|
//a simple container listing works 99.9% of the time
|
||||||
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
|
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
|
||||||
if err == swift.ContainerNotFound {
|
if err != nil {
|
||||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
if err == swift.ContainerNotFound {
|
||||||
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
//build a lookup table by object name
|
||||||
|
hasObjectName := make(map[string]struct{})
|
||||||
|
for _, segment := range segments {
|
||||||
|
hasObjectName[segment.Name] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
//The container listing might be outdated (i.e. not contain all existing
|
||||||
|
//segment objects yet) because of temporary inconsistency (Swift is only
|
||||||
|
//eventually consistent!). Check its completeness.
|
||||||
|
segmentNumber := 0
|
||||||
|
for {
|
||||||
|
segmentNumber++
|
||||||
|
segmentPath := getSegmentPath(path, segmentNumber)
|
||||||
|
|
||||||
|
if _, seen := hasObjectName[segmentPath]; seen {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
//This segment is missing in the container listing. Use a more reliable
|
||||||
|
//request to check its existence. (HEAD requests on segments are
|
||||||
|
//guaranteed to return the correct metadata, except for the pathological
|
||||||
|
//case of an outage of large parts of the Swift cluster or its network,
|
||||||
|
//since every segment is only written once.)
|
||||||
|
segment, _, err := d.Conn.Object(d.Container, segmentPath)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
//found new segment -> keep going, more might be missing
|
||||||
|
segments = append(segments, segment)
|
||||||
|
continue
|
||||||
|
case swift.ObjectNotFound:
|
||||||
|
//This segment is missing. Since we upload segments sequentially,
|
||||||
|
//there won't be any more segments after it.
|
||||||
|
return segments, nil
|
||||||
|
default:
|
||||||
|
return nil, err //unexpected error
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return segments, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) createManifest(path string, segments string) error {
|
func (d *driver) createManifest(path string, segments string) error {
|
||||||
|
@ -632,7 +676,7 @@ func generateSecret() (string, error) {
|
||||||
return hex.EncodeToString(secretBytes[:]), nil
|
return hex.EncodeToString(secretBytes[:]), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func segmentPath(segmentsPath string, partNumber int) string {
|
func getSegmentPath(segmentsPath string, partNumber int) string {
|
||||||
return fmt.Sprintf("%s/%016d", segmentsPath, partNumber)
|
return fmt.Sprintf("%s/%016d", segmentsPath, partNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -769,7 +813,7 @@ func (sw *segmentWriter) Write(p []byte) (int, error) {
|
||||||
if offset+chunkSize > len(p) {
|
if offset+chunkSize > len(p) {
|
||||||
chunkSize = len(p) - offset
|
chunkSize = len(p) - offset
|
||||||
}
|
}
|
||||||
_, err := sw.conn.ObjectPut(sw.container, segmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil)
|
_, err := sw.conn.ObjectPut(sw.container, getSegmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue