EC put #1064
21 changed files with 802 additions and 92 deletions
12
Makefile
12
Makefile
|
@ -276,15 +276,19 @@ env-up: all
|
||||||
echo "Frostfs contracts not found"; exit 1; \
|
echo "Frostfs contracts not found"; exit 1; \
|
||||||
fi
|
fi
|
||||||
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH}
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph init --contracts ${FROSTFS_CONTRACTS_PATH}
|
||||||
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet.json --gas 10.0
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet01.json --gas 10.0
|
||||||
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet02.json --gas 10.0
|
||||||
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet03.json --gas 10.0
|
||||||
|
${BIN}/frostfs-adm --config ./dev/adm/frostfs-adm.yml morph refill-gas --storage-wallet ./dev/storage/wallet04.json --gas 10.0
|
||||||
@if [ ! -f "$(LOCODE_DB_PATH)" ]; then \
|
@if [ ! -f "$(LOCODE_DB_PATH)" ]; then \
|
||||||
make locode-download; \
|
make locode-download; \
|
||||||
fi
|
fi
|
||||||
|
mkdir -p ./$(TMP_DIR)/state
|
||||||
|
mkdir -p ./$(TMP_DIR)/storage
|
||||||
|
|
||||||
# Shutdown dev environment
|
# Shutdown dev environment
|
||||||
env-down:
|
env-down:
|
||||||
docker compose -f dev/docker-compose.yml down
|
docker compose -f dev/docker-compose.yml down
|
||||||
docker volume rm -f frostfs-node_neo-go
|
docker volume rm -f frostfs-node_neo-go
|
||||||
rm -f ./.cache/.frostfs-ir-state
|
rm -rf ./$(TMP_DIR)/state
|
||||||
rm -f ./.cache/.frostfs-node-state
|
rm -rf ./$(TMP_DIR)/storage
|
||||||
rm -rf ./.cache/storage
|
|
||||||
|
|
|
@ -171,6 +171,15 @@ func printHeader(cmd *cobra.Command, obj *objectSDK.Object) error {
|
||||||
cmd.Printf(" signature: %s\n", hex.EncodeToString(sigV2.GetSign()))
|
cmd.Printf(" signature: %s\n", hex.EncodeToString(sigV2.GetSign()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
||||||
|
cmd.Print("EC header:\n")
|
||||||
|
|
||||||
|
cmd.Printf(" parent object ID: %s\n", ecHeader.Parent().EncodeToString())
|
||||||
|
cmd.Printf(" index: %d\n", ecHeader.Index())
|
||||||
|
cmd.Printf(" total: %d\n", ecHeader.Total())
|
||||||
|
cmd.Printf(" header length: %d\n", ecHeader.HeaderLength())
|
||||||
|
}
|
||||||
|
|
||||||
return printSplitHeader(cmd, obj)
|
return printSplitHeader(cmd, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,12 +27,12 @@
|
||||||
"FROSTFS_IR_NETMAP_CLEANER_THRESHOLD":"3",
|
"FROSTFS_IR_NETMAP_CLEANER_THRESHOLD":"3",
|
||||||
"FROSTFS_IR_LOCODE_DB_PATH":"${workspaceFolder}/.cache/locode_db",
|
"FROSTFS_IR_LOCODE_DB_PATH":"${workspaceFolder}/.cache/locode_db",
|
||||||
"FROSTFS_IR_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8090",
|
"FROSTFS_IR_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8090",
|
||||||
"FROSTFS_IR_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-ir-state"
|
"FROSTFS_IR_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-ir-state"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name": "Storage node",
|
"name": "Storage node 1",
|
||||||
"type": "go",
|
"type": "go",
|
||||||
"request": "launch",
|
"request": "launch",
|
||||||
"mode": "debug",
|
"mode": "debug",
|
||||||
|
@ -42,7 +42,8 @@
|
||||||
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet.json",
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet01.json",
|
||||||
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
|
||||||
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
|
||||||
|
@ -50,31 +51,187 @@
|
||||||
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state",
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s1-state",
|
||||||
"FROSTFS_TREE_ENABLED":"true",
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/wc0",
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s1/wc0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/meta0",
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s1/meta0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/blobovnicza0",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s1/blobovnicza0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/fstree0",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s1/fstree0",
|
||||||
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/pilorama0",
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama0",
|
||||||
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/wc1",
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s1/wc1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/meta1",
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s1/meta1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/blobovnicza1",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s1/blobovnicza1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/fstree1",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s1/fstree1",
|
||||||
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/pilorama1"
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9090",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
|
},
|
||||||
|
"postDebugTask": "env-down"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Storage node 2",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "debug",
|
||||||
|
"program": "cmd/frostfs-node",
|
||||||
|
"env": {
|
||||||
|
"FROSTFS_LOGGER_LEVEL":"debug",
|
||||||
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet02.json",
|
||||||
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8082",
|
||||||
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8082",
|
||||||
|
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8083",
|
||||||
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s2-state",
|
||||||
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s2/wc0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s2/meta0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s2/blobovnicza0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s2/fstree0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s2/wc1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s2/meta1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s2/blobovnicza1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s2/fstree1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9091",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
|
},
|
||||||
|
"postDebugTask": "env-down"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Storage node 3",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "debug",
|
||||||
|
"program": "cmd/frostfs-node",
|
||||||
|
"env": {
|
||||||
|
"FROSTFS_LOGGER_LEVEL":"debug",
|
||||||
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet03.json",
|
||||||
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8084",
|
||||||
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8084",
|
||||||
|
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8085",
|
||||||
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s3-state",
|
||||||
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s3/wc0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s3/meta0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s3/blobovnicza0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s3/fstree0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s3/wc1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s3/meta1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s3/blobovnicza1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s3/fstree1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9092",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
|
},
|
||||||
|
"postDebugTask": "env-down"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Storage node 4",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "debug",
|
||||||
|
"program": "cmd/frostfs-node",
|
||||||
|
"env": {
|
||||||
|
"FROSTFS_LOGGER_LEVEL":"debug",
|
||||||
|
"FROSTFS_MORPH_DIAL_TIMEOUT":"30s",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_ADDRESS":"ws://127.0.0.1:30333/ws",
|
||||||
|
"FROSTFS_MORPH_RPC_ENDPOINT_0_PRIORITY":"0",
|
||||||
|
"FROSTFS_MORPH_INACTIVITY_TIMEOUT":"60s",
|
||||||
|
"FROSTFS_NODE_WALLET_PATH":"${workspaceFolder}/dev/storage/wallet04.json",
|
||||||
|
"FROSTFS_NODE_WALLET_PASSWORD":"",
|
||||||
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8086",
|
||||||
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8086",
|
||||||
|
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8087",
|
||||||
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/state/.frostfs-node-s4-state",
|
||||||
|
"FROSTFS_TREE_ENABLED":"true",
|
||||||
|
"FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME":"10",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s4/wc0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_METABASE_PATH":"${workspaceFolder}/.cache/storage/s4/meta0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s4/blobovnicza0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s4/fstree0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_0_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama0",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED":"true",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH":"${workspaceFolder}/.cache/storage/s4/wc1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_METABASE_PATH":"${workspaceFolder}/.cache/storage/s4/meta1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE":"blobovnicza",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH":"${workspaceFolder}/.cache/storage/s4/blobovnicza1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH":"4",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE":"fstree",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH":"${workspaceFolder}/.cache/storage/s4/fstree1",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_DEPTH":"2",
|
||||||
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama1",
|
||||||
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9093",
|
||||||
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
}
|
}
|
||||||
|
@ -82,9 +239,15 @@
|
||||||
"compounds": [
|
"compounds": [
|
||||||
{
|
{
|
||||||
"name": "IR+Storage node",
|
"name": "IR+Storage node",
|
||||||
"configurations": ["IR", "Storage node"],
|
"configurations": ["IR", "Storage node 1"],
|
||||||
"preLaunchTask": "env-up",
|
"preLaunchTask": "env-up",
|
||||||
"stopAll": true
|
"stopAll": true
|
||||||
}
|
},
|
||||||
|
{
|
||||||
|
"name": "IR + 4 storage nodes",
|
||||||
|
"configurations": ["IR", "Storage node 1", "Storage node 2", "Storage node 3", "Storage node 4"],
|
||||||
|
"preLaunchTask": "env-up",
|
||||||
|
"stopAll": true
|
||||||
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
30
dev/storage/wallet02.json
Normal file
30
dev/storage/wallet02.json
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
{
|
||||||
|
"version":"3.0",
|
||||||
|
"accounts":[
|
||||||
|
{
|
||||||
|
"address":"NVXXy3hNTvwVEZa2dAibALyJB3Q86aiHvL",
|
||||||
|
"key":"6PYXd9hxMYfaCkgeZp3q1RoMB921RQFkRxYftcacTJ2S7MUwnivrxi6Yk5",
|
||||||
|
"label":"",
|
||||||
|
"contract":{
|
||||||
|
"script":"DCED/2W2rnkTSk3OnQ0504Uem6tO6Xq/hugeHFu8UM0oJq5BVuezJw==",
|
||||||
|
"parameters":[
|
||||||
|
{
|
||||||
|
"name":"parameter0",
|
||||||
|
"type":"Signature"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"deployed":false
|
||||||
|
},
|
||||||
|
"lock":false,
|
||||||
|
"isDefault":false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"scrypt":{
|
||||||
|
"n":16384,
|
||||||
|
"r":8,
|
||||||
|
"p":8
|
||||||
|
},
|
||||||
|
"extra":{
|
||||||
|
"Tokens":null
|
||||||
|
}
|
||||||
|
}
|
30
dev/storage/wallet03.json
Normal file
30
dev/storage/wallet03.json
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
{
|
||||||
|
"version":"3.0",
|
||||||
|
"accounts":[
|
||||||
|
{
|
||||||
|
"address":"NPTmih9X14Y7xLvmD6RVtDHdH1Y9qJwoTe",
|
||||||
|
"key":"6PYXNeQzge9fWztVnWYRbr5Mh9q1y4npKVARHYGb484Hct1iNd3vXGR1kk",
|
||||||
|
"label":"",
|
||||||
|
"contract":{
|
||||||
|
"script":"DCECrJIM198LYbKJBy5rlG4tpOGjG5qxxiG7R14w+kqxAsNBVuezJw==",
|
||||||
|
"parameters":[
|
||||||
|
{
|
||||||
|
"name":"parameter0",
|
||||||
|
"type":"Signature"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"deployed":false
|
||||||
|
},
|
||||||
|
"lock":false,
|
||||||
|
"isDefault":false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"scrypt":{
|
||||||
|
"n":16384,
|
||||||
|
"r":8,
|
||||||
|
"p":8
|
||||||
|
},
|
||||||
|
"extra":{
|
||||||
|
"Tokens":null
|
||||||
|
}
|
||||||
|
}
|
30
dev/storage/wallet04.json
Normal file
30
dev/storage/wallet04.json
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
{
|
||||||
|
"version":"3.0",
|
||||||
|
"accounts":[
|
||||||
|
{
|
||||||
|
"address":"Ne2DAQbWvP1s7TbtFc7BStKMnjKJdBaVRm",
|
||||||
|
"key":"6PYWCsGWx8uSVYK94tvK7Ccit8x8Z3f3dHADTFTgLhT9NBXTBqBECL8AyC",
|
||||||
|
"label":"",
|
||||||
|
"contract":{
|
||||||
|
"script":"DCEDjIYpWeVrQ+IPeRh8T+ngvHyMZsFgPmzw7H+Hq2sI3DVBVuezJw==",
|
||||||
|
"parameters":[
|
||||||
|
{
|
||||||
|
"name":"parameter0",
|
||||||
|
"type":"Signature"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"deployed":false
|
||||||
|
},
|
||||||
|
"lock":false,
|
||||||
|
"isDefault":false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"scrypt":{
|
||||||
|
"n":16384,
|
||||||
|
"r":8,
|
||||||
|
"p":8
|
||||||
|
},
|
||||||
|
"extra":{
|
||||||
|
"Tokens":null
|
||||||
|
}
|
||||||
|
}
|
1
go.mod
1
go.mod
|
@ -85,6 +85,7 @@ require (
|
||||||
github.com/ipfs/go-cid v0.4.1 // indirect
|
github.com/ipfs/go-cid v0.4.1 // indirect
|
||||||
github.com/josharian/intern v1.0.0 // indirect
|
github.com/josharian/intern v1.0.0 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
||||||
|
github.com/klauspost/reedsolomon v1.12.1 // indirect
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
github.com/magiconair/properties v1.8.7 // indirect
|
||||||
github.com/mailru/easyjson v0.7.7 // indirect
|
github.com/mailru/easyjson v0.7.7 // indirect
|
||||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
github.com/mattn/go-runewidth v0.0.15 // indirect
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -579,4 +579,6 @@ const (
|
||||||
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
||||||
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
||||||
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
||||||
|
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||||
|
ECFailedToSaveECPart = "failed to save EC part"
|
||||||
)
|
)
|
||||||
|
|
11
pkg/core/container/ec.go
Normal file
11
pkg/core/container/ec.go
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsECContainer returns True if container has erasure coding policy.
|
||||||
|
func IsECContainer(cnr containerSDK.Container) bool {
|
||||||
|
return policy.IsECPlacement(cnr.PlacementPolicy())
|
||||||
|
}
|
13
pkg/core/object/ec.go
Normal file
13
pkg/core/object/ec.go
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsECSupported returns True if EC supported for object.
|
||||||
|
//
|
||||||
|
// EC supported only for regular, not linking objects.
|
||||||
|
func IsECSupported(obj *objectSDK.Object) bool {
|
||||||
|
return obj.Type() == objectSDK.TypeRegular &&
|
||||||
|
len(obj.Children()) == 0
|
||||||
|
}
|
20
pkg/core/policy/ec.go
Normal file
20
pkg/core/policy/ec.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package policy
|
||||||
|
|
||||||
|
import (
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IsECPlacement returns True if policy is erasure coding policy.
|
||||||
|
func IsECPlacement(policy netmapSDK.PlacementPolicy) bool {
|
||||||
|
return policy.NumberOfReplicas() == 1 && policy.ReplicaDescriptor(0).GetECDataCount() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECDataCount returns EC data count for EC placement policy.
|
||||||
|
func ECDataCount(policy netmapSDK.PlacementPolicy) int {
|
||||||
|
return int(policy.ReplicaDescriptor(0).GetECDataCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECParityCount returns EC parity count for EC placement policy.
|
||||||
|
func ECParityCount(policy netmapSDK.PlacementPolicy) int {
|
||||||
|
return int(policy.ReplicaDescriptor(0).GetECParityCount())
|
||||||
|
}
|
54
pkg/services/object/put/builder.go
Normal file
54
pkg/services/object/put/builder.go
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ transformer.ChunkedObjectWriter = (*inMemoryObjectBuilder)(nil)
|
||||||
|
|
||||||
|
type inMemoryObjectBuilder struct {
|
||||||
|
objectWriter transformer.ObjectWriter
|
||||||
|
payload *payload
|
||||||
|
|
||||||
|
obj *objectSDK.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func newInMemoryObjectBuilder(objectWriter transformer.ObjectWriter) *inMemoryObjectBuilder {
|
||||||
|
return &inMemoryObjectBuilder{
|
||||||
|
objectWriter: objectWriter,
|
||||||
|
payload: getPayload(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
||||||
|
defer func() {
|
||||||
|
putPayload(b.payload)
|
||||||
|
b.payload = nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
b.obj.SetPayload(b.payload.Data)
|
||||||
|
|
||||||
|
if err := b.objectWriter.WriteObject(ctx, b.obj); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
id, _ := b.obj.ID()
|
||||||
|
return &transformer.AccessIdentifiers{
|
||||||
|
SelfID: id,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) Write(_ context.Context, p []byte) (int, error) {
|
||||||
|
b.payload.Data = append(b.payload.Data, p...)
|
||||||
|
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *inMemoryObjectBuilder) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
|
||||||
|
b.obj = obj
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type preparedObjectTarget interface {
|
type preparedObjectTarget interface {
|
||||||
|
@ -15,16 +14,13 @@ type preparedObjectTarget interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type distributedTarget struct {
|
type distributedTarget struct {
|
||||||
placementOpts []placement.Option
|
placementOpts []placement.Option
|
||||||
extraBroadcastEnabled bool
|
|
||||||
|
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
objMeta object.ContentMeta
|
objMeta object.ContentMeta
|
||||||
|
|
||||||
*cfg
|
*cfg
|
||||||
|
|
||||||
payload *payload
|
|
||||||
|
|
||||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||||
|
|
||||||
relay func(context.Context, nodeDesc) error
|
relay func(context.Context, nodeDesc) error
|
||||||
|
@ -91,36 +87,6 @@ func (x errIncompletePut) Error() string {
|
||||||
return commonMsg
|
return commonMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) WriteHeader(_ context.Context, obj *objectSDK.Object) error {
|
|
||||||
t.obj = obj
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *distributedTarget) Write(_ context.Context, p []byte) (n int, err error) {
|
|
||||||
t.payload.Data = append(t.payload.Data, p...)
|
|
||||||
|
|
||||||
return len(p), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
|
|
||||||
defer func() {
|
|
||||||
putPayload(t.payload)
|
|
||||||
t.payload = nil
|
|
||||||
}()
|
|
||||||
|
|
||||||
t.obj.SetPayload(t.payload.Data)
|
|
||||||
|
|
||||||
if err := t.WriteObject(ctx, t.obj); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
id, _ := t.obj.ID()
|
|
||||||
return &transformer.AccessIdentifiers{
|
|
||||||
SelfID: id,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteObject implements the transformer.ObjectWriter interface.
|
// WriteObject implements the transformer.ObjectWriter interface.
|
||||||
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
t.obj = obj
|
t.obj = obj
|
||||||
|
|
265
pkg/services/object/put/ec.go
Normal file
265
pkg/services/object/put/ec.go
Normal file
|
@ -0,0 +1,265 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ transformer.ObjectWriter = (*ecWriter)(nil)
|
||||||
|
|
||||||
|
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
||||||
|
|
||||||
|
type ecWriter struct {
|
||||||
|
cfg *cfg
|
||||||
|
placementOpts []placement.Option
|
||||||
|
container containerSDK.Container
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
commonPrm *svcutil.CommonPrm
|
||||||
|
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||||
|
|
||||||
|
objMeta object.ContentMeta
|
||||||
|
objMetaValid bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
relayed, err := e.relayIfNotContainerNode(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if relayed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !object.IsECSupported(obj) {
|
||||||
|
// must be resolved by caller
|
||||||
|
return errUnsupportedECObject
|
||||||
|
}
|
||||||
|
|
||||||
|
if !e.objMetaValid {
|
||||||
|
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
|
||||||
|
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
|
||||||
|
}
|
||||||
|
e.objMetaValid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj.ECHeader() != nil {
|
||||||
|
return e.writeECPart(ctx, obj)
|
||||||
|
}
|
||||||
|
return e.writeRawObject(ctx, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
|
||||||
|
if e.relay == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if currentNodeIsContainerNode {
|
||||||
|
// object can be splitted or saved local
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err := e.relayToContainerNode(ctx); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, node := range nodes {
|
||||||
|
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var lastErr error
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, node := range nodes {
|
||||||
|
var info client.NodeInfo
|
||||||
|
client.NodeInfoFromNetmapElement(&info, node)
|
||||||
|
|
||||||
|
c, err := e.cfg.clientConstructor.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
completed := make(chan interface{})
|
||||||
|
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||||
|
defer close(completed)
|
||||||
|
err = e.relay(ctx, info, c)
|
||||||
|
}); poolErr != nil {
|
||||||
|
close(completed)
|
||||||
|
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
|
||||||
|
return poolErr
|
||||||
|
}
|
||||||
|
<-completed
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastErr == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return errIncompletePut{
|
||||||
|
singleErr: lastErr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return errIncompletePut{
|
||||||
|
singleErr: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
// now only single EC policy is supported
|
||||||
|
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
parts, err := c.Split(obj, e.key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t, err := placement.NewTraverser(e.placementOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for {
|
||||||
|
nodes := t.Next()
|
||||||
|
if len(nodes) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
for idx := range parts {
|
||||||
|
idx := idx
|
||||||
|
eg.Go(func() error {
|
||||||
|
return e.writePart(egCtx, parts[idx], idx, nodes)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return errIncompletePut{
|
||||||
|
singleErr: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error {
|
||||||
|
var err error
|
||||||
|
node := nodes[partIdx%len(nodes)]
|
||||||
|
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
|
err = e.writePartLocal(ctx, obj)
|
||||||
|
} else {
|
||||||
|
err = e.writePartRemote(ctx, obj, node)
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
var err error
|
||||||
|
localTarget := localTarget{
|
||||||
|
storage: e.cfg.localStore,
|
||||||
|
}
|
||||||
|
completed := make(chan interface{})
|
||||||
|
if poolErr := e.cfg.localPool.Submit(func() {
|
||||||
|
defer close(completed)
|
||||||
|
err = localTarget.WriteObject(ctx, obj, e.objMeta)
|
||||||
|
}); poolErr != nil {
|
||||||
|
close(completed)
|
||||||
|
return poolErr
|
||||||
|
}
|
||||||
|
<-completed
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||||
|
var clientNodeInfo client.NodeInfo
|
||||||
|
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
|
||||||
|
|
||||||
|
remoteTaget := remoteTarget{
|
||||||
|
privateKey: e.key,
|
||||||
|
clientConstructor: e.cfg.clientConstructor,
|
||||||
|
commonPrm: e.commonPrm,
|
||||||
|
nodeInfo: clientNodeInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
completed := make(chan interface{})
|
||||||
|
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||||
|
defer close(completed)
|
||||||
|
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
|
||||||
|
}); poolErr != nil {
|
||||||
|
close(completed)
|
||||||
|
return poolErr
|
||||||
|
}
|
||||||
|
<-completed
|
||||||
|
return err
|
||||||
|
}
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
@ -25,6 +26,7 @@ import (
|
||||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
@ -32,7 +34,10 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
var (
|
||||||
|
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||||
|
errInvalidECObject = errors.New("object must be splitted to EC parts")
|
||||||
|
)
|
||||||
|
|
||||||
type putSingleRequestSigner struct {
|
type putSingleRequestSigner struct {
|
||||||
req *objectAPI.PutSingleRequest
|
req *objectAPI.PutSingleRequest
|
||||||
|
@ -148,12 +153,20 @@ func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Ob
|
||||||
|
|
||||||
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
||||||
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
placement, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
iter := s.cfg.newNodeIterator(placementOptions)
|
if placement.isEC {
|
||||||
|
return s.saveToECReplicas(ctx, placement, obj, req, meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.saveToREPReplicas(ctx, placement, obj, localOnly, req, meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
|
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
||||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
||||||
|
|
||||||
signer := &putSingleRequestSigner{
|
signer := &putSingleRequestSigner{
|
||||||
|
@ -167,38 +180,83 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
|
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
var result []placement.Option
|
if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil {
|
||||||
if len(copiesNumber) > 0 {
|
return errInvalidECObject
|
||||||
result = append(result, placement.WithCopyNumbers(copiesNumber))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
key, err := s.cfg.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
signer := &putSingleRequestSigner{
|
||||||
|
req: req,
|
||||||
|
keyStorage: s.keyStorage,
|
||||||
|
signer: &sync.Once{},
|
||||||
|
}
|
||||||
|
|
||||||
|
w := ecWriter{
|
||||||
|
cfg: s.cfg,
|
||||||
|
placementOpts: placement.placementOptions,
|
||||||
|
objMeta: meta,
|
||||||
|
objMetaValid: true,
|
||||||
|
commonPrm: commonPrm,
|
||||||
|
container: placement.container,
|
||||||
|
key: key,
|
||||||
|
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
|
||||||
|
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return w.WriteObject(ctx, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
type putSinglePlacement struct {
|
||||||
|
placementOptions []placement.Option
|
||||||
|
isEC bool
|
||||||
|
container containerSDK.Container
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
||||||
|
var result putSinglePlacement
|
||||||
|
|
||||||
cnrID, ok := obj.ContainerID()
|
cnrID, ok := obj.ContainerID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("missing container ID")
|
return result, errors.New("missing container ID")
|
||||||
}
|
}
|
||||||
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get container by ID: %w", err)
|
return result, fmt.Errorf("could not get container by ID: %w", err)
|
||||||
}
|
}
|
||||||
result = append(result, placement.ForContainer(cnrInfo.Value))
|
result.container = cnrInfo.Value
|
||||||
|
result.isEC = container.IsECContainer(cnrInfo.Value) && object.IsECSupported(obj)
|
||||||
|
if len(copiesNumber) > 0 && !result.isEC {
|
||||||
|
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
|
||||||
|
}
|
||||||
|
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
||||||
|
|
||||||
objID, ok := obj.ID()
|
objID, ok := obj.ID()
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("missing object ID")
|
return result, errors.New("missing object ID")
|
||||||
}
|
}
|
||||||
result = append(result, placement.ForObject(objID))
|
if obj.ECHeader() != nil {
|
||||||
|
objID = obj.ECHeader().Parent()
|
||||||
|
}
|
||||||
|
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
|
||||||
|
|
||||||
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get latest network map: %w", err)
|
return result, fmt.Errorf("could not get latest network map: %w", err)
|
||||||
}
|
}
|
||||||
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
||||||
if localOnly {
|
if localOnly {
|
||||||
result = append(result, placement.SuccessAfter(1))
|
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
|
||||||
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
||||||
}
|
}
|
||||||
result = append(result, placement.UseBuilder(builder))
|
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,12 +7,13 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
)
|
)
|
||||||
|
@ -20,7 +21,7 @@ import (
|
||||||
type Streamer struct {
|
type Streamer struct {
|
||||||
*cfg
|
*cfg
|
||||||
|
|
||||||
sessionKey *ecdsa.PrivateKey
|
privateKey *ecdsa.PrivateKey
|
||||||
|
|
||||||
target transformer.ChunkedObjectWriter
|
target transformer.ChunkedObjectWriter
|
||||||
|
|
||||||
|
@ -77,9 +78,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||||
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||||
p.relay = prm.relay
|
p.relay = prm.relay
|
||||||
|
|
||||||
|
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.privateKey = nodeKey
|
||||||
|
|
||||||
// prepare untrusted-Put object target
|
// prepare untrusted-Put object target
|
||||||
p.target = &validatingPreparedTarget{
|
p.target = &validatingPreparedTarget{
|
||||||
nextTarget: p.newCommonTarget(prm),
|
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
|
|
||||||
maxPayloadSz: p.maxPayloadSz,
|
maxPayloadSz: p.maxPayloadSz,
|
||||||
|
@ -103,7 +110,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sessionKey, err := p.keyStorage.GetKey(sessionInfo)
|
key, err := p.keyStorage.GetKey(sessionInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
||||||
}
|
}
|
||||||
|
@ -117,7 +124,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
|
|
||||||
if sToken == nil {
|
if sToken == nil {
|
||||||
var ownerSession user.ID
|
var ownerSession user.ID
|
||||||
user.IDFromKey(&ownerSession, sessionKey.PublicKey)
|
user.IDFromKey(&ownerSession, key.PublicKey)
|
||||||
|
|
||||||
if !ownerObj.Equals(ownerSession) {
|
if !ownerObj.Equals(ownerSession) {
|
||||||
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
|
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
|
||||||
|
@ -128,12 +135,12 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.sessionKey = sessionKey
|
p.privateKey = key
|
||||||
p.target = &validatingTarget{
|
p.target = &validatingTarget{
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
Key: sessionKey,
|
Key: key,
|
||||||
NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) },
|
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
|
||||||
NetworkState: p.networkState,
|
NetworkState: p.networkState,
|
||||||
MaxSize: p.maxPayloadSz,
|
MaxSize: p.maxPayloadSz,
|
||||||
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
||||||
|
@ -172,7 +179,12 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
placement.ForContainer(prm.cnr),
|
placement.ForContainer(prm.cnr),
|
||||||
)
|
)
|
||||||
|
|
||||||
if id, ok := prm.hdr.ID(); ok {
|
if ech := prm.hdr.ECHeader(); ech != nil {
|
||||||
|
prm.traverseOpts = append(prm.traverseOpts,
|
||||||
|
// set identifier of the processing object
|
||||||
|
placement.ForObject(ech.Parent()),
|
||||||
|
)
|
||||||
|
} else if id, ok := prm.hdr.ID(); ok {
|
||||||
prm.traverseOpts = append(prm.traverseOpts,
|
prm.traverseOpts = append(prm.traverseOpts,
|
||||||
// set identifier of the processing object
|
// set identifier of the processing object
|
||||||
placement.ForObject(id),
|
placement.ForObject(id),
|
||||||
|
@ -196,7 +208,14 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||||
|
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
|
||||||
|
return p.newECWriter(prm)
|
||||||
|
}
|
||||||
|
return p.newDefaultObjectWriter(prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||||
var relay func(context.Context, nodeDesc) error
|
var relay func(context.Context, nodeDesc) error
|
||||||
if p.relay != nil {
|
if p.relay != nil {
|
||||||
relay = func(ctx context.Context, node nodeDesc) error {
|
relay = func(ctx context.Context, node nodeDesc) error {
|
||||||
|
@ -213,16 +232,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// enable additional container broadcast on non-local operation
|
|
||||||
// if object has TOMBSTONE or LOCK type.
|
|
||||||
typ := prm.hdr.Type()
|
|
||||||
withBroadcast := !prm.common.LocalOnly() && (typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLock)
|
|
||||||
|
|
||||||
return &distributedTarget{
|
return &distributedTarget{
|
||||||
cfg: p.cfg,
|
cfg: p.cfg,
|
||||||
placementOpts: prm.traverseOpts,
|
placementOpts: prm.traverseOpts,
|
||||||
extraBroadcastEnabled: withBroadcast,
|
|
||||||
payload: getPayload(),
|
|
||||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||||
if node.local {
|
if node.local {
|
||||||
return localTarget{
|
return localTarget{
|
||||||
|
@ -231,7 +243,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
||||||
}
|
}
|
||||||
|
|
||||||
rt := &remoteTarget{
|
rt := &remoteTarget{
|
||||||
privateKey: p.sessionKey,
|
privateKey: p.privateKey,
|
||||||
commonPrm: prm.common,
|
commonPrm: prm.common,
|
||||||
clientConstructor: p.clientConstructor,
|
clientConstructor: p.clientConstructor,
|
||||||
}
|
}
|
||||||
|
@ -244,6 +256,20 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||||
|
return &objectWriterDispatcher{
|
||||||
|
ecWriter: &ecWriter{
|
||||||
|
cfg: p.cfg,
|
||||||
|
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||||
|
container: prm.cnr,
|
||||||
|
key: p.privateKey,
|
||||||
|
commonPrm: prm.common,
|
||||||
|
relay: p.relay,
|
||||||
|
},
|
||||||
|
repWriter: p.newDefaultObjectWriter(prm),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
|
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
|
||||||
if p.target == nil {
|
if p.target == nil {
|
||||||
return errNotInit
|
return errNotInit
|
||||||
|
|
23
pkg/services/object/put/writer.go
Normal file
23
pkg/services/object/put/writer.go
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
package putsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ transformer.ObjectWriter = (*objectWriterDispatcher)(nil)
|
||||||
|
|
||||||
|
type objectWriterDispatcher struct {
|
||||||
|
ecWriter transformer.ObjectWriter
|
||||||
|
repWriter transformer.ObjectWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *objectWriterDispatcher) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
if object.IsECSupported(obj) {
|
||||||
|
return m.ecWriter.WriteObject(ctx, obj)
|
||||||
|
}
|
||||||
|
return m.repWriter.WriteObject(ctx, obj)
|
||||||
|
}
|
|
@ -137,7 +137,7 @@ func defaultCopiesVector(policy netmap.PlacementPolicy) []int {
|
||||||
copyVector := make([]int, 0, replNum)
|
copyVector := make([]int, 0, replNum)
|
||||||
|
|
||||||
for i := 0; i < replNum; i++ {
|
for i := 0; i < replNum; i++ {
|
||||||
copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()))
|
copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()+policy.ReplicaDescriptor(i).GetECDataCount()+policy.ReplicaDescriptor(i).GetECParityCount()))
|
||||||
}
|
}
|
||||||
|
|
||||||
return copyVector
|
return copyVector
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
@ -40,6 +41,10 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
|
||||||
}
|
}
|
||||||
|
|
||||||
policy := cnr.Value.PlacementPolicy()
|
policy := cnr.Value.PlacementPolicy()
|
||||||
|
if policycore.IsECPlacement(policy) {
|
||||||
|
// EC not supported yet by policer
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue