[#377] Reuse BucketInfo in layer

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-03-18 16:04:09 +03:00 committed by Angira Kekteeva
parent 46e4b28489
commit f0914b8a43
23 changed files with 341 additions and 521 deletions

View file

@ -180,13 +180,9 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket", reqInfo, err)
return
}
if err = checkOwner(bktInfo, r.Header.Get(api.AmzExpectedBucketOwner)); err != nil {
h.logAndSendError(w, "expected owner doesn't match", reqInfo, err)
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
@ -202,11 +198,11 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
}
params := &layer.PutObjectParams{
Bucket: reqInfo.BucketName,
Object: reqInfo.ObjectName,
Reader: r.Body,
Size: r.ContentLength,
Header: metadata,
BktInfo: bktInfo,
Object: reqInfo.ObjectName,
Reader: r.Body,
Size: r.ContentLength,
Header: metadata,
}
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
@ -228,7 +224,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
}
if containsACLHeaders(r) {
if newEaclTable, err = h.getNewEAclTable(r, info); err != nil {
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, info); err != nil {
h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
return
}
@ -243,8 +239,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
if newEaclTable != nil {
p := &layer.PutBucketACLParams{
Name: reqInfo.BucketName,
EACL: newEaclTable,
BktInfo: bktInfo,
EACL: newEaclTable,
}
if err = h.obj.PutBucketACL(r.Context(), p); err != nil {
@ -253,7 +249,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
}
}
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
if settings, err = h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
} else if settings.VersioningEnabled {
w.Header().Set(api.AmzVersionID, info.Version())
@ -306,12 +302,18 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
return
}
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
params := &layer.PutObjectParams{
Bucket: reqInfo.BucketName,
Object: reqInfo.ObjectName,
Reader: contentReader,
Size: size,
Header: metadata,
BktInfo: bktInfo,
Object: reqInfo.ObjectName,
Reader: contentReader,
Size: size,
Header: metadata,
}
info, err := h.obj.PutObject(r.Context(), params)
@ -326,7 +328,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
r.Header.Set(api.AmzGrantWrite, "")
r.Header.Set(api.AmzGrantRead, "")
if newEaclTable, err = h.getNewEAclTable(r, info); err != nil {
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, info); err != nil {
h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
return
}
@ -341,8 +343,8 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
if newEaclTable != nil {
p := &layer.PutBucketACLParams{
Name: reqInfo.BucketName,
EACL: newEaclTable,
BktInfo: bktInfo,
EACL: newEaclTable,
}
if err = h.obj.PutBucketACL(r.Context(), p); err != nil {
@ -351,12 +353,6 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
}
}
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
} else if settings.VersioningEnabled {
@ -446,7 +442,7 @@ func containsACLHeaders(r *http.Request) bool {
r.Header.Get(api.AmzGrantFullControl) != "" || r.Header.Get(api.AmzGrantWrite) != ""
}
func (h *handler) getNewEAclTable(r *http.Request, objInfo *data.ObjectInfo) (*eacl.Table, error) {
func (h *handler) getNewEAclTable(r *http.Request, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo) (*eacl.Table, error) {
var newEaclTable *eacl.Table
gateKey, err := h.gateKey(r.Context())
if err != nil {
@ -473,7 +469,7 @@ func (h *handler) getNewEAclTable(r *http.Request, objInfo *data.ObjectInfo) (*e
return nil, fmt.Errorf("could not translate policy to ast: %w", err)
}
bacl, err := h.obj.GetBucketACL(r.Context(), objInfo.Bucket)
bacl, err := h.obj.GetBucketACL(r.Context(), bktInfo)
if err != nil {
return nil, fmt.Errorf("could not get bucket eacl: %w", err)
}
@ -585,32 +581,25 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
p.ObjectLockEnabled = isLockEnabled(r.Header)
cid, err := h.obj.CreateBucket(r.Context(), &p)
bktInfo, err := h.obj.CreateBucket(r.Context(), &p)
if err != nil {
h.logAndSendError(w, "could not create bucket", reqInfo, err)
return
}
if p.ObjectLockEnabled {
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could get bucket info", reqInfo, err)
return
}
sp := &layer.PutSettingsParams{
BktInfo: bktInfo,
Settings: &data.BucketSettings{VersioningEnabled: true},
}
if err = h.obj.PutBucketSettings(r.Context(), sp); err != nil {
h.logAndSendError(w, "couldn't enable bucket versioning", reqInfo, err,
zap.Stringer("container_id", cid))
zap.Stringer("container_id", bktInfo.CID))
return
}
}
h.log.Info("bucket is created",
zap.String("container_id", cid.String()))
h.log.Info("bucket is created", zap.Stringer("container_id", bktInfo.CID))
api.WriteSuccessResponseHeadersOnly(w)
}