forked from TrueCloudLab/frostfs-http-gw
parent
9475786df8
commit
5080b43a04
7 changed files with 111 additions and 67 deletions
31
app.go
31
app.go
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/fasthttp/router"
|
||||
"github.com/nspcc-dev/neo-go/cli/flags"
|
||||
"github.com/nspcc-dev/neo-go/cli/input"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/nspcc-dev/neofs-http-gw/downloader"
|
||||
|
@ -92,7 +93,13 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
if err != nil {
|
||||
a.log.Fatal("failed to get neofs credentials", zap.Error(err))
|
||||
}
|
||||
pb := new(pool.Builder)
|
||||
|
||||
var prm pool.InitParameters
|
||||
prm.SetKey(key)
|
||||
prm.SetNodeDialTimeout(a.cfg.GetDuration(cfgConTimeout))
|
||||
prm.SetHealthcheckTimeout(a.cfg.GetDuration(cfgReqTimeout))
|
||||
prm.SetClientRebalanceInterval(a.cfg.GetDuration(cfgRebalance))
|
||||
|
||||
for i := 0; ; i++ {
|
||||
address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address")
|
||||
weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight")
|
||||
|
@ -106,20 +113,20 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
if priority <= 0 { // unspecified or wrong
|
||||
priority = 1
|
||||
}
|
||||
pb.AddNode(address, priority, weight)
|
||||
prm.AddNode(pool.NewNodeParam(priority, address, weight))
|
||||
a.log.Info("add connection", zap.String("address", address),
|
||||
zap.Float64("weight", weight), zap.Int("priority", priority))
|
||||
}
|
||||
opts := &pool.BuilderOptions{
|
||||
Key: key,
|
||||
NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout),
|
||||
NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout),
|
||||
ClientRebalanceInterval: a.cfg.GetDuration(cfgRebalance),
|
||||
}
|
||||
a.pool, err = pb.Build(ctx, opts)
|
||||
|
||||
a.pool, err = pool.NewPool(prm)
|
||||
if err != nil {
|
||||
a.log.Fatal("failed to create connection pool", zap.Error(err))
|
||||
}
|
||||
|
||||
err = a.pool.Dial(ctx)
|
||||
if err != nil {
|
||||
a.log.Fatal("failed to dial pool", zap.Error(err))
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
|
@ -127,7 +134,11 @@ func getNeoFSKey(a *app) (*ecdsa.PrivateKey, error) {
|
|||
walletPath := a.cfg.GetString(cmdWallet)
|
||||
if len(walletPath) == 0 {
|
||||
a.log.Info("no wallet path specified, creating ephemeral key automatically for this run")
|
||||
return pool.NewEphemeralKey()
|
||||
key, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &key.PrivateKey, nil
|
||||
}
|
||||
w, err := wallet.NewWalletFromFile(walletPath)
|
||||
if err != nil {
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/pool"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/token"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -97,7 +98,11 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress *address.Address) {
|
|||
return
|
||||
}
|
||||
|
||||
rObj, err := clnt.GetObject(r.RequestCtx, *objectAddress, bearerOpts(r.RequestCtx))
|
||||
var prm pool.PrmObjectGet
|
||||
prm.SetAddress(*objectAddress)
|
||||
prm.UseBearer(bearerToken(r.RequestCtx))
|
||||
|
||||
rObj, err := clnt.GetObject(r.RequestCtx, prm)
|
||||
if err != nil {
|
||||
r.handleNeoFSErr(err, start)
|
||||
return
|
||||
|
@ -195,11 +200,11 @@ func systemBackwardTranslator(key string) string {
|
|||
return res.String()
|
||||
}
|
||||
|
||||
func bearerOpts(ctx context.Context) pool.CallOption {
|
||||
func bearerToken(ctx context.Context) *token.BearerToken {
|
||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
||||
return pool.WithBearer(tkn)
|
||||
return tkn
|
||||
}
|
||||
return pool.WithBearer(nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *request) handleNeoFSErr(err error, start time.Time) {
|
||||
|
@ -335,7 +340,12 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string
|
|||
filters.AddRootFilter()
|
||||
filters.AddFilter(key, val, op)
|
||||
|
||||
return d.pool.SearchObjects(c, *cid, filters)
|
||||
var prm pool.PrmObjectSearch
|
||||
prm.SetContainerID(*cid)
|
||||
prm.SetFilters(filters)
|
||||
prm.UseBearer(bearerToken(c))
|
||||
|
||||
return d.pool.SearchObjects(c, prm)
|
||||
}
|
||||
|
||||
// DownloadZipped handles zip by prefix requests.
|
||||
|
@ -385,7 +395,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
|
||||
addr.SetContainerID(containerID)
|
||||
|
||||
optBearer := bearerOpts(c)
|
||||
btoken := bearerToken(c)
|
||||
empty := true
|
||||
called := false
|
||||
|
||||
|
@ -400,7 +410,11 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
|
||||
addr.SetObjectID(&id)
|
||||
|
||||
resGet, err = d.pool.GetObject(c, addr, optBearer)
|
||||
var prm pool.PrmObjectGet
|
||||
prm.SetAddress(addr)
|
||||
prm.UseBearer(btoken)
|
||||
|
||||
resGet, err = d.pool.GetObject(c, prm)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("get NeoFS object: %v", err)
|
||||
return true
|
||||
|
|
|
@ -33,8 +33,13 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) {
|
|||
return
|
||||
}
|
||||
|
||||
bearerOpt := bearerOpts(r.RequestCtx)
|
||||
obj, err := clnt.HeadObject(r.RequestCtx, *objectAddress, bearerOpt)
|
||||
btoken := bearerToken(r.RequestCtx)
|
||||
|
||||
var prm pool.PrmObjectHead
|
||||
prm.SetAddress(*objectAddress)
|
||||
prm.UseBearer(btoken)
|
||||
|
||||
obj, err := clnt.HeadObject(r.RequestCtx, prm)
|
||||
if err != nil {
|
||||
r.handleNeoFSErr(err, start)
|
||||
return
|
||||
|
@ -69,7 +74,12 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) {
|
|||
|
||||
if len(contentType) == 0 {
|
||||
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
||||
return clnt.ObjectRange(r.RequestCtx, *objectAddress, 0, sz, bearerOpt)
|
||||
var prmRange pool.PrmObjectRange
|
||||
prmRange.SetAddress(*objectAddress)
|
||||
prmRange.SetLength(sz)
|
||||
prmRange.UseBearer(btoken)
|
||||
|
||||
return clnt.ObjectRange(r.RequestCtx, prmRange)
|
||||
})
|
||||
if err != nil && err != io.EOF {
|
||||
r.handleNeoFSErr(err, start)
|
||||
|
|
2
go.mod
2
go.mod
|
@ -10,7 +10,7 @@ require (
|
|||
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
|
||||
github.com/nspcc-dev/neo-go v0.98.0
|
||||
github.com/nspcc-dev/neofs-api-go/v2 v2.12.1
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
github.com/prometheus/common v0.29.0
|
||||
github.com/prometheus/procfs v0.7.1 // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -596,8 +596,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BE
|
|||
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
|
||||
github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
||||
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20211201182451-a5b61c4f6477/go.mod h1:dfMtQWmBHYpl9Dez23TGtIUKiFvCIxUZq/CkSIhEpz4=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3 h1:ofaiKPYY67a0cQMF+YSChDO48SBQtWlpZnK++cAeqQM=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3/go.mod h1:0hTXmyJnbw8j4BR1oltN7mFIIrVp1IFLdh8qBzAR464=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d h1:OHyq8+zyQtARFWj3quRPabcfQWJZEiU7HYp6QGCSjaM=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.3.0.20220407103316-e50e6d28280d/go.mod h1:Hl7a1l0ntZ4b1ZABpGX6fuAuFS3c6+hyMCUNVvZv/w4=
|
||||
github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
|
||||
|
|
|
@ -119,7 +119,10 @@ func simplePut(ctx context.Context, t *testing.T, clientPool *pool.Pool, CID *ci
|
|||
|
||||
payload := bytes.NewBuffer(nil)
|
||||
|
||||
res, err := clientPool.GetObject(ctx, *objectAddress)
|
||||
var prm pool.PrmObjectGet
|
||||
prm.SetAddress(*objectAddress)
|
||||
|
||||
res, err := clientPool.GetObject(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = io.Copy(payload, res.Payload)
|
||||
|
@ -272,15 +275,15 @@ func getDefaultConfig() *viper.Viper {
|
|||
}
|
||||
|
||||
func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) *pool.Pool {
|
||||
pb := new(pool.Builder)
|
||||
pb.AddNode("localhost:8080", 1, 1)
|
||||
var prm pool.InitParameters
|
||||
prm.SetKey(&key.PrivateKey)
|
||||
prm.SetNodeDialTimeout(5 * time.Second)
|
||||
prm.AddNode(pool.NewNodeParam(1, "localhost:8080", 1))
|
||||
|
||||
opts := &pool.BuilderOptions{
|
||||
Key: &key.PrivateKey,
|
||||
NodeConnectionTimeout: 5 * time.Second,
|
||||
NodeRequestTimeout: 5 * time.Second,
|
||||
}
|
||||
clientPool, err := pb.Build(ctx, opts)
|
||||
clientPool, err := pool.NewPool(prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = clientPool.Dial(ctx)
|
||||
require.NoError(t, err)
|
||||
return clientPool
|
||||
}
|
||||
|
@ -296,17 +299,20 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool) (
|
|||
container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10)))
|
||||
cnr.SetOwnerID(clientPool.OwnerID())
|
||||
|
||||
CID, err := clientPool.PutContainer(ctx, cnr)
|
||||
var waitPrm pool.WaitParams
|
||||
waitPrm.SetTimeout(15 * time.Second)
|
||||
waitPrm.SetPollInterval(3 * time.Second)
|
||||
|
||||
var prm pool.PrmContainerPut
|
||||
prm.SetContainer(*cnr)
|
||||
prm.SetWaitParams(waitPrm)
|
||||
|
||||
CID, err := clientPool.PutContainer(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Println(CID.String())
|
||||
|
||||
err = clientPool.WaitForContainerPresence(ctx, CID, &pool.ContainerPollingParams{
|
||||
CreationTimeout: 15 * time.Second,
|
||||
PollInterval: 3 * time.Second,
|
||||
})
|
||||
|
||||
return CID, err
|
||||
}
|
||||
|
||||
|
@ -324,7 +330,11 @@ func putObject(ctx context.Context, t *testing.T, clientPool *pool.Pool, CID *ci
|
|||
}
|
||||
obj.SetAttributes(attrs...)
|
||||
|
||||
id, err := clientPool.PutObject(ctx, *obj, bytes.NewBufferString(content))
|
||||
var prm pool.PrmObjectPut
|
||||
prm.SetHeader(*obj)
|
||||
prm.SetPayload(bytes.NewBufferString(content))
|
||||
|
||||
id, err := clientPool.PutObject(ctx, prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
return id
|
||||
|
|
|
@ -12,8 +12,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-http-gw/response"
|
||||
"github.com/nspcc-dev/neofs-http-gw/tokens"
|
||||
"github.com/nspcc-dev/neofs-http-gw/utils"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -139,7 +137,12 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|||
ctx, cancel := context.WithCancel(c)
|
||||
defer cancel()
|
||||
|
||||
if idObj, err = u.pool.PutObject(ctx, *obj, file, pool.WithBearer(bt)); err != nil {
|
||||
var prm pool.PrmObjectPut
|
||||
prm.SetHeader(*obj)
|
||||
prm.SetPayload(file)
|
||||
prm.UseBearer(bt)
|
||||
|
||||
if idObj, err = u.pool.PutObject(ctx, prm); err != nil {
|
||||
log.Error("could not store file in neofs", zap.Error(err))
|
||||
response.Error(c, "could not store file in neofs", fasthttp.StatusBadRequest)
|
||||
return
|
||||
|
@ -198,14 +201,11 @@ func (pr *putResponse) encode(w io.Writer) error {
|
|||
}
|
||||
|
||||
func getEpochDurations(ctx context.Context, p *pool.Pool) (*epochDurations, error) {
|
||||
if conn, _, err := p.Connection(); err != nil {
|
||||
networkInfo, err := p.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if networkInfoRes, err := conn.NetworkInfo(ctx, client.PrmNetworkInfo{}); err != nil {
|
||||
return nil, err
|
||||
} else if err = apistatus.ErrFromStatus(networkInfoRes.Status()); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
networkInfo := networkInfoRes.Info()
|
||||
}
|
||||
|
||||
res := &epochDurations{
|
||||
currentEpoch: networkInfo.CurrentEpoch(),
|
||||
msPerBlock: networkInfo.MsPerBlock(),
|
||||
|
@ -224,7 +224,6 @@ func getEpochDurations(ctx context.Context, p *pool.Pool) (*epochDurations, erro
|
|||
return nil, fmt.Errorf("not found param: EpochDuration")
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
|
||||
func needParseExpiration(headers map[string]string) bool {
|
||||
|
|
Loading…
Reference in a new issue