Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-21 20:32:56 +00:00)

Compare commits: fcdm/minor ... vlad/try-b (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | ffbf39d00e | |
| | a37a2af69e | |

12 .github/actions/neon-project-create/action.yml (vendored)
@@ -14,8 +14,11 @@ inputs:
api_host:
description: 'Neon API host'
default: console-stage.neon.build
provisioner:
description: 'k8s-pod or k8s-neonvm'
default: 'k8s-pod'
compute_units:
description: '[Min, Max] compute units'
description: '[Min, Max] compute units; Min and Max are used for k8s-neonvm with autoscaling, for k8s-pod values Min and Max should be equal'
default: '[1, 1]'

outputs:
@@ -34,6 +37,10 @@ runs:
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
if [ "${PROVISIONER}" == "k8s-pod" ] && [ "${MIN_CU}" != "${MAX_CU}" ]; then
echo >&2 "For k8s-pod provisioner MIN_CU should be equal to MAX_CU"
fi

project=$(curl \
"https://${API_HOST}/api/v2/projects" \
--fail \
@@ -45,7 +52,7 @@ runs:
\"name\": \"Created by actions/neon-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"k8s-neonvm\",
\"provisioner\": \"${PROVISIONER}\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": { }
@@ -68,5 +75,6 @@ runs:
API_KEY: ${{ inputs.api_key }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
PROVISIONER: ${{ inputs.provisioner }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
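The new `provisioner` and `compute_units` inputs are easiest to see in isolation. Below is a minimal sketch (not part of the diff) of the same validation and `[Min, Max]` parsing in plain bash, using `jq` as a stand-in for the workflow's `fromJSON(inputs.compute_units)[0]` / `[1]` expressions; the variable names mirror the action's env block, and the `exit 1` is the sketch's choice rather than something shown in the hunk above.

```bash
#!/usr/bin/env bash
# Sketch only: mirrors the action's env block and the new k8s-pod check.
set -euo pipefail

PROVISIONER="${PROVISIONER:-k8s-pod}"       # 'k8s-pod' or 'k8s-neonvm'
COMPUTE_UNITS="${COMPUTE_UNITS:-[1, 1]}"    # '[Min, Max]' JSON array

# Stand-in for ${{ fromJSON(inputs.compute_units)[0] }} / [1] in the workflow.
MIN_CU="$(jq -r '.[0]' <<<"${COMPUTE_UNITS}")"
MAX_CU="$(jq -r '.[1]' <<<"${COMPUTE_UNITS}")"

# Per the new description: for k8s-pod the Min and Max values must be equal,
# autoscaling bounds only make sense for k8s-neonvm.
if [ "${PROVISIONER}" == "k8s-pod" ] && [ "${MIN_CU}" != "${MAX_CU}" ]; then
  echo >&2 "For k8s-pod provisioner MIN_CU should be equal to MAX_CU"
  exit 1   # the sketch aborts here; see the diff above for the action's exact body
fi

echo "provisioner=${PROVISIONER} min_cu=${MIN_CU} max_cu=${MAX_CU}"
```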
@@ -131,8 +131,8 @@ runs:
exit 1
fi
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n sets the number of parallel processes that pytest-xdist will run
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
# -n16 uses sixteen processes to run tests via pytest-xdist
EXTRA_PARAMS="-n16 $EXTRA_PARAMS"

# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
# to the same worker to make @pytest.mark.order work with xdist

@@ -278,6 +278,9 @@ jobs:
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_IMPL: vectored
PAGESERVER_GET_IMPL: vectored
PAGESERVER_VALIDATE_VEC_GET: true

# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540
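For context, a rough sketch of what the `run_in_parallel` branch amounts to once `EXTRA_PARAMS` is assembled; the test path, the final pytest command, and the explicit `--dist=loadgroup` assignment are illustrative assumptions, not lines taken from the action.

```bash
#!/usr/bin/env bash
# Sketch only: illustrative expansion of the parallel-run flags discussed above.
set -euo pipefail

EXTRA_PARAMS=""
RUN_IN_PARALLEL="${RUN_IN_PARALLEL:-true}"

if [[ "${RUN_IN_PARALLEL}" == "true" ]]; then
  # -n16 runs sixteen pytest-xdist worker processes.
  EXTRA_PARAMS="-n16 ${EXTRA_PARAMS}"
  # --dist=loadgroup keeps tests marked with @pytest.mark.xdist_group on the
  # same worker, so @pytest.mark.order still works under xdist.
  EXTRA_PARAMS="--dist=loadgroup ${EXTRA_PARAMS}"
fi

# Hypothetical invocation; the real action builds a longer command line.
poetry run pytest test_runner/regress ${EXTRA_PARAMS}
```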
52 .github/workflows/benchmarking.yml (vendored)

@@ -63,9 +63,11 @@ jobs:
- DEFAULT_PG_VERSION: 16
PLATFORM: "neon-staging"
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
provisioner: 'k8s-pod'
- DEFAULT_PG_VERSION: 16
PLATFORM: "azure-staging"
region_id: 'azure-eastus2'
provisioner: 'k8s-neonvm'
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
@@ -98,6 +100,7 @@ jobs:
region_id: ${{ matrix.region_id }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
provisioner: ${{ matrix.provisioner }}

- name: Run benchmark
uses: ./.github/actions/run-python-test-set
@@ -213,11 +216,11 @@ jobs:
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
#
# Available platforms:
# - neonvm-captest-new: Freshly created project (1 CU)
# - neonvm-captest-freetier: Use freetier-sized compute (0.25 CU)
# - neon-captest-new: Freshly created project (1 CU)
# - neon-captest-freetier: Use freetier-sized compute (0.25 CU)
# - neonvm-captest-azure-new: Freshly created project (1 CU) in azure region
# - neonvm-captest-azure-freetier: Use freetier-sized compute (0.25 CU) in azure region
# - neonvm-captest-reuse: Reusing existing project
# - neon-captest-reuse: Reusing existing project
# - rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
env:
@@ -242,16 +245,18 @@ jobs:
"'"$region_id_default"'"
],
"platform": [
"neonvm-captest-new",
"neonvm-captest-reuse",
"neon-captest-new",
"neon-captest-reuse",
"neonvm-captest-new"
],
"db_size": [ "10gb" ],
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" },
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neon-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-sharding-reuse", "db_size": "50gb" }]
}'

@@ -266,7 +271,7 @@ jobs:
run: |
matrix='{
"platform": [
"neonvm-captest-reuse"
"neon-captest-reuse"
]
}'

@@ -282,7 +287,7 @@ jobs:
run: |
matrix='{
"platform": [
"neonvm-captest-reuse"
"neon-captest-reuse"
],
"scale": [
"10"
@@ -333,7 +338,7 @@ jobs:
prefix: latest

- name: Create Neon Project
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier", "neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
@@ -341,18 +346,19 @@ jobs:
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
compute_units: ${{ (contains(matrix.platform, 'captest-freetier') && '[0.25, 0.25]') || '[1, 1]' }}
provisioner: ${{ (contains(matrix.platform, 'neonvm-') && 'k8s-neonvm') || 'k8s-pod' }}

- name: Set up Connection String
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neonvm-captest-sharding-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
;;
neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -436,9 +442,9 @@ jobs:
fail-fast: false
matrix:
include:
- PLATFORM: "neonvm-captest-pgvector"
- PLATFORM: "neon-captest-pgvector"
- PLATFORM: "azure-captest-pgvector"

env:
TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
TEST_PG_BENCH_SCALES_MATRIX: "1"
@@ -480,7 +486,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-pgvector)
neon-captest-pgvector)
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}
;;
azure-captest-pgvector)
@@ -579,7 +585,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
;;
rds-aurora)
@@ -589,7 +595,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neonvm-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -666,7 +672,7 @@ jobs:
- name: Get Connstring Secret Name
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
ENV_PLATFORM=CAPTEST_TPCH
;;
rds-aurora)
@@ -676,7 +682,7 @@ jobs:
ENV_PLATFORM=RDS_AURORA_TPCH
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neonvm-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -753,7 +759,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
;;
rds-aurora)
@@ -763,7 +769,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neonvm-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
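The workflow repeats the same platform-to-connection-string selection in several jobs; the sketch below condenses that pattern with the renamed `neon-captest-*` platforms. It is not taken from the workflow: the `CONNSTR_*` environment variables are placeholders for the GitHub secrets and step outputs the jobs actually read.

```bash
#!/usr/bin/env bash
# Sketch only: condensed version of the per-job "Set up Connection String" case blocks.
set -euo pipefail

PLATFORM="${PLATFORM:-neon-captest-reuse}"

case "${PLATFORM}" in
  neon-captest-reuse)
    CONNSTR="${CONNSTR_CAPTEST_REUSE:?placeholder for BENCHMARK_CAPTEST_CONNSTR}"
    ;;
  neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier)
    CONNSTR="${CONNSTR_FRESH_PROJECT:?placeholder for the create-neon-project dsn output}"
    ;;
  rds-aurora | rds-postgres)
    CONNSTR="${CONNSTR_RDS:?placeholder for the RDS connection secrets}"
    ;;
  *)
    echo >&2 "Unknown PLATFORM=${PLATFORM}"
    exit 1
    ;;
esac

# Print only the host/database part so credentials never reach the log.
echo "Benchmarks would run against: ${CONNSTR#*@}"
```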
28 .github/workflows/build_and_test.yml (vendored)

@@ -286,6 +286,9 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_IMPL: vectored
PAGESERVER_GET_IMPL: vectored
PAGESERVER_VALIDATE_VEC_GET: false
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones

@@ -833,9 +836,6 @@ jobs:
rm -rf .docker-custom

promote-images:
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04

@@ -862,28 +862,6 @@ jobs:
neondatabase/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done

- name: Azure login
if: github.ref_name == 'main'
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

- name: Login to ACR
if: github.ref_name == 'main'
run: |
az acr login --name=neoneastus2

- name: Copy docker images to ACR-dev
if: github.ref_name == 'main'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create \
-t neoneastus2.azurecr.io/neondatabase/${image}:${{ needs.tag.outputs.build-tag }} \
neondatabase/${image}:${{ needs.tag.outputs.build-tag }}
done

- name: Add latest tag to images
if: github.ref_name == 'main'
run: |
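The image list in the "Copy docker images to ACR-dev" step relies on bash brace expansion, which is easy to misread. The dry-run sketch below (not part of the workflow) shows how it expands; `BUILD_TAG` is a placeholder for `needs.tag.outputs.build-tag`.

```bash
#!/usr/bin/env bash
# Sketch only: prints the imagetools commands instead of running them.
set -euo pipefail
BUILD_TAG="${BUILD_TAG:-0000}"

# {vm-,}compute-node-{v14,v15,v16} brace-expands to:
#   vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16
#   compute-node-v14 compute-node-v15 compute-node-v16
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
  echo docker buildx imagetools create \
    -t "neoneastus2.azurecr.io/neondatabase/${image}:${BUILD_TAG}" \
    "neondatabase/${image}:${BUILD_TAG}"
done
```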
72 .github/workflows/pg-clients.yml (vendored)

@@ -13,7 +13,6 @@ on:
paths:
- '.github/workflows/pg-clients.yml'
- 'test_runner/pg_clients/**'
- 'test_runner/logical_repl/**'
- 'poetry.lock'
workflow_dispatch:

@@ -50,77 +49,6 @@ jobs:
image-tag: ${{ needs.check-build-tools-image.outputs.image-tag }}
secrets: inherit

test-logical-replication:
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04

container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:24.6.3.64
ports:
- 9000:9000
- 8123:8123

steps:
- uses: actions/checkout@v4

- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest

- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}

- name: Run tests
uses: ./.github/actions/run-python-test-set
with:
build_type: remote
test_selection: logical_repl
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ env.DEFAULT_PG_VERSION }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}

- name: Delete Neon Project
if: always()
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}

- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

- name: Post to a Slack channel
if: github.event.schedule && failure()
uses: slackapi/slack-github-action@v1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Testing the logical replication: <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|${{ job.status }}> (<${{ steps.create-allure-report.outputs.report-url }}|test report>)
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

test-postgres-client-libs:
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
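For orientation, a rough local equivalent of the removed `test-logical-replication` job's "Run tests" step is sketched below. It assumes `run-python-test-set` ultimately boils down to a pytest invocation over `test_runner/`; the exact command, the `poetry run` wrapper, and the `DEFAULT_PG_VERSION` value are assumptions, and `BENCHMARK_CONNSTR` stands in for the DSN of a pre-created Neon project.

```bash
#!/usr/bin/env bash
# Sketch only: rough local stand-in for the removed job's test step.
set -euo pipefail

export BENCHMARK_CONNSTR="${BENCHMARK_CONNSTR:?set to the test project's connection string}"
export DEFAULT_PG_VERSION="${DEFAULT_PG_VERSION:-16}"   # assumption, mirrors the workflow env

# test_selection: logical_repl, extra_params: -m remote_cluster, run_in_parallel: false
poetry run pytest test_runner/logical_repl -m remote_cluster
```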
@@ -1,13 +1,13 @@
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
/storage_controller @neondatabase/storage
/libs/pageserver_api/ @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/safekeepers
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/safekeepers
/libs/vm_monitor/ @neondatabase/autoscaling
/pageserver/ @neondatabase/storage
/pgxn/ @neondatabase/compute
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
/pgxn/neon/ @neondatabase/compute @neondatabase/safekeepers
/proxy/ @neondatabase/proxy
/safekeeper/ @neondatabase/storage
/safekeeper/ @neondatabase/safekeepers
/vendor/ @neondatabase/compute
2 Cargo.lock (generated)

@@ -1672,7 +1672,6 @@ checksum = "62d6dcd069e7b5fe49a302411f759d4cf1cf2c27fe798ef46fb8baefc053dd2b"
dependencies = [
"bitflags 2.4.1",
"byteorder",
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
@@ -5719,7 +5718,6 @@ dependencies = [
"aws-config",
"bytes",
"camino",
"chrono",
"clap",
"control_plane",
"diesel",

@@ -313,5 +313,3 @@ To get more familiar with this aspect, refer to:
- Read [CONTRIBUTING.md](/CONTRIBUTING.md) to learn about project code style and practices.
- To get familiar with a source tree layout, use [sourcetree.md](/docs/sourcetree.md).
- To learn more about PostgreSQL internals, check http://www.interdb.jp/pg/index.html
@@ -514,6 +514,7 @@ impl LocalEnv {
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
// (allow unknown fields, unlike PageServerConf)
|
||||
struct PageserverConfigTomlSubset {
|
||||
id: NodeId,
|
||||
listen_pg_addr: String,
|
||||
listen_http_addr: String,
|
||||
pg_auth_type: AuthType,
|
||||
@@ -525,30 +526,18 @@ impl LocalEnv {
|
||||
.with_context(|| format!("read {:?}", config_toml_path))?,
|
||||
)
|
||||
.context("parse pageserver.toml")?;
|
||||
let identity_toml_path = dentry.path().join("identity.toml");
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
struct IdentityTomlSubset {
|
||||
id: NodeId,
|
||||
}
|
||||
let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
|
||||
&std::fs::read_to_string(&identity_toml_path)
|
||||
.with_context(|| format!("read {:?}", identity_toml_path))?,
|
||||
)
|
||||
.context("parse identity.toml")?;
|
||||
let PageserverConfigTomlSubset {
|
||||
id: config_toml_id,
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
} = config_toml;
|
||||
let IdentityTomlSubset {
|
||||
id: identity_toml_id,
|
||||
} = identity_toml;
|
||||
let conf = PageServerConf {
|
||||
id: {
|
||||
anyhow::ensure!(
|
||||
identity_toml_id == id,
|
||||
"id mismatch: identity.toml:id={identity_toml_id} pageserver_(.*) id={id}",
|
||||
config_toml_id == id,
|
||||
"id mismatch: config_toml.id={config_toml_id} id={id}",
|
||||
);
|
||||
id
|
||||
},
|
||||
|
||||
@@ -127,13 +127,10 @@ impl PageServerNode {
|
||||
}
|
||||
|
||||
// Apply the user-provided overrides
|
||||
overrides.push({
|
||||
let mut doc =
|
||||
toml_edit::ser::to_document(&conf).expect("we deserialized this from toml earlier");
|
||||
// `id` is written out to `identity.toml` instead of `pageserver.toml`
|
||||
doc.remove("id").expect("it's part of the struct");
|
||||
doc.to_string()
|
||||
});
|
||||
overrides.push(
|
||||
toml_edit::ser::to_string_pretty(&conf)
|
||||
.expect("we deserialized this from toml earlier"),
|
||||
);
|
||||
|
||||
// Turn `overrides` into a toml document.
|
||||
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Instant;
|
||||
|
||||
/// Request/response types for the storage controller
|
||||
/// API (`/control/v1` prefix). Implemented by the server
|
||||
@@ -294,42 +294,6 @@ pub enum PlacementPolicy {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateResponse {}
|
||||
|
||||
/// Metadata health record posted from scrubber.
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct MetadataHealthRecord {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub healthy: bool,
|
||||
pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct MetadataHealthUpdateRequest {
|
||||
pub healthy_tenant_shards: Vec<TenantShardId>,
|
||||
pub unhealthy_tenant_shards: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct MetadataHealthUpdateResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
pub struct MetadataHealthListUnhealthyResponse {
|
||||
pub unhealthy_tenant_shards: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
pub struct MetadataHealthListOutdatedRequest {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub not_scrubbed_for: Duration,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
|
||||
pub struct MetadataHealthListOutdatedResponse {
|
||||
pub health_records: Vec<MetadataHealthRecord>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
@@ -355,8 +355,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
.blobs()
|
||||
.map(|k| ListingObject{
|
||||
key: self.name_to_relative_path(&k.name),
|
||||
last_modified: k.properties.last_modified.into(),
|
||||
size: k.properties.content_length,
|
||||
last_modified: k.properties.last_modified.into()
|
||||
}
|
||||
);
|
||||
|
||||
|
||||
@@ -153,7 +153,6 @@ pub enum ListingMode {
|
||||
pub struct ListingObject {
|
||||
pub key: RemotePath,
|
||||
pub last_modified: SystemTime,
|
||||
pub size: u64,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -195,7 +194,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>>;
|
||||
|
||||
async fn list(
|
||||
&self,
|
||||
@@ -352,10 +351,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &'a CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a {
|
||||
match self {
|
||||
Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
|
||||
as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
|
||||
as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>>>>,
|
||||
Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
|
||||
Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
|
||||
Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
|
||||
|
||||
@@ -368,7 +368,6 @@ impl RemoteStorage for LocalFs {
|
||||
key: k.clone(),
|
||||
// LocalFs is just for testing, so just specify a dummy time
|
||||
last_modified: SystemTime::now(),
|
||||
size: 0,
|
||||
})
|
||||
}
|
||||
})
|
||||
@@ -412,7 +411,6 @@ impl RemoteStorage for LocalFs {
|
||||
key: RemotePath::from_string(&relative_key).unwrap(),
|
||||
// LocalFs is just for testing
|
||||
last_modified: SystemTime::now(),
|
||||
size: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -565,12 +565,9 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
};
|
||||
|
||||
let size = object.size.unwrap_or(0) as u64;
|
||||
|
||||
result.keys.push(ListingObject{
|
||||
key,
|
||||
last_modified,
|
||||
size,
|
||||
last_modified
|
||||
});
|
||||
if let Some(mut mk) = max_keys {
|
||||
assert!(mk > 0);
|
||||
|
||||
@@ -114,7 +114,7 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> {
|
||||
async_stream::stream! {
|
||||
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
@@ -18,20 +18,20 @@ const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Scope {
|
||||
/// Provides access to all data for a specific tenant (specified in `struct Claims` below)
|
||||
// Provides access to all data for a specific tenant (specified in `struct Claims` below)
|
||||
// TODO: join these two?
|
||||
Tenant,
|
||||
/// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
|
||||
/// Should only be used e.g. for status check/tenant creation/list.
|
||||
// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
|
||||
// Should only be used e.g. for status check/tenant creation/list.
|
||||
PageServerApi,
|
||||
/// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs.
|
||||
/// Should only be used e.g. for status check.
|
||||
/// Currently also used for connection from any pageserver to any safekeeper.
|
||||
// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs.
|
||||
// Should only be used e.g. for status check.
|
||||
// Currently also used for connection from any pageserver to any safekeeper.
|
||||
SafekeeperData,
|
||||
/// The scope used by pageservers in upcalls to storage controller and cloud control plane
|
||||
// The scope used by pageservers in upcalls to storage controller and cloud control plane
|
||||
#[serde(rename = "generations_api")]
|
||||
GenerationsApi,
|
||||
/// Allows access to control plane managment API and some storage controller endpoints.
|
||||
// Allows access to control plane managment API and some storage controller endpoints.
|
||||
Admin,
|
||||
|
||||
/// Allows access to storage controller APIs used by the scrubber, to interrogate the state
|
||||
|
||||
@@ -52,7 +52,7 @@ pub mod defaults {
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
|
||||
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "300 s";
|
||||
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
|
||||
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
|
||||
|
||||
pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
|
||||
@@ -83,16 +83,16 @@ pub mod defaults {
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
|
||||
|
||||
pub const DEFAULT_GET_VECTORED_IMPL: &str = "vectored";
|
||||
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
|
||||
|
||||
pub const DEFAULT_GET_IMPL: &str = "vectored";
|
||||
pub const DEFAULT_GET_IMPL: &str = "legacy";
|
||||
|
||||
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
|
||||
|
||||
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
|
||||
ImageCompressionAlgorithm::Disabled;
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
|
||||
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
|
||||
@@ -356,6 +356,8 @@ struct PageServerConfigBuilder {
|
||||
auth_validation_public_key_path: BuilderValue<Option<Utf8PathBuf>>,
|
||||
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
|
||||
|
||||
id: BuilderValue<NodeId>,
|
||||
|
||||
broker_endpoint: BuilderValue<Uri>,
|
||||
broker_keepalive_interval: BuilderValue<Duration>,
|
||||
|
||||
@@ -404,8 +406,11 @@ struct PageServerConfigBuilder {
|
||||
}
|
||||
|
||||
impl PageServerConfigBuilder {
|
||||
fn new() -> Self {
|
||||
Self::default()
|
||||
fn new(node_id: NodeId) -> Self {
|
||||
let mut this = Self::default();
|
||||
this.id(node_id);
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
@@ -433,6 +438,7 @@ impl PageServerConfigBuilder {
|
||||
pg_auth_type: Set(AuthType::Trust),
|
||||
auth_validation_public_key_path: Set(None),
|
||||
remote_storage_config: Set(None),
|
||||
id: NotSet,
|
||||
broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
|
||||
.parse()
|
||||
.expect("failed to parse default broker endpoint")),
|
||||
@@ -562,6 +568,10 @@ impl PageServerConfigBuilder {
|
||||
self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
|
||||
}
|
||||
|
||||
pub fn id(&mut self, node_id: NodeId) {
|
||||
self.id = BuilderValue::Set(node_id)
|
||||
}
|
||||
|
||||
pub fn log_format(&mut self, log_format: LogFormat) {
|
||||
self.log_format = BuilderValue::Set(log_format)
|
||||
}
|
||||
@@ -673,7 +683,7 @@ impl PageServerConfigBuilder {
|
||||
self.l0_flush = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let default = Self::default_values();
|
||||
|
||||
macro_rules! conf {
|
||||
@@ -706,6 +716,7 @@ impl PageServerConfigBuilder {
|
||||
pg_auth_type,
|
||||
auth_validation_public_key_path,
|
||||
remote_storage_config,
|
||||
id,
|
||||
broker_endpoint,
|
||||
broker_keepalive_interval,
|
||||
log_format,
|
||||
@@ -733,7 +744,6 @@ impl PageServerConfigBuilder {
|
||||
}
|
||||
CUSTOM LOGIC
|
||||
{
|
||||
id: id,
|
||||
// TenantConf is handled separately
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
concurrent_tenant_warmup: ConfigurableSemaphore::new({
|
||||
@@ -883,7 +893,7 @@ impl PageServerConf {
|
||||
toml: &Document,
|
||||
workdir: &Utf8Path,
|
||||
) -> anyhow::Result<Self> {
|
||||
let mut builder = PageServerConfigBuilder::new();
|
||||
let mut builder = PageServerConfigBuilder::new(node_id);
|
||||
builder.workdir(workdir.to_owned());
|
||||
|
||||
let mut t_conf = TenantConfOpt::default();
|
||||
@@ -914,6 +924,8 @@ impl PageServerConf {
|
||||
"tenant_config" => {
|
||||
t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
|
||||
}
|
||||
"id" => {}, // Ignoring `id` field in pageserver.toml - using identity.toml as the source of truth
|
||||
// Logging is not set up yet, so we can't do it.
|
||||
"broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
|
||||
"broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
|
||||
"log_format" => builder.log_format(
|
||||
@@ -1006,7 +1018,7 @@ impl PageServerConf {
|
||||
}
|
||||
}
|
||||
|
||||
let mut conf = builder.build(node_id).context("invalid config")?;
|
||||
let mut conf = builder.build().context("invalid config")?;
|
||||
|
||||
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
|
||||
let auth_validation_public_key_path = conf
|
||||
@@ -1243,6 +1255,7 @@ max_file_descriptors = 333
|
||||
|
||||
# initial superuser role name to use when creating a new tenant
|
||||
initial_superuser_name = 'zzzz'
|
||||
id = 10
|
||||
|
||||
metric_collection_interval = '222 s'
|
||||
metric_collection_endpoint = 'http://localhost:80/metrics'
|
||||
@@ -1259,8 +1272,9 @@ background_task_maximum_delay = '334 s'
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
|
||||
// we have to create dummy values to overcome the validation errors
|
||||
let config_string =
|
||||
format!("pg_distrib_dir='{pg_distrib_dir}'\nbroker_endpoint = '{broker_endpoint}'",);
|
||||
let config_string = format!(
|
||||
"pg_distrib_dir='{pg_distrib_dir}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
|
||||
);
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let parsed_config = PageServerConf::parse_and_validate(NodeId(10), &toml, &workdir)
|
||||
@@ -1565,6 +1579,7 @@ broker_endpoint = '{broker_endpoint}'
|
||||
r#"pg_distrib_dir = "{pg_distrib_dir}"
|
||||
metric_collection_endpoint = "http://sample.url"
|
||||
metric_collection_interval = "10min"
|
||||
id = 222
|
||||
|
||||
[disk_usage_based_eviction]
|
||||
max_usage_pct = 80
|
||||
@@ -1634,6 +1649,7 @@ threshold = "20m"
|
||||
r#"pg_distrib_dir = "{pg_distrib_dir}"
|
||||
metric_collection_endpoint = "http://sample.url"
|
||||
metric_collection_interval = "10min"
|
||||
id = 222
|
||||
|
||||
[tenant_config]
|
||||
evictions_low_residence_duration_metric_threshold = "20m"
|
||||
|
||||
@@ -2129,24 +2129,14 @@ async fn secondary_download_handler(
|
||||
|
||||
let timeout = wait.unwrap_or(Duration::MAX);
|
||||
|
||||
let result = tokio::time::timeout(
|
||||
let status = match tokio::time::timeout(
|
||||
timeout,
|
||||
state.secondary_controller.download_tenant(tenant_shard_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
let progress = secondary_tenant.progress.lock().unwrap().clone();
|
||||
|
||||
let status = match result {
|
||||
Ok(Ok(())) => {
|
||||
if progress.layers_downloaded >= progress.layers_total {
|
||||
// Download job ran to completion
|
||||
StatusCode::OK
|
||||
} else {
|
||||
// Download dropped out without errors because it ran out of time budget
|
||||
StatusCode::ACCEPTED
|
||||
}
|
||||
}
|
||||
.await
|
||||
{
|
||||
// Download job ran to completion.
|
||||
Ok(Ok(())) => StatusCode::OK,
|
||||
// Edge case: downloads aren't usually fallible: things like a missing heatmap are considered
|
||||
// okay. We could get an error here in the unlikely edge case that the tenant
|
||||
// was detached between our check above and executing the download job.
|
||||
@@ -2156,6 +2146,8 @@ async fn secondary_download_handler(
|
||||
Err(_) => StatusCode::ACCEPTED,
|
||||
};
|
||||
|
||||
let progress = secondary_tenant.progress.lock().unwrap().clone();
|
||||
|
||||
json_response(status, progress)
|
||||
}
|
||||
|
||||
|
||||
@@ -2,23 +2,13 @@ use std::{num::NonZeroUsize, sync::Arc};
|
||||
|
||||
use crate::tenant::ephemeral_file;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
|
||||
#[derive(Default, Debug, PartialEq, Eq, Clone, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum L0FlushConfig {
|
||||
#[default]
|
||||
PageCached,
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct {
|
||||
max_concurrency: NonZeroUsize,
|
||||
},
|
||||
}
|
||||
|
||||
impl Default for L0FlushConfig {
|
||||
fn default() -> Self {
|
||||
Self::Direct {
|
||||
// TODO: using num_cpus results in different peak memory usage on different instance types.
|
||||
max_concurrency: NonZeroUsize::new(usize::max(1, num_cpus::get())).unwrap(),
|
||||
}
|
||||
}
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -613,23 +613,7 @@ pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy<IntCounter> = Lazy::new(|| {
|
||||
pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_compression_image_in_bytes_total",
|
||||
"Size of data written into image layers before compression"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_compression_image_in_bytes_considered",
|
||||
"Size of potentially compressible data written into image layers before compression"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_compression_image_in_bytes_chosen",
|
||||
"Size of data whose compressed form was written into image layers"
|
||||
"Size of uncompressed data written into image layers"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
@@ -102,7 +102,8 @@ use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -1226,29 +1227,11 @@ impl Tenant {
|
||||
Ok(timeline_preloads)
|
||||
}
|
||||
|
||||
pub(crate) async fn apply_timeline_archival_config(
|
||||
pub async fn apply_timeline_archival_config(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
state: TimelineArchivalState,
|
||||
_timeline_id: TimelineId,
|
||||
_config: TimelineArchivalState,
|
||||
) -> anyhow::Result<()> {
|
||||
let timeline = self
|
||||
.get_timeline(timeline_id, false)
|
||||
.context("Cannot apply timeline archival config to inexistent timeline")?;
|
||||
|
||||
let upload_needed = timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_timeline_archival_state(state)?;
|
||||
|
||||
if upload_needed {
|
||||
const MAX_WAIT: Duration = Duration::from_secs(10);
|
||||
let Ok(v) =
|
||||
tokio::time::timeout(MAX_WAIT, timeline.remote_client.wait_completion()).await
|
||||
else {
|
||||
tracing::warn!("reached timeout for waiting on upload queue");
|
||||
bail!("reached timeout for upload queue flush");
|
||||
};
|
||||
v?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1633,23 +1616,21 @@ impl Tenant {
|
||||
/// This function is periodically called by compactor task.
|
||||
/// Also it can be explicitly requested per timeline through page server
|
||||
/// api's 'compact' command.
|
||||
///
|
||||
/// Returns whether we have pending compaction task.
|
||||
async fn compaction_iteration(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, timeline::CompactionError> {
|
||||
) -> Result<(), timeline::CompactionError> {
|
||||
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
|
||||
if !self.is_active() {
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.load();
|
||||
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
|
||||
info!("Skipping compaction in location state {:?}", conf.location);
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1676,13 +1657,11 @@ impl Tenant {
|
||||
// Before doing any I/O work, check our circuit breaker
|
||||
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
|
||||
info!("Skipping compaction due to previous failures");
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut has_pending_task = false;
|
||||
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
has_pending_task |= timeline
|
||||
timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await
|
||||
@@ -1702,7 +1681,7 @@ impl Tenant {
|
||||
.unwrap()
|
||||
.success(&CIRCUIT_BREAKERS_UNBROKEN);
|
||||
|
||||
Ok(has_pending_task)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Call through to all timelines to freeze ephemeral layers if needed. Usually
|
||||
|
||||
@@ -28,12 +28,6 @@ use crate::virtual_file::VirtualFile;
|
||||
use std::cmp::min;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct CompressionInfo {
|
||||
pub written_compressed: bool,
|
||||
pub compressed_size: Option<usize>,
|
||||
}
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(
|
||||
@@ -279,10 +273,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
srcbuf: B,
|
||||
ctx: &RequestContext,
|
||||
) -> (B::Buf, Result<u64, Error>) {
|
||||
let (buf, res) = self
|
||||
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
|
||||
.await;
|
||||
(buf, res.map(|(off, _compression_info)| off))
|
||||
self.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Write a blob of data. Returns the offset that it was written to,
|
||||
@@ -292,12 +284,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
srcbuf: B,
|
||||
ctx: &RequestContext,
|
||||
algorithm: ImageCompressionAlgorithm,
|
||||
) -> (B::Buf, Result<(u64, CompressionInfo), Error>) {
|
||||
) -> (B::Buf, Result<u64, Error>) {
|
||||
let offset = self.offset;
|
||||
let mut compression_info = CompressionInfo {
|
||||
written_compressed: false,
|
||||
compressed_size: None,
|
||||
};
|
||||
|
||||
let len = srcbuf.bytes_init();
|
||||
|
||||
@@ -340,9 +328,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
encoder.write_all(&slice[..]).await.unwrap();
|
||||
encoder.shutdown().await.unwrap();
|
||||
let compressed = encoder.into_inner();
|
||||
compression_info.compressed_size = Some(compressed.len());
|
||||
if compressed.len() < len {
|
||||
compression_info.written_compressed = true;
|
||||
let compressed_len = compressed.len();
|
||||
compressed_buf = Some(compressed);
|
||||
(BYTE_ZSTD, compressed_len, slice.into_inner())
|
||||
@@ -373,7 +359,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
} else {
|
||||
self.write_all(srcbuf, ctx).await
|
||||
};
|
||||
(srcbuf, res.map(|_| (offset, compression_info)))
|
||||
(srcbuf, res.map(|_| offset))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -430,14 +416,12 @@ pub(crate) mod tests {
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
.write_blob_maybe_compressed(
|
||||
blob.clone(),
|
||||
ctx,
|
||||
ImageCompressionAlgorithm::Zstd { level: Some(1) },
|
||||
)
|
||||
.await;
|
||||
(res.0, res.1.map(|(off, _)| off))
|
||||
wtr.write_blob_maybe_compressed(
|
||||
blob.clone(),
|
||||
ctx,
|
||||
ImageCompressionAlgorithm::Zstd { level: Some(1) },
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
wtr.write_blob(blob.clone(), ctx).await
|
||||
};
|
||||
|
||||
@@ -1384,32 +1384,34 @@ impl TenantManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let remote_path = remote_tenant_path(&tenant_shard_id);
|
||||
let mut keys_stream = self.resources.remote_storage.list_streaming(
|
||||
Some(&remote_path),
|
||||
remote_storage::ListingMode::NoDelimiter,
|
||||
None,
|
||||
&self.cancel,
|
||||
);
|
||||
while let Some(chunk) = keys_stream.next().await {
|
||||
let keys = match chunk {
|
||||
Ok(listing) => listing.keys,
|
||||
Err(remote_storage::DownloadError::Cancelled) => {
|
||||
return Err(DeleteTenantError::Cancelled)
|
||||
}
|
||||
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
|
||||
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
|
||||
};
|
||||
|
||||
if keys.is_empty() {
|
||||
tracing::info!("Remote storage already deleted");
|
||||
} else {
|
||||
tracing::info!("Deleting {} keys from remote storage", keys.len());
|
||||
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
|
||||
self.resources
|
||||
.remote_storage
|
||||
.delete_objects(&keys, &self.cancel)
|
||||
.await?;
|
||||
let keys = match self
|
||||
.resources
|
||||
.remote_storage
|
||||
.list(
|
||||
Some(&remote_path),
|
||||
remote_storage::ListingMode::NoDelimiter,
|
||||
None,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(listing) => listing.keys,
|
||||
Err(remote_storage::DownloadError::Cancelled) => {
|
||||
return Err(DeleteTenantError::Cancelled)
|
||||
}
|
||||
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
|
||||
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
|
||||
};
|
||||
|
||||
if keys.is_empty() {
|
||||
tracing::info!("Remote storage already deleted");
|
||||
} else {
|
||||
tracing::info!("Deleting {} keys from remote storage", keys.len());
|
||||
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
|
||||
self.resources
|
||||
.remote_storage
|
||||
.delete_objects(&keys, &self.cancel)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -187,7 +187,7 @@ use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
|
||||
pub(crate) use download::download_initdb_tar_zst;
|
||||
use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState};
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -457,17 +457,6 @@ impl RemoteTimelineClient {
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Returns whether the timeline is archived.
|
||||
/// Return None if the remote index_part hasn't been downloaded yet.
|
||||
pub(crate) fn is_archived(&self) -> Option<bool> {
|
||||
self.upload_queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.initialized_mut()
|
||||
.map(|q| q.clean.0.archived_at.is_some())
|
||||
.ok()
|
||||
}
|
||||
|
||||
fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
|
||||
let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
|
||||
current_remote_index_part
|
||||
@@ -628,7 +617,7 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated.
|
||||
/// Launch an index-file upload operation in the background, with only aux_file_policy flag updated.
|
||||
pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
|
||||
self: &Arc<Self>,
|
||||
last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
@@ -639,48 +628,6 @@ impl RemoteTimelineClient {
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
|
||||
///
|
||||
/// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded,
|
||||
/// so either if the change is already sitting in the queue, but not commited yet, or the change has not
|
||||
/// been in the queue yet.
|
||||
pub(crate) fn schedule_index_upload_for_timeline_archival_state(
|
||||
self: &Arc<Self>,
|
||||
state: TimelineArchivalState,
|
||||
) -> anyhow::Result<bool> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
/// Returns Some(_) if a change is needed, and Some(true) if it's a
|
||||
/// change needed to set archived_at.
|
||||
fn need_change(
|
||||
archived_at: &Option<NaiveDateTime>,
|
||||
state: TimelineArchivalState,
|
||||
) -> Option<bool> {
|
||||
match (archived_at, state) {
|
||||
(Some(_), TimelineArchivalState::Archived)
|
||||
| (None, TimelineArchivalState::Unarchived) => {
|
||||
// Nothing to do
|
||||
tracing::info!("intended state matches present state");
|
||||
None
|
||||
}
|
||||
(None, TimelineArchivalState::Archived) => Some(true),
|
||||
(Some(_), TimelineArchivalState::Unarchived) => Some(false),
|
||||
}
|
||||
}
|
||||
let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
|
||||
|
||||
if let Some(archived_at_set) = need_upload_scheduled {
|
||||
let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
|
||||
upload_queue.dirty.archived_at = intended_archived_at;
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
}
|
||||
|
||||
let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
|
||||
Ok(need_wait)
|
||||
}
|
||||
|
||||
///
|
||||
/// Launch an index-file upload operation in the background, if necessary.
|
||||
///
|
||||
|
||||
@@ -32,10 +32,6 @@ pub struct IndexPart {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub deleted_at: Option<NaiveDateTime>,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub archived_at: Option<NaiveDateTime>,
|
||||
|
||||
/// Per layer file name metadata, which can be present for a present or missing layer file.
|
||||
///
|
||||
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
|
||||
@@ -84,11 +80,10 @@ impl IndexPart {
|
||||
/// - 5: lineage was added
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
/// - 7: metadata_bytes is no longer written, but still read
|
||||
/// - 8: added `archived_at`
|
||||
const LATEST_VERSION: usize = 8;
|
||||
const LATEST_VERSION: usize = 7;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -99,7 +94,6 @@ impl IndexPart {
|
||||
disk_consistent_lsn: metadata.disk_consistent_lsn(),
|
||||
metadata,
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Default::default(),
|
||||
last_aux_file_policy: None,
|
||||
}
|
||||
@@ -290,7 +284,6 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
@@ -333,7 +326,6 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
@@ -377,7 +369,6 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
@@ -424,7 +415,6 @@ mod tests {
|
||||
])
|
||||
.unwrap(),
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
@@ -466,7 +456,6 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
@@ -507,7 +496,6 @@ mod tests {
|
||||
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[226,88,25,241,0,46,0,4,0,0,0,0,1,90,118,24,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,78,244,32,0,0,0,0,1,78,244,32,0,0,0,16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Lineage {
|
||||
reparenting_history_truncated: false,
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
@@ -557,7 +545,6 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: None,
|
||||
lineage: Lineage {
|
||||
reparenting_history_truncated: false,
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
@@ -616,63 +603,6 @@ mod tests {
|
||||
14,
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: None,
|
||||
lineage: Default::default(),
|
||||
last_aux_file_policy: Default::default(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v8_indexpart_is_parsed() {
|
||||
let example = r#"{
|
||||
"version": 8,
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata": {
|
||||
"disk_consistent_lsn": "0/16960E8",
|
||||
"prev_record_lsn": "0/1696070",
|
||||
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
|
||||
"ancestor_lsn": "0/0",
|
||||
"latest_gc_cutoff_lsn": "0/1696070",
|
||||
"initdb_lsn": "0/1696070",
|
||||
"pg_version": 14
|
||||
},
|
||||
"deleted_at": "2023-07-31T09:00:00.123",
|
||||
"archived_at": "2023-04-29T09:00:00.123"
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
version: 8,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::new(
|
||||
Lsn::from_str("0/16960E8").unwrap(),
|
||||
Some(Lsn::from_str("0/1696070").unwrap()),
|
||||
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
|
||||
Lsn::INVALID,
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
14,
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: Some(parse_naive_datetime("2023-04-29T09:00:00.123000000")),
|
||||
lineage: Default::default(),
|
||||
last_aux_file_policy: Default::default(),
|
||||
};
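The v8 test above exercises the new optional `archived_at` field of `index_part.json`. As a rough illustration of the parsing involved (not the actual `IndexPart` definition; assumes chrono's `serde` feature and `serde_json`, and the field names below are only a subset), an optional naive-datetime field that older versions simply omit can be modelled like this:

```rust
use chrono::NaiveDateTime;
use serde::Deserialize;

// Hypothetical subset of index_part.json, for illustration only.
#[derive(Deserialize, Debug)]
struct IndexPartSubset {
    version: u32,
    deleted_at: Option<NaiveDateTime>,
    // New in v8; absent in older versions, hence the default.
    #[serde(default)]
    archived_at: Option<NaiveDateTime>,
}

fn main() {
    let v8 = r#"{"version":8,"deleted_at":"2023-07-31T09:00:00.123","archived_at":"2023-04-29T09:00:00.123"}"#;
    let parsed: IndexPartSubset = serde_json::from_str(v8).unwrap();
    assert_eq!(parsed.version, 8);
    assert!(parsed.archived_at.is_some());

    // Older versions omit the field entirely; it deserializes to None.
    let v7 = r#"{"version":7,"deleted_at":null}"#;
    let parsed: IndexPartSubset = serde_json::from_str(v7).unwrap();
    assert!(parsed.archived_at.is_none());
}
```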
@@ -307,10 +307,12 @@ impl DeltaLayer {
|
||||
.with_context(|| format!("Failed to load delta layer {}", self.path()))
|
||||
}
|
||||
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
|
||||
let loaded = DeltaLayerInner::load(&path, None, None, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
// not production code
|
||||
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
|
||||
@@ -467,7 +469,7 @@ impl DeltaLayerWriterInner {
|
||||
.write_blob_maybe_compressed(val, ctx, compression)
|
||||
.await;
|
||||
let off = match res {
|
||||
Ok((off, _)) => off,
|
||||
Ok(off) => off,
|
||||
Err(e) => return (val, Err(anyhow::anyhow!(e))),
|
||||
};
|
||||
|
||||
@@ -758,24 +760,27 @@ impl DeltaLayerInner {
|
||||
&self.layer_lsn_range
|
||||
}
|
||||
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
/// - outer has the permanent failure
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
summary: Option<Summary>,
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path, ctx).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let file_id = page_cache::next_file_id();
|
||||
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
|
||||
let summary_blk = block_reader
|
||||
.read_blk(0, ctx)
|
||||
.await
|
||||
.context("read first block")?;
|
||||
let summary_blk = match block_reader.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
|
||||
// TODO: this should be an assertion instead; see ImageLayerInner::load
|
||||
let actual_summary =
|
||||
@@ -797,7 +802,7 @@ impl DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DeltaLayerInner {
|
||||
Ok(Ok(DeltaLayerInner {
|
||||
file,
|
||||
file_id,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
@@ -805,7 +810,7 @@ impl DeltaLayerInner {
|
||||
max_vectored_read_bytes,
|
||||
layer_key_range: actual_summary.key_range,
|
||||
layer_lsn_range: actual_summary.lsn_range,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
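The doc comment above spells out the new convention for `DeltaLayerInner::load`: the outer `Result` carries permanent failures, the inner one carries transient failures that may succeed on retry. A minimal, self-contained sketch of that convention (names and error types are illustrative stand-ins, not the pageserver API):

```rust
#[derive(Debug)]
struct Loaded; // stand-in for DeltaLayerInner / ImageLayerInner

// Outer Err: permanent failure (callers should not retry).
// Inner Err: transient failure (a later retry may succeed).
fn load(path: &str) -> Result<Result<Loaded, String>, String> {
    if path.is_empty() {
        return Err("permanent: empty path".to_string());
    }
    if path.ends_with(".tmp") {
        return Ok(Err("transient: file still being written".to_string()));
    }
    Ok(Ok(Loaded))
}

fn main() {
    match load("layer-000") {
        Ok(Ok(_layer)) => println!("loaded"),
        Ok(Err(transient)) => println!("retry later: {transient}"),
        Err(permanent) => println!("give up: {permanent}"),
    }
}
```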
@@ -265,8 +265,9 @@ impl ImageLayer {
|
||||
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
|
||||
let path = self.path();
|
||||
|
||||
let loaded =
|
||||
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
|
||||
.await
|
||||
.and_then(|res| res)?;
|
||||
|
||||
// not production code
|
||||
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
|
||||
@@ -384,16 +385,17 @@ impl ImageLayerInner {
|
||||
summary: Option<Summary>,
|
||||
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path, ctx)
|
||||
.await
|
||||
.context("open layer file")?;
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path, ctx).await {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
let file_id = page_cache::next_file_id();
|
||||
let block_reader = FileBlockReader::new(&file, file_id);
|
||||
let summary_blk = block_reader
|
||||
.read_blk(0, ctx)
|
||||
.await
|
||||
.context("read first block")?;
|
||||
let summary_blk = match block_reader.read_blk(0, ctx).await {
|
||||
Ok(blk) => blk,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
|
||||
};
|
||||
|
||||
// length is the only way this could fail, so it's not actually likely at all unless
// read_blk returns a wrongly sized block.
|
||||
@@ -418,7 +420,7 @@ impl ImageLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ImageLayerInner {
|
||||
Ok(Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn,
|
||||
@@ -426,7 +428,7 @@ impl ImageLayerInner {
|
||||
file_id,
|
||||
max_vectored_read_bytes,
|
||||
key_range: actual_summary.key_range,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
|
||||
@@ -734,14 +736,6 @@ struct ImageLayerWriterInner {
|
||||
// Total uncompressed bytes passed into put_image
|
||||
uncompressed_bytes: u64,
|
||||
|
||||
// Like `uncompressed_bytes`,
|
||||
// but only of images we might consider for compression
|
||||
uncompressed_bytes_eligible: u64,
|
||||
|
||||
// Like `uncompressed_bytes`, but only of images
|
||||
// where we have chosen their compressed form
|
||||
uncompressed_bytes_chosen: u64,
|
||||
|
||||
blob_writer: BlobWriter<false>,
|
||||
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
|
||||
}
|
||||
@@ -798,8 +792,6 @@ impl ImageLayerWriterInner {
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
uncompressed_bytes: 0,
|
||||
uncompressed_bytes_eligible: 0,
|
||||
uncompressed_bytes_chosen: 0,
|
||||
};
|
||||
|
||||
Ok(writer)
|
||||
@@ -818,22 +810,13 @@ impl ImageLayerWriterInner {
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
self.uncompressed_bytes += img.len() as u64;
|
||||
let (_img, res) = self
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(img, ctx, compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let (off, compression_info) = res?;
|
||||
if compression_info.compressed_size.is_some() {
|
||||
// The image has been considered for compression at least
|
||||
self.uncompressed_bytes_eligible += uncompressed_len;
|
||||
}
|
||||
if compression_info.written_compressed {
|
||||
// The image has been compressed
|
||||
self.uncompressed_bytes_chosen += uncompressed_len;
|
||||
}
|
||||
let off = res?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
@@ -856,9 +839,6 @@ impl ImageLayerWriterInner {
|
||||
// Calculate compression ratio
|
||||
let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
|
||||
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
|
||||
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
|
||||
.inc_by(self.uncompressed_bytes_eligible);
|
||||
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
|
||||
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
|
||||
|
||||
let mut file = self.blob_writer.into_inner();
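The writer changes above add per-image byte accounting that feeds the `COMPRESSION_IMAGE_INPUT_BYTES*` metrics on finish. A simplified sketch of that bookkeeping (illustrative struct, not the actual `ImageLayerWriterInner`):

```rust
// Every image contributes to `uncompressed_bytes`; images that were at least
// considered for compression add to `..._eligible`; images actually stored in
// compressed form add to `..._chosen`.
#[derive(Default, Debug)]
struct CompressionCounters {
    uncompressed_bytes: u64,
    uncompressed_bytes_eligible: u64,
    uncompressed_bytes_chosen: u64,
}

impl CompressionCounters {
    fn record(&mut self, uncompressed_len: u64, considered: bool, written_compressed: bool) {
        self.uncompressed_bytes += uncompressed_len;
        if considered {
            self.uncompressed_bytes_eligible += uncompressed_len;
        }
        if written_compressed {
            self.uncompressed_bytes_chosen += uncompressed_len;
        }
    }
}

fn main() {
    let mut c = CompressionCounters::default();
    c.record(8192, true, true);  // considered and kept compressed
    c.record(512, false, false); // too small to even consider
    assert_eq!(c.uncompressed_bytes, 8704);
    assert_eq!(c.uncompressed_bytes_chosen, 8192);
    println!("{c:?}");
}
```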
@@ -1651,9 +1651,8 @@ impl Drop for DownloadedLayer {
|
||||
}
|
||||
|
||||
impl DownloadedLayer {
|
||||
/// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
|
||||
/// Failure to load the layer is sticky, i.e., future `get()` calls will return
|
||||
/// the initial load failure immediately.
|
||||
/// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to
|
||||
/// initialize it permanently.
|
||||
///
|
||||
/// `owner` parameter is a strong reference at the same `LayerInner` as the
|
||||
/// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
|
||||
@@ -1684,7 +1683,7 @@ impl DownloadedLayer {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map(LayerKind::Delta)
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
} else {
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
@@ -1701,29 +1700,32 @@ impl DownloadedLayer {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map(LayerKind::Image)
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(layer) => Ok(layer),
|
||||
Err(err) => {
|
||||
Ok(Ok(layer)) => Ok(Ok(layer)),
|
||||
Ok(Err(transient)) => Err(transient),
|
||||
Err(permanent) => {
|
||||
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
|
||||
// We log this message once over the lifetime of `Self`
|
||||
// => Ok and good to log backtrace and path here.
|
||||
tracing::error!(
|
||||
"layer load failed, assuming permanent failure: {}: {err:?}",
|
||||
owner.path
|
||||
);
|
||||
Err(err)
|
||||
// TODO(#5815): we are not logging all errors, so temporarily log them **once**
|
||||
// here as well
|
||||
let permanent = permanent.context("load layer");
|
||||
tracing::error!("layer loading failed permanently: {permanent:#}");
|
||||
Ok(Err(permanent))
|
||||
}
|
||||
}
|
||||
};
|
||||
self.kind
|
||||
.get_or_init(init)
|
||||
.await
|
||||
.get_or_try_init(init)
|
||||
// return transient errors using `?`
|
||||
.await?
|
||||
.as_ref()
|
||||
// We already logged the full backtrace above, once. Don't repeat that here.
|
||||
.map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
|
||||
.map_err(|e| {
|
||||
// errors are not cloneable, so we can only stringify them
|
||||
// test_broken_timeline matches this string
|
||||
anyhow::anyhow!("layer loading failed: {e:#}")
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_value_reconstruct_data(
|
||||
@@ -1758,11 +1760,7 @@ impl DownloadedLayer {
|
||||
) -> Result<(), GetVectoredError> {
|
||||
use LayerKind::*;
|
||||
|
||||
match self
|
||||
.get(owner, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?
|
||||
{
|
||||
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
|
||||
Delta(d) => {
|
||||
d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
|
||||
.await
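The switch from `get_or_init` to `get_or_try_init` above means transient load failures are no longer cached: only a successful load, or a permanent failure, initializes the cell, and stored failures are re-reported as strings on later calls. A rough sketch of that behaviour, using `once_cell::sync::OnceCell` as a stand-in for the cell type in the real code:

```rust
use once_cell::sync::OnceCell;

// The cell stores the *permanent* outcome; transient errors propagate through
// the outer Err of get_or_try_init and leave the cell empty, so callers retry.
fn get_or_load(
    cell: &OnceCell<Result<&'static str, String>>,
    attempt: &mut u32,
) -> Result<&'static str, String> {
    *attempt += 1;
    let attempt_no = *attempt;
    let stored = cell.get_or_try_init(|| -> Result<_, String> {
        if attempt_no == 1 {
            // Transient failure: nothing is cached, the next call retries.
            return Err("transient I/O error".to_string());
        }
        // Permanent outcomes (Ok or Err) are cached in the cell.
        Ok(Ok("layer contents"))
    })?;
    // Errors are not cloneable, so a stored failure is re-reported as a string.
    stored.clone().map_err(|e| format!("layer load failed earlier: {e}"))
}

fn main() {
    let cell = OnceCell::new();
    let mut attempts = 0;
    assert!(get_or_load(&cell, &mut attempts).is_err()); // transient, not cached
    assert_eq!(get_or_load(&cell, &mut attempts), Ok("layer contents"));
    assert_eq!(attempts, 2);
}
```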
@@ -210,28 +210,24 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run compaction
|
||||
match tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
Err(e) => {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
wait_duration
|
||||
}
|
||||
Ok(has_pending_task) => {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction immediately in case there is a pending compaction task
|
||||
if has_pending_task { Duration::from_secs(0) } else { period }
|
||||
}
|
||||
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count + 1,
|
||||
1.0,
|
||||
MAX_BACKOFF_SECS,
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
wait_duration
|
||||
} else {
|
||||
error_run_count = 0;
|
||||
period
|
||||
}
|
||||
};
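The control flow above boils down to: capped exponential backoff after consecutive errors, an immediate re-run when the iteration reports pending work, and the regular period otherwise. A simplified, self-contained sketch of that decision (the real code uses `backoff::exponential_backoff_duration_seconds`; the formula and constants below are only approximations):

```rust
use std::time::Duration;

// Hypothetical constants mirroring the loop above, not the pageserver's values.
const MAX_BACKOFF_SECS: f64 = 300.0;
const PERIOD: Duration = Duration::from_secs(20);

// Next sleep for the loop: immediate reschedule when work is pending, the
// regular period when idle, capped exponential backoff keyed on the
// consecutive error count when the iteration failed.
fn next_wait(result: Result<bool, &str>, error_run_count: &mut u32) -> Duration {
    match result {
        Err(_) => {
            *error_run_count += 1;
            let secs = 2.0_f64.powi(*error_run_count as i32).min(MAX_BACKOFF_SECS);
            Duration::from_secs_f64(secs)
        }
        Ok(has_pending_task) => {
            *error_run_count = 0;
            if has_pending_task { Duration::ZERO } else { PERIOD }
        }
    }
}

fn main() {
    let mut errors = 0;
    assert_eq!(next_wait(Ok(true), &mut errors), Duration::ZERO);
    assert_eq!(next_wait(Ok(false), &mut errors), PERIOD);
    assert!(next_wait(Err("compaction failed"), &mut errors) >= Duration::from_secs(2));
}
```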
@@ -1769,14 +1769,13 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
|
||||
/// compaction tasks.
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
pub(crate) async fn compact(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, CompactionError> {
|
||||
) -> Result<(), CompactionError> {
|
||||
// most likely the cancellation token is from background task, but in tests it could be the
|
||||
// request task as well.
|
||||
|
||||
@@ -1796,8 +1795,8 @@ impl Timeline {
|
||||
// compaction task goes over its period (20s), which happens quite often in production.
|
||||
let (_guard, _permit) = tokio::select! {
|
||||
tuple = prepare => { tuple },
|
||||
_ = self.cancel.cancelled() => return Ok(false),
|
||||
_ = cancel.cancelled() => return Ok(false),
|
||||
_ = self.cancel.cancelled() => return Ok(()),
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -1805,14 +1804,11 @@ impl Timeline {
|
||||
// Last record Lsn could be zero in case the timeline was just created
|
||||
if !last_record_lsn.is_valid() {
|
||||
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
|
||||
return Ok(false);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match self.get_compaction_algorithm_settings().kind {
|
||||
CompactionAlgorithm::Tiered => {
|
||||
self.compact_tiered(cancel, ctx).await?;
|
||||
Ok(false)
|
||||
}
|
||||
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
|
||||
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
|
||||
}
|
||||
}
|
||||
@@ -2001,11 +1997,6 @@ impl Timeline {
self.current_state() == TimelineState::Active
}

#[allow(unused)]
pub(crate) fn is_archived(&self) -> Option<bool> {
self.remote_client.is_archived()
}

pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}

@@ -102,19 +102,17 @@ impl KeyHistoryRetention {
|
||||
|
||||
impl Timeline {
|
||||
/// TODO: cancellation
|
||||
///
|
||||
/// Returns whether the compaction has pending tasks.
|
||||
pub(crate) async fn compact_legacy(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, CompactionError> {
|
||||
) -> Result<(), CompactionError> {
|
||||
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
|
||||
self.compact_with_gc(cancel, ctx)
|
||||
return self
|
||||
.compact_with_gc(cancel, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
return Ok(false);
|
||||
.map_err(CompactionError::Other);
|
||||
}
|
||||
|
||||
// High level strategy for compaction / image creation:
|
||||
@@ -162,7 +160,7 @@ impl Timeline {
|
||||
// Define partitioning schema if needed
|
||||
|
||||
// FIXME: the match should only cover repartitioning, not the next steps
|
||||
let (partition_count, has_pending_tasks) = match self
|
||||
let partition_count = match self
|
||||
.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
@@ -179,35 +177,30 @@ impl Timeline {
|
||||
|
||||
// 2. Compact
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
|
||||
self.compact_level0(target_file_size, ctx).await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
let image_layers = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if flags.contains(CompactFlags::ForceImageLayerCreation) {
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified
|
||||
// "enough". Skip image layer creation if L0 compaction cannot keep up.
|
||||
if fully_compacted {
|
||||
let image_layers = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if flags.contains(CompactFlags::ForceImageLayerCreation) {
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
} else {
|
||||
info!("skipping image layer generation due to L0 compaction did not include all layers.");
|
||||
}
|
||||
(partitioning.parts.len(), !fully_compacted)
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
partitioning.parts.len()
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
@@ -219,7 +212,7 @@ impl Timeline {
|
||||
if !self.cancel.is_cancelled() {
|
||||
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
(1, false)
|
||||
1
|
||||
}
|
||||
};
|
||||
|
||||
@@ -232,7 +225,7 @@ impl Timeline {
|
||||
self.compact_shard_ancestors(rewrite_max, ctx).await?;
|
||||
}
|
||||
|
||||
Ok(has_pending_tasks)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check for layers that are eligible to be rewritten:
|
||||
@@ -439,16 +432,15 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
|
||||
/// as Level 1 files. Returns whether the L0 layers are fully compacted.
|
||||
/// as Level 1 files.
|
||||
async fn compact_level0(
|
||||
self: &Arc<Self>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, CompactionError> {
|
||||
) -> Result<(), CompactionError> {
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
fully_compacted,
|
||||
} = {
|
||||
let phase1_span = info_span!("compact_level0_phase1");
|
||||
let ctx = ctx.attached_child();
|
||||
@@ -471,12 +463,12 @@ impl Timeline {
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
// nothing to do
|
||||
return Ok(true);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
|
||||
.await?;
|
||||
Ok(fully_compacted)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
|
||||
@@ -543,8 +535,6 @@ impl Timeline {
|
||||
) as u64
|
||||
* std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
|
||||
|
||||
let mut fully_compacted = true;
|
||||
|
||||
deltas_to_compact.push(
|
||||
first_level0_delta
|
||||
.download_and_keep_resident()
|
||||
@@ -572,7 +562,6 @@ impl Timeline {
|
||||
"L0 compaction picker hit max delta layer size limit: {}",
|
||||
delta_size_limit
|
||||
);
|
||||
fully_compacted = false;
|
||||
|
||||
// Proceed with compaction, but only a subset of L0s
|
||||
break;
|
||||
@@ -934,7 +923,6 @@ impl Timeline {
|
||||
.into_iter()
|
||||
.map(|x| x.drop_eviction_guard())
|
||||
.collect::<Vec<_>>(),
|
||||
fully_compacted,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -943,9 +931,6 @@ impl Timeline {
|
||||
struct CompactLevel0Phase1Result {
|
||||
new_layers: Vec<ResidentLayer>,
|
||||
deltas_to_compact: Vec<Layer>,
|
||||
// Whether we have included all L0 layers, or selected only part of them due to the
|
||||
// L0 compaction size limit.
|
||||
fully_compacted: bool,
|
||||
}
|
||||
|
||||
#[derive(Default)]
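The `fully_compacted` plumbing above (from `CompactLevel0Phase1Result` up through `compact()`'s `bool` return) exists so that image-layer creation is skipped, and another round is scheduled immediately, whenever the L0 picker hit its size limit and only took a subset of layers. A toy sketch of that picker decision (names and the exact limit check are illustrative):

```rust
struct PickResult {
    picked: Vec<u64>,      // sizes of the L0 deltas chosen for this round
    fully_compacted: bool, // true only if every candidate was included
}

fn pick_level0_deltas(candidate_sizes: &[u64], delta_size_limit: u64) -> PickResult {
    let mut picked = Vec::new();
    let mut total = 0u64;
    for &size in candidate_sizes {
        if !picked.is_empty() && total + size > delta_size_limit {
            // Proceed with compaction, but only a subset of L0s; the caller
            // then skips image layer creation and reschedules immediately.
            return PickResult { picked, fully_compacted: false };
        }
        total += size;
        picked.push(size);
    }
    PickResult { picked, fully_compacted: true }
}

fn main() {
    let partial = pick_level0_deltas(&[100, 100, 100], 150);
    assert!(!partial.fully_compacted);
    assert_eq!(partial.picked, vec![100]);

    let full = pick_level0_deltas(&[100, 100], 500);
    assert!(full.fully_compacted);
}
```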
153
poetry.lock
generated
@@ -870,96 +870,6 @@ files = [
|
||||
[package.dependencies]
|
||||
colorama = {version = "*", markers = "platform_system == \"Windows\""}
|
||||
|
||||
[[package]]
|
||||
name = "clickhouse-connect"
|
||||
version = "0.7.17"
|
||||
description = "ClickHouse Database Core Driver for Python, Pandas, and Superset"
|
||||
optional = false
|
||||
python-versions = "~=3.8"
|
||||
files = [
|
||||
{file = "clickhouse-connect-0.7.17.tar.gz", hash = "sha256:854f1f9f3e024e7f89ae5d57cd3289d7a4c3dc91a9f24c4d233014f0ea19cb2d"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:aca36f5f28be1ada2981fce87724bbf451f267c918015baec59e527de3c9c882"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:66209e4634f457604c263bea176336079d26c284e251e68a8435b0b80c1a25ff"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4d86c5a561a2a99321c8b4af22257461b8e67142f34cfea6e70f39b45b1f406"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d200c9afa2725a96f9f3718221f641276b80c11bf504d8a2fbaafb5a05b2f0d3"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:004d867b1005445a46e6742db1054bf2a717a451372663b46e09b5e9e90a31e3"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:4ef94a4a8e008882259151833c3c47cfbb9c8f08de0f100aaf3b95c366dcfb24"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:ee732c3df50c8b07d16b5836ff85e6b84569922455c03837c3add5cf1388fe1f"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:d9dbe1235465bb946e24b90b0ca5b8800b5d645acb2d7d6ee819448c3e2fd959"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-win32.whl", hash = "sha256:e5db0d68dfb63db0297d44dc91406bcfd7d333708d7cd55086c8550fbf870b78"},
|
||||
{file = "clickhouse_connect-0.7.17-cp310-cp310-win_amd64.whl", hash = "sha256:800750f568c097ea312887785025006d6098bffd8ed2dd6a57048fb3ced6d778"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4eb390623b3d15dc9cda78f5c68f83ef9ad11743797e70af8fabc384b015a73c"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:35f172ca950f218f63072024c81d5b4ff6e5399620c255506c321ccc7b17c9a5"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ae7918f060f7576fc931c692e0122b1b07576fabd81444af22e1f8582300d200"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ff2881b93c7a1afb9c99fb59ad5fd666850421325d0931e2b77f3f4ba872303d"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8a4d9b4f97271addf66aadbaf7f154f19a0ad6c22026d575a995c55ebd8576db"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:e431469b1ff2d5c3e4c406d55c6afdf7102f5d2524c2ceb5481b94ac24412aa3"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:2b6f80115176559f181a6b3ecad11aa3d70ef6014c3d2905b90fcef3f27d25c2"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d8ac694f40dfafc8a3cc877116b4bc73e8877ebf66d4d96ee092484ee4c0b481"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-win32.whl", hash = "sha256:78b7a3f6b0fad4eaf8afb5f9a2e855bde53e82ea5804960e9cf779538f4606a1"},
|
||||
{file = "clickhouse_connect-0.7.17-cp311-cp311-win_amd64.whl", hash = "sha256:efd390cc045334ecc3f2a9c18cc07c041d0288b145967805fdcab65abeefa75f"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:9228334a17dc0a7842222f54ba5b89fc563532424aad4f66be799df70ab37e9f"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e432a42bb788bda77e88eda2774392a60fbbb5ee2a79cb2881d182d26c45fe49"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c85152ed2879965ee1fa2bd5e31fb27d281fd5f50d6e86a401efd95cd85b29ef"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:29a126104aa5e11df570cbd89fca4988784084602ba77d17b2396b334c54fd75"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:882d8f9570549258e6eb6a97915fbf64ed29fe395d5e360866ea8d42c8283a35"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:06ebf99111171442f462fb8b357364c3e276da3e8f8557b2e8fee9eb55ab37d1"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:e0cf6f99b2777b0d164bf8b65ec39104cdc0789a56bcb52d98289bbd6f5cc70e"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:ee46c508fddfff3b7ac52326788e0c6dd8dfb416b6d7e02e5d30e8110749dac2"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-win32.whl", hash = "sha256:eb708b590a37d56b069a6088254ffa55d73b8cb65527339df81ef03fe67ffdec"},
|
||||
{file = "clickhouse_connect-0.7.17-cp312-cp312-win_amd64.whl", hash = "sha256:17f00dccddaeaf43733faa1fa21f7d24641454a73669fda862545ba7c88627f5"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ab5d4b37a6dcc39e94c63beac0f22d9dda914f5eb865d166c64cf04dfadb7d16"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:32aa90387f45f34cbc5a984789ed4c12760a3c0056c190ab0123ceafc36b1002"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:21277b6bdd6c8ff14170bfcd52125c5c39f442ec4bafbb643ad7d0ca915f0029"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ca68d8b7dee3fb4e7229e06152f5b0faaccafb4c87d9c2d48fa5bd117a3cc1c0"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:841c56282102b2fba1e0b332bb1c7a0c50992fbc321746af8d3e0e6ca2450e8b"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:8d7ffde5a4b95d8fe9ed38e08e504e497310e3d7a17691bd40bf65734648fdfc"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:055960086b6b92b6e44f5ba04c81c40c10b038588e4b3908b033c99f66125332"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:36491fec63ceb8503b6344c23477647030139f346b749dc5ee672c505939dbbe"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-win32.whl", hash = "sha256:8779a907e026db32e6bc0bc0c8d5de0e2e3afd166afc2d4adcc0603399af5539"},
|
||||
{file = "clickhouse_connect-0.7.17-cp38-cp38-win_amd64.whl", hash = "sha256:309854fa197885c6278438ddd032ab52e6fec56f162074e343c3635ca7266078"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e8009f94550178dc971aeb4f8787ba7a5b473c22647490428b7229f540a51d2b"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:70f8422f407b13a404b3670fd097855abd5adaf890c710d6678d2b46ab61ac48"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:082783eb1e8baf7b3465dd045132dc5cb5a91432c899dc4e19891c5f782d8d23"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c1c30aad2a9c7584c4ee19e646a087b3bbd2d4daab3d88a2afeeae1a7f6febf9"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fc8e245a9f4f0dce39f155e626405f60f1d3cf4d1e52dd2c793ea6b603ca111b"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:802372cb8a69c9ffdf4260e9f01616c8601ba531825ed6f08834827e0b880cd1"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:193a60271a3b105cdbde96fb20b40eab8a50fca3bb1f397546f7a18b53d9aa9c"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:59d58932916792fdbd09cb961a245a0c2d87b07b8296f9138915b998f4522941"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-win32.whl", hash = "sha256:3cfd0edabb589f640636a97ffc38d1b3d760faef208d44e50829cc1ad3f0d3e5"},
|
||||
{file = "clickhouse_connect-0.7.17-cp39-cp39-win_amd64.whl", hash = "sha256:5661b4629aac228481219abf2e149119af1a71d897f191665e182d9d192d7033"},
|
||||
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:7429d309109e7e4a70fd867d69fcfea9ddcb1a1e910caa6b0e2c3776b71f4613"},
|
||||
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e5ae619151006da84a0b1585a9bcc81be32459d8061aeb2e116bad5bbaa7d108"},
|
||||
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec0c84a0880621cb2389656a89886ef3133f0b3f8dc016eee6f25bbb49ff6f70"},
|
||||
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:705464c23f821666b76f8f619cf2870225156276562756b3933aaa24708e0ff8"},
|
||||
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:1822016f4b769e89264fe26cefe0bc5e50e4c3ca0747d89bb52d57dc4f1e5ffb"},
|
||||
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:6c92b0c342c1fbfa666010e8175e05026dc570a7ef91d8fa81ce503180f318aa"},
|
||||
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d2e106536540e906c3c866f8615fcf870a9a77c1bfab9ef4b042febfd2fdb953"},
|
||||
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bac9a32e62384b4341ba51a451084eb3b00c6e59aaac1499145dd8b897cb585c"},
|
||||
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0feed93b9912b7862a8c41be1febcd44b68a824a5c1059b19d5c567afdaa6273"},
|
||||
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:2e2dd6db52e799f065fd565143fde5a872cfe903de1bee7775bc3a349856a790"},
|
||||
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:ed13add5d579a5960155f3000420544368501c9703d2fb94f103b4a6126081f6"},
|
||||
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c257a23ed3bf1858593fb03927d9d073fbbdfa24dc2afee537c3314bd66b4e24"},
|
||||
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d47866f64cbdc2d5cc4f8a7a8c49e3ee90c9e487091b9eda7c3a3576418e1cbe"},
|
||||
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9b850e2f17e0a0b5a37d996d3fb728050227489d64d271d678d166abea94f26e"},
|
||||
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:349682288987dc84ac7695f7cd6b510be8d0ec0eee7c1b72dbf2146b4e9efdb8"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
certifi = "*"
|
||||
lz4 = "*"
|
||||
pytz = "*"
|
||||
urllib3 = ">=1.26"
|
||||
zstandard = "*"
|
||||
|
||||
[package.extras]
|
||||
arrow = ["pyarrow"]
|
||||
numpy = ["numpy"]
|
||||
orjson = ["orjson"]
|
||||
pandas = ["pandas"]
|
||||
sqlalchemy = ["sqlalchemy (>1.3.21,<2.0)"]
|
||||
tzlocal = ["tzlocal (>=4.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "colorama"
|
||||
version = "0.4.5"
|
||||
@@ -1560,56 +1470,6 @@ files = [
|
||||
{file = "lazy_object_proxy-1.10.0-pp310.pp311.pp312.pp38.pp39-none-any.whl", hash = "sha256:80fa48bd89c8f2f456fc0765c11c23bf5af827febacd2f523ca5bc1893fcc09d"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lz4"
|
||||
version = "4.3.3"
|
||||
description = "LZ4 Bindings for Python"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "lz4-4.3.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b891880c187e96339474af2a3b2bfb11a8e4732ff5034be919aa9029484cd201"},
|
||||
{file = "lz4-4.3.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:222a7e35137d7539c9c33bb53fcbb26510c5748779364014235afc62b0ec797f"},
|
||||
{file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f76176492ff082657ada0d0f10c794b6da5800249ef1692b35cf49b1e93e8ef7"},
|
||||
{file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1d18718f9d78182c6b60f568c9a9cec8a7204d7cb6fad4e511a2ef279e4cb05"},
|
||||
{file = "lz4-4.3.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6cdc60e21ec70266947a48839b437d46025076eb4b12c76bd47f8e5eb8a75dcc"},
|
||||
{file = "lz4-4.3.3-cp310-cp310-win32.whl", hash = "sha256:c81703b12475da73a5d66618856d04b1307e43428a7e59d98cfe5a5d608a74c6"},
|
||||
{file = "lz4-4.3.3-cp310-cp310-win_amd64.whl", hash = "sha256:43cf03059c0f941b772c8aeb42a0813d68d7081c009542301637e5782f8a33e2"},
|
||||
{file = "lz4-4.3.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:30e8c20b8857adef7be045c65f47ab1e2c4fabba86a9fa9a997d7674a31ea6b6"},
|
||||
{file = "lz4-4.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2f7b1839f795315e480fb87d9bc60b186a98e3e5d17203c6e757611ef7dcef61"},
|
||||
{file = "lz4-4.3.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:edfd858985c23523f4e5a7526ca6ee65ff930207a7ec8a8f57a01eae506aaee7"},
|
||||
{file = "lz4-4.3.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e9c410b11a31dbdc94c05ac3c480cb4b222460faf9231f12538d0074e56c563"},
|
||||
{file = "lz4-4.3.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d2507ee9c99dbddd191c86f0e0c8b724c76d26b0602db9ea23232304382e1f21"},
|
||||
{file = "lz4-4.3.3-cp311-cp311-win32.whl", hash = "sha256:f180904f33bdd1e92967923a43c22899e303906d19b2cf8bb547db6653ea6e7d"},
|
||||
{file = "lz4-4.3.3-cp311-cp311-win_amd64.whl", hash = "sha256:b14d948e6dce389f9a7afc666d60dd1e35fa2138a8ec5306d30cd2e30d36b40c"},
|
||||
{file = "lz4-4.3.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e36cd7b9d4d920d3bfc2369840da506fa68258f7bb176b8743189793c055e43d"},
|
||||
{file = "lz4-4.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:31ea4be9d0059c00b2572d700bf2c1bc82f241f2c3282034a759c9a4d6ca4dc2"},
|
||||
{file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c9a6fd20767ccaf70649982f8f3eeb0884035c150c0b818ea660152cf3c809"},
|
||||
{file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bca8fccc15e3add173da91be8f34121578dc777711ffd98d399be35487c934bf"},
|
||||
{file = "lz4-4.3.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e7d84b479ddf39fe3ea05387f10b779155fc0990125f4fb35d636114e1c63a2e"},
|
||||
{file = "lz4-4.3.3-cp312-cp312-win32.whl", hash = "sha256:337cb94488a1b060ef1685187d6ad4ba8bc61d26d631d7ba909ee984ea736be1"},
|
||||
{file = "lz4-4.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:5d35533bf2cee56f38ced91f766cd0038b6abf46f438a80d50c52750088be93f"},
|
||||
{file = "lz4-4.3.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:363ab65bf31338eb364062a15f302fc0fab0a49426051429866d71c793c23394"},
|
||||
{file = "lz4-4.3.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0a136e44a16fc98b1abc404fbabf7f1fada2bdab6a7e970974fb81cf55b636d0"},
|
||||
{file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:abc197e4aca8b63f5ae200af03eb95fb4b5055a8f990079b5bdf042f568469dd"},
|
||||
{file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:56f4fe9c6327adb97406f27a66420b22ce02d71a5c365c48d6b656b4aaeb7775"},
|
||||
{file = "lz4-4.3.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0e822cd7644995d9ba248cb4b67859701748a93e2ab7fc9bc18c599a52e4604"},
|
||||
{file = "lz4-4.3.3-cp38-cp38-win32.whl", hash = "sha256:24b3206de56b7a537eda3a8123c644a2b7bf111f0af53bc14bed90ce5562d1aa"},
|
||||
{file = "lz4-4.3.3-cp38-cp38-win_amd64.whl", hash = "sha256:b47839b53956e2737229d70714f1d75f33e8ac26e52c267f0197b3189ca6de24"},
|
||||
{file = "lz4-4.3.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6756212507405f270b66b3ff7f564618de0606395c0fe10a7ae2ffcbbe0b1fba"},
|
||||
{file = "lz4-4.3.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ee9ff50557a942d187ec85462bb0960207e7ec5b19b3b48949263993771c6205"},
|
||||
{file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2b901c7784caac9a1ded4555258207d9e9697e746cc8532129f150ffe1f6ba0d"},
|
||||
{file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b6d9ec061b9eca86e4dcc003d93334b95d53909afd5a32c6e4f222157b50c071"},
|
||||
{file = "lz4-4.3.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f4c7bf687303ca47d69f9f0133274958fd672efaa33fb5bcde467862d6c621f0"},
|
||||
{file = "lz4-4.3.3-cp39-cp39-win32.whl", hash = "sha256:054b4631a355606e99a42396f5db4d22046a3397ffc3269a348ec41eaebd69d2"},
|
||||
{file = "lz4-4.3.3-cp39-cp39-win_amd64.whl", hash = "sha256:eac9af361e0d98335a02ff12fb56caeb7ea1196cf1a49dbf6f17828a131da807"},
|
||||
{file = "lz4-4.3.3.tar.gz", hash = "sha256:01fe674ef2889dbb9899d8a67361e0c4a2c833af5aeb37dd505727cf5d2a131e"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
docs = ["sphinx (>=1.6.0)", "sphinx-bootstrap-theme"]
|
||||
flake8 = ["flake8"]
|
||||
tests = ["psutil", "pytest (!=3.3.0)", "pytest-cov"]
|
||||
|
||||
[[package]]
|
||||
name = "markupsafe"
|
||||
version = "2.1.1"
|
||||
@@ -2501,17 +2361,6 @@ files = [
|
||||
[package.dependencies]
|
||||
six = ">=1.5"
|
||||
|
||||
[[package]]
|
||||
name = "pytz"
|
||||
version = "2024.1"
|
||||
description = "World timezone definitions, modern and historical"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "pytz-2024.1-py2.py3-none-any.whl", hash = "sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319"},
|
||||
{file = "pytz-2024.1.tar.gz", hash = "sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pywin32"
|
||||
version = "301"
|
||||
@@ -3357,4 +3206,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "7cee6a8c30bc7f4bfb0a87c6bad3952dfb4da127fad853d2710a93ac3eab8a00"
|
||||
content-hash = "16ebd6a46768be7f67dbdb4ee5903b167d94edc9965f29252f038c67e9e907b0"
|
||||
|
||||
@@ -41,7 +41,6 @@ zstandard = "^0.21.0"
|
||||
httpx = {extras = ["http2"], version = "^0.26.0"}
|
||||
pytest-repeat = "^0.9.3"
|
||||
websockets = "^12.0"
|
||||
clickhouse-connect = "^0.7.16"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
mypy = "==1.3.0"
|
||||
|
||||
@@ -143,12 +143,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
self.tenant_id.unwrap_or(TenantId::from([0u8; 16])),
|
||||
self.timeline_id.unwrap_or(TimelineId::from([0u8; 16])),
|
||||
);
|
||||
tracing::Span::current()
|
||||
.record("ttid", tracing::field::display(ttid))
|
||||
.record(
|
||||
"application_name",
|
||||
tracing::field::debug(self.appname.clone()),
|
||||
);
|
||||
tracing::Span::current().record("ttid", tracing::field::display(ttid));
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
|
||||
@@ -43,7 +43,7 @@ pub async fn task_main(
|
||||
error!("connection handler exited: {}", err);
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty)),
|
||||
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty)),
|
||||
);
|
||||
}
|
||||
}
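The safekeeper change above records `application_name` on the connection span, pairing a `field::Empty` placeholder at span creation with a later `record()` call. A small sketch of that tracing pattern (assumes the `tracing` and `tracing-subscriber` crates; the values are illustrative):

```rust
use tracing::{field, info_span};

fn main() {
    tracing_subscriber::fmt().init();

    // Declare the field up front as empty...
    let span = info_span!("connection", cid = 1, application_name = field::Empty);
    let _guard = span.enter();

    // ...and fill it in once the client's startup parameters are known.
    tracing::Span::current().record("application_name", field::debug(Some("pgbench")));
    tracing::info!("handshake complete");
}
```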
@@ -18,7 +18,6 @@ anyhow.workspace = true
|
||||
aws-config.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -45,12 +44,7 @@ scopeguard.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
diesel = { version = "2.1.4", features = [
|
||||
"serde_json",
|
||||
"postgres",
|
||||
"r2d2",
|
||||
"chrono",
|
||||
] }
|
||||
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
|
||||
diesel_migrations = { version = "2.1.0" }
|
||||
r2d2 = { version = "0.8.10" }
|
||||
|
||||
@@ -58,3 +52,4 @@ utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
DROP TABLE metadata_health;
|
||||
@@ -1,14 +0,0 @@
|
||||
CREATE TABLE metadata_health (
|
||||
tenant_id VARCHAR NOT NULL,
|
||||
shard_number INTEGER NOT NULL,
|
||||
shard_count INTEGER NOT NULL,
|
||||
PRIMARY KEY(tenant_id, shard_number, shard_count),
|
||||
-- Rely on cascade behavior for delete
|
||||
FOREIGN KEY(tenant_id, shard_number, shard_count) REFERENCES tenant_shards ON DELETE CASCADE,
|
||||
healthy BOOLEAN NOT NULL DEFAULT TRUE,
|
||||
last_scrubbed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
|
||||
INSERT INTO metadata_health(tenant_id, shard_number, shard_count)
|
||||
SELECT tenant_id, shard_number, shard_count FROM tenant_shards;
|
||||
@@ -3,18 +3,14 @@ use crate::metrics::{
|
||||
METRICS_REGISTRY,
|
||||
};
|
||||
use crate::reconciler::ReconcileError;
|
||||
use crate::service::{LeadershipStatus, Service, STARTUP_RECONCILE_TIMEOUT};
|
||||
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
|
||||
use anyhow::Context;
|
||||
use futures::Future;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{StatusCode, Uri};
|
||||
use metrics::{BuildInfo, NeonMetrics};
|
||||
use pageserver_api::controller_api::{
|
||||
MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
|
||||
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
|
||||
TenantCreateRequest,
|
||||
};
|
||||
use pageserver_api::controller_api::TenantCreateRequest;
|
||||
use pageserver_api::models::{
|
||||
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
|
||||
TenantTimeTravelRequest, TimelineCreateRequest,
|
||||
@@ -564,51 +560,6 @@ async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, A
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn handle_metadata_health_update(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Scrubber)?;
|
||||
|
||||
let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
|
||||
let state = get_state(&req);
|
||||
|
||||
state.service.metadata_health_update(update_req).await?;
|
||||
|
||||
json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
|
||||
}
|
||||
|
||||
async fn handle_metadata_health_list_unhealthy(
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
MetadataHealthListUnhealthyResponse {
|
||||
unhealthy_tenant_shards,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_metadata_health_list_outdated(
|
||||
mut req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
|
||||
let state = get_state(&req);
|
||||
let health_records = state
|
||||
.service
|
||||
.metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
|
||||
.await?;
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
MetadataHealthListOutdatedResponse { health_records },
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_shard_split(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
@@ -656,13 +607,6 @@ async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
json_response(StatusCode::OK, state.service.step_down().await)
|
||||
}
|
||||
|
||||
async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
@@ -790,47 +734,6 @@ struct RequestMeta {
|
||||
at: Instant,
|
||||
}
|
||||
|
||||
pub fn prologue_leadership_status_check_middleware<
|
||||
B: hyper::body::HttpBody + Send + Sync + 'static,
|
||||
>() -> Middleware<B, ApiError> {
|
||||
Middleware::pre(move |req| async move {
|
||||
let state = get_state(&req);
|
||||
let leadership_status = state.service.get_leadership_status();
|
||||
|
||||
enum AllowedRoutes<'a> {
|
||||
All,
|
||||
Some(Vec<&'a str>),
|
||||
}
|
||||
|
||||
let allowed_routes = match leadership_status {
|
||||
LeadershipStatus::Leader => AllowedRoutes::All,
|
||||
LeadershipStatus::SteppedDown => {
|
||||
// TODO: does it make sense to allow /status here?
|
||||
AllowedRoutes::Some(["/control/v1/step_down", "/status", "/metrics"].to_vec())
|
||||
}
|
||||
LeadershipStatus::Candidate => {
|
||||
AllowedRoutes::Some(["/ready", "/status", "/metrics"].to_vec())
|
||||
}
|
||||
};
|
||||
|
||||
let uri = req.uri().to_string();
|
||||
match allowed_routes {
|
||||
AllowedRoutes::All => Ok(req),
|
||||
AllowedRoutes::Some(allowed) if allowed.contains(&uri.as_str()) => Ok(req),
|
||||
_ => {
|
||||
tracing::info!(
|
||||
"Request {} not allowed due to current leadership state",
|
||||
req.uri()
|
||||
);
|
||||
|
||||
Err(ApiError::ResourceUnavailable(
|
||||
format!("Current leadership status is {leadership_status}").into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
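The middleware above gates routes on the controller's leadership status. Reduced to its core, the allow-list logic looks like this (sketch only; the real middleware wraps it in `Middleware::pre` and answers rejected requests with `ApiError::ResourceUnavailable`):

```rust
// Route strings and the enum mirror the diff above; this is not the actual middleware.
#[derive(Clone, Copy)]
enum LeadershipStatus {
    Leader,
    SteppedDown,
    Candidate,
}

fn is_route_allowed(status: LeadershipStatus, uri: &str) -> bool {
    match status {
        LeadershipStatus::Leader => true,
        LeadershipStatus::SteppedDown => {
            ["/control/v1/step_down", "/status", "/metrics"].contains(&uri)
        }
        LeadershipStatus::Candidate => ["/ready", "/status", "/metrics"].contains(&uri),
    }
}

fn main() {
    assert!(is_route_allowed(LeadershipStatus::Leader, "/control/v1/tenant"));
    assert!(!is_route_allowed(LeadershipStatus::Candidate, "/control/v1/tenant"));
    assert!(is_route_allowed(LeadershipStatus::SteppedDown, "/metrics"));
}
```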
fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
|
||||
) -> Middleware<B, ApiError> {
|
||||
Middleware::pre(move |req| async move {
|
||||
@@ -917,7 +820,6 @@ pub fn make_router(
|
||||
build_info: BuildInfo,
|
||||
) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let mut router = endpoint::make_router()
|
||||
.middleware(prologue_leadership_status_check_middleware())
|
||||
.middleware(prologue_metrics_middleware())
|
||||
.middleware(epilogue_metrics_middleware());
|
||||
if auth.is_some() {
|
||||
@@ -1036,28 +938,6 @@ pub fn make_router(
|
||||
RequestName("control_v1_cancel_node_fill"),
|
||||
)
|
||||
})
|
||||
// Metadata health operations
|
||||
.post("/control/v1/metadata_health/update", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_metadata_health_update,
|
||||
RequestName("control_v1_metadata_health_update"),
|
||||
)
|
||||
})
|
||||
.get("/control/v1/metadata_health/unhealthy", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_metadata_health_list_unhealthy,
|
||||
RequestName("control_v1_metadata_health_list_unhealthy"),
|
||||
)
|
||||
})
|
||||
.post("/control/v1/metadata_health/outdated", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_metadata_health_list_outdated,
|
||||
RequestName("control_v1_metadata_health_list_outdated"),
|
||||
)
|
||||
})
|
||||
// TODO(vlad): endpoint for cancelling drain and fill
|
||||
// Tenant Shard operations
|
||||
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
|
||||
@@ -1091,9 +971,6 @@ pub fn make_router(
|
||||
RequestName("control_v1_tenant_policy"),
|
||||
)
|
||||
})
|
||||
.put("/control/v1/step_down", |r| {
|
||||
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
|
||||
})
|
||||
// Tenant operations
|
||||
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
||||
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
||||
|
||||
@@ -13,10 +13,7 @@ use metrics::NeonMetrics;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crate::{
|
||||
persistence::{DatabaseError, DatabaseOperation},
|
||||
service::LeadershipStatus,
|
||||
};
|
||||
use crate::persistence::{DatabaseError, DatabaseOperation};
|
||||
|
||||
pub(crate) static METRICS_REGISTRY: Lazy<StorageControllerMetrics> =
|
||||
Lazy::new(StorageControllerMetrics::default);
|
||||
@@ -84,8 +81,6 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
|
||||
pub(crate) storage_controller_database_query_latency:
|
||||
measured::HistogramVec<DatabaseQueryLatencyLabelGroupSet, 5>,
|
||||
|
||||
pub(crate) storage_controller_leadership_status: measured::GaugeVec<LeadershipStatusGroupSet>,
|
||||
}
|
||||
|
||||
impl StorageControllerMetrics {
|
||||
@@ -161,12 +156,6 @@ pub(crate) struct DatabaseQueryLatencyLabelGroup {
|
||||
pub(crate) operation: DatabaseOperation,
|
||||
}
|
||||
|
||||
#[derive(measured::LabelGroup)]
|
||||
#[label(set = LeadershipStatusGroupSet)]
|
||||
pub(crate) struct LeadershipStatusGroup {
|
||||
pub(crate) status: LeadershipStatus,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Clone, Copy)]
|
||||
pub(crate) enum ReconcileOutcome {
|
||||
#[label(rename = "ok")]
|
||||
|
||||
@@ -8,7 +8,6 @@ use self::split_state::SplitState;
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use pageserver_api::controller_api::MetadataHealthRecord;
|
||||
use pageserver_api::controller_api::ShardSchedulingPolicy;
|
||||
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
|
||||
use pageserver_api::models::TenantConfig;
|
||||
@@ -91,10 +90,6 @@ pub(crate) enum DatabaseOperation {
|
||||
UpdateTenantShard,
|
||||
DeleteTenant,
|
||||
UpdateTenantConfig,
|
||||
UpdateMetadataHealth,
|
||||
ListMetadataHealth,
|
||||
ListMetadataHealthUnhealthy,
|
||||
ListMetadataHealthOutdated,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -312,32 +307,15 @@ impl Persistence {
        &self,
        shards: Vec<TenantShardPersistence>,
    ) -> DatabaseResult<()> {
        use crate::schema::metadata_health;
        use crate::schema::tenant_shards;

        let now = chrono::Utc::now();

        let metadata_health_records = shards
            .iter()
            .map(|t| MetadataHealthPersistence {
                tenant_id: t.tenant_id.clone(),
                shard_number: t.shard_number,
                shard_count: t.shard_count,
                healthy: true,
                last_scrubbed_at: now,
            })
            .collect::<Vec<_>>();

        use crate::schema::tenant_shards::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::InsertTenantShards,
            move |conn| -> DatabaseResult<()> {
                diesel::insert_into(tenant_shards::table)
                    .values(&shards)
                    .execute(conn)?;

                diesel::insert_into(metadata_health::table)
                    .values(&metadata_health_records)
                    .execute(conn)?;
                for tenant in &shards {
                    diesel::insert_into(tenant_shards)
                        .values(tenant)
                        .execute(conn)?;
                }
                Ok(())
            },
        )
@@ -351,10 +329,10 @@ impl Persistence {
        self.with_measured_conn(
            DatabaseOperation::DeleteTenant,
            move |conn| -> DatabaseResult<()> {
                // `metadata_health` status (if exists) is also deleted based on the cascade behavior.
                diesel::delete(tenant_shards)
                    .filter(tenant_id.eq(del_tenant_id.to_string()))
                    .execute(conn)?;

                Ok(())
            },
        )
@@ -697,94 +675,6 @@ impl Persistence {
        )
        .await
    }

    /// Stores all the latest metadata health updates durably. Updates existing entries on conflict.
    ///
    /// **Correctness:** `metadata_health_updates` should all belong to the tenant shards managed by the storage controller.
    #[allow(dead_code)]
    pub(crate) async fn update_metadata_health_records(
        &self,
        healthy_records: Vec<MetadataHealthPersistence>,
        unhealthy_records: Vec<MetadataHealthPersistence>,
        now: chrono::DateTime<chrono::Utc>,
    ) -> DatabaseResult<()> {
        use crate::schema::metadata_health::dsl::*;

        self.with_measured_conn(
            DatabaseOperation::UpdateMetadataHealth,
            move |conn| -> DatabaseResult<_> {
                diesel::insert_into(metadata_health)
                    .values(&healthy_records)
                    .on_conflict((tenant_id, shard_number, shard_count))
                    .do_update()
                    .set((healthy.eq(true), last_scrubbed_at.eq(now)))
                    .execute(conn)?;

                diesel::insert_into(metadata_health)
                    .values(&unhealthy_records)
                    .on_conflict((tenant_id, shard_number, shard_count))
                    .do_update()
                    .set((healthy.eq(false), last_scrubbed_at.eq(now)))
                    .execute(conn)?;
                Ok(())
            },
        )
        .await
    }

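A hedged usage sketch, not part of the diff; `persistence`, `shard_row` and the timestamps are assumed names. Because the insert above targets the (tenant_id, shard_number, shard_count) key with on_conflict(...).do_update(), reporting the same shard twice refreshes `healthy` and `last_scrubbed_at` rather than adding a row:

    // first scrub: shard reported healthy
    persistence
        .update_metadata_health_records(vec![shard_row.clone()], vec![], first_scrub_at)
        .await?;
    // later scrub: same shard reported unhealthy; the key conflict turns this into an UPDATE
    persistence
        .update_metadata_health_records(vec![], vec![shard_row], second_scrub_at)
        .await?;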
    /// Lists all the metadata health records.
    #[allow(dead_code)]
    pub(crate) async fn list_metadata_health_records(
        &self,
    ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
        self.with_measured_conn(
            DatabaseOperation::ListMetadataHealth,
            move |conn| -> DatabaseResult<_> {
                Ok(
                    crate::schema::metadata_health::table
                        .load::<MetadataHealthPersistence>(conn)?,
                )
            },
        )
        .await
    }

    /// Lists all the metadata health records that are unhealthy.
    #[allow(dead_code)]
    pub(crate) async fn list_unhealthy_metadata_health_records(
        &self,
    ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
        use crate::schema::metadata_health::dsl::*;
        self.with_measured_conn(
            DatabaseOperation::ListMetadataHealthUnhealthy,
            move |conn| -> DatabaseResult<_> {
                Ok(crate::schema::metadata_health::table
                    .filter(healthy.eq(false))
                    .load::<MetadataHealthPersistence>(conn)?)
            },
        )
        .await
    }

    /// Lists all the metadata health records that have not been updated since an `earlier` time.
    #[allow(dead_code)]
    pub(crate) async fn list_outdated_metadata_health_records(
        &self,
        earlier: chrono::DateTime<chrono::Utc>,
    ) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
        use crate::schema::metadata_health::dsl::*;

        self.with_measured_conn(
            DatabaseOperation::ListMetadataHealthOutdated,
            move |conn| -> DatabaseResult<_> {
                let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
                let res = query.load::<MetadataHealthPersistence>(conn)?;

                Ok(res)
            },
        )
        .await
    }
}

/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -854,59 +744,3 @@ pub(crate) struct NodePersistence {
    pub(crate) listen_pg_addr: String,
    pub(crate) listen_pg_port: i32,
}

/// Tenant metadata health status that is stored durably.
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::metadata_health)]
pub(crate) struct MetadataHealthPersistence {
    #[serde(default)]
    pub(crate) tenant_id: String,
    #[serde(default)]
    pub(crate) shard_number: i32,
    #[serde(default)]
    pub(crate) shard_count: i32,

    pub(crate) healthy: bool,
    pub(crate) last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}

impl MetadataHealthPersistence {
    pub fn new(
        tenant_shard_id: TenantShardId,
        healthy: bool,
        last_scrubbed_at: chrono::DateTime<chrono::Utc>,
    ) -> Self {
        let tenant_id = tenant_shard_id.tenant_id.to_string();
        let shard_number = tenant_shard_id.shard_number.0 as i32;
        let shard_count = tenant_shard_id.shard_count.literal() as i32;

        MetadataHealthPersistence {
            tenant_id,
            shard_number,
            shard_count,
            healthy,
            last_scrubbed_at,
        }
    }

    #[allow(dead_code)]
    pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
        Ok(TenantShardId {
            tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
            shard_number: ShardNumber(self.shard_number as u8),
            shard_count: ShardCount::new(self.shard_count as u8),
        })
    }
}

impl From<MetadataHealthPersistence> for MetadataHealthRecord {
    fn from(value: MetadataHealthPersistence) -> Self {
        MetadataHealthRecord {
            tenant_shard_id: value
                .get_tenant_shard_id()
                .expect("stored tenant id should be valid"),
            healthy: value.healthy,
            last_scrubbed_at: value.last_scrubbed_at,
        }
    }
}

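A hedged round-trip sketch, not part of the diff; `tenant_shard_id` is an assumed, valid TenantShardId. It shows how a row built with the constructor above maps back into the API-level MetadataHealthRecord through the From impl:

    let row = MetadataHealthPersistence::new(tenant_shard_id, true, chrono::Utc::now());
    assert_eq!(row.get_tenant_shard_id().unwrap(), tenant_shard_id);
    let record: MetadataHealthRecord = row.into();
    assert!(record.healthy);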
@@ -12,7 +12,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
@@ -750,8 +749,6 @@ impl Reconciler {
            self.location_config(&node, conf, None, false).await?;
        }

        failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");

        Ok(())
    }

@@ -1,15 +1,5 @@
// @generated automatically by Diesel CLI.

diesel::table! {
    metadata_health (tenant_id, shard_number, shard_count) {
        tenant_id -> Varchar,
        shard_number -> Int4,
        shard_count -> Int4,
        healthy -> Bool,
        last_scrubbed_at -> Timestamptz,
    }
}

diesel::table! {
    nodes (node_id) {
        node_id -> Int8,
@@ -36,4 +26,4 @@ diesel::table! {
    }
}

diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);
diesel::allow_tables_to_appear_in_same_query!(nodes, tenant_shards,);

@@ -15,8 +15,7 @@ use crate::{
    },
    compute_hook::NotifyError,
    id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
    metrics::LeadershipStatusGroup,
    persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
    persistence::{AbortShardSplitStatus, TenantFilter},
    reconciler::{ReconcileError, ReconcileUnits},
    scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
    tenant_shard::{
@@ -33,11 +32,11 @@ use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use pageserver_api::{
    controller_api::{
        MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
        NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest,
        TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
        TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
        TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore,
        NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
        ShardSchedulingPolicy, TenantCreateRequest, TenantCreateResponse,
        TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
        TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
        TenantShardMigrateResponse, UtilizationScore,
    },
    models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
};
@@ -82,7 +81,6 @@ use crate::{
        ReconcilerWaiter, TenantShard,
    },
};
use serde::{Deserialize, Serialize};

// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -133,24 +131,6 @@ enum NodeOperations {
    Delete,
}

/// The leadership status for the storage controller process.
/// Allowed transitions are:
/// 1. Leader -> SteppedDown
/// 2. Candidate -> Leader
#[derive(Copy, Clone, strum_macros::Display, measured::FixedCardinalityLabel)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum LeadershipStatus {
    /// This is the steady state where the storage controller can produce
    /// side effects in the cluster.
    Leader,
    /// We've been notified to step down by another candidate. No reconciliations
    /// take place in this state.
    SteppedDown,
    /// Initial state for a new storage controller instance. Will attempt to assume leadership.
    #[allow(unused)]
    Candidate,
}

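A minimal sketch, not part of the diff, of the transition rule stated in the doc comment above; the helper name is hypothetical:

    fn transition_allowed(from: LeadershipStatus, to: LeadershipStatus) -> bool {
        use LeadershipStatus::*;
        // Only Leader -> SteppedDown and Candidate -> Leader are permitted.
        matches!((from, to), (Leader, SteppedDown) | (Candidate, Leader))
    }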
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
|
||||
|
||||
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
|
||||
@@ -160,8 +140,6 @@ const MAX_DELAYED_RECONCILES: usize = 10000;
|
||||
|
||||
// Top level state available to all HTTP handlers
|
||||
struct ServiceState {
|
||||
leadership_status: LeadershipStatus,
|
||||
|
||||
tenants: BTreeMap<TenantShardId, TenantShard>,
|
||||
|
||||
nodes: Arc<HashMap<NodeId, Node>>,
|
||||
@@ -224,21 +202,7 @@ impl ServiceState {
|
||||
scheduler: Scheduler,
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
) -> Self {
|
||||
let status = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_leadership_status;
|
||||
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::Leader,
|
||||
},
|
||||
1,
|
||||
);
|
||||
|
||||
Self {
|
||||
// TODO: Starting up as Leader is a transient state. Once we enable rolling
|
||||
// upgrades on the k8s side, we should start up as Candidate.
|
||||
leadership_status: LeadershipStatus::Leader,
|
||||
tenants,
|
||||
nodes: Arc::new(nodes),
|
||||
scheduler,
|
||||
@@ -256,37 +220,6 @@ impl ServiceState {
|
||||
) {
|
||||
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
|
||||
}
|
||||
|
||||
fn get_leadership_status(&self) -> LeadershipStatus {
|
||||
self.leadership_status
|
||||
}
|
||||
|
||||
fn step_down(&mut self) {
|
||||
self.leadership_status = LeadershipStatus::SteppedDown;
|
||||
|
||||
let status = &crate::metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_leadership_status;
|
||||
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::SteppedDown,
|
||||
},
|
||||
1,
|
||||
);
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::Leader,
|
||||
},
|
||||
0,
|
||||
);
|
||||
status.set(
|
||||
LeadershipStatusGroup {
|
||||
status: LeadershipStatus::Candidate,
|
||||
},
|
||||
0,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -470,30 +403,11 @@ struct ShardUpdate {
|
||||
generation: Option<Generation>,
|
||||
}
|
||||
|
||||
enum StopReconciliationsReason {
|
||||
ShuttingDown,
|
||||
SteppingDown,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for StopReconciliationsReason {
|
||||
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
let s = match self {
|
||||
Self::ShuttingDown => "Shutting down",
|
||||
Self::SteppingDown => "Stepping down",
|
||||
};
|
||||
write!(writer, "{}", s)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum ReconcileResultRequest {
|
||||
ReconcileResult(ReconcileResult),
|
||||
Stop,
|
||||
}
|
||||
|
||||
// TODO: move this into the storcon peer client when that gets added
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
impl Service {
|
||||
pub fn get_config(&self) -> &Config {
|
||||
&self.config
|
||||
@@ -5689,22 +5603,17 @@ impl Service {
        Ok(std::cmp::max(waiter_count, reconciles_spawned))
    }

    async fn stop_reconciliations(&self, reason: StopReconciliationsReason) {
    pub async fn shutdown(&self) {
        // Cancel all on-going reconciles and wait for them to exit the gate.
        tracing::info!("{reason}: cancelling and waiting for in-flight reconciles");
        tracing::info!("Shutting down: cancelling and waiting for in-flight reconciles");
        self.reconcilers_cancel.cancel();
        self.reconcilers_gate.close().await;

        // Signal the background loop in [`Service::process_results`] to exit once
        // it has processed the results from all the reconciles we cancelled earlier.
        tracing::info!("{reason}: processing results from previously in-flight reconciles");
        tracing::info!("Shutting down: processing results from previously in-flight reconciles");
        self.result_tx.send(ReconcileResultRequest::Stop).ok();
        self.result_tx.closed().await;
    }

    pub async fn shutdown(&self) {
        self.stop_reconciliations(StopReconciliationsReason::ShuttingDown)
            .await;

        // Background tasks hold gate guards: this notifies them of the cancellation and
        // waits for them all to complete.
@@ -6094,89 +6003,4 @@ impl Service {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Updates scrubber metadata health check results.
|
||||
pub(crate) async fn metadata_health_update(
|
||||
&self,
|
||||
update_req: MetadataHealthUpdateRequest,
|
||||
) -> Result<(), ApiError> {
|
||||
let now = chrono::offset::Utc::now();
|
||||
let (healthy_records, unhealthy_records) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let healthy_records = update_req
|
||||
.healthy_tenant_shards
|
||||
.into_iter()
|
||||
// Retain only health records associated with tenant shards managed by storage controller.
|
||||
.filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id))
|
||||
.map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, true, now))
|
||||
.collect();
|
||||
let unhealthy_records = update_req
|
||||
.unhealthy_tenant_shards
|
||||
.into_iter()
|
||||
.filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id))
|
||||
.map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, false, now))
|
||||
.collect();
|
||||
|
||||
(healthy_records, unhealthy_records)
|
||||
};
|
||||
|
||||
self.persistence
|
||||
.update_metadata_health_records(healthy_records, unhealthy_records, now)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
    /// Lists the tenant shards that have unhealthy metadata status.
|
||||
pub(crate) async fn metadata_health_list_unhealthy(
|
||||
&self,
|
||||
) -> Result<Vec<TenantShardId>, ApiError> {
|
||||
let result = self
|
||||
.persistence
|
||||
.list_unhealthy_metadata_health_records()
|
||||
.await?
|
||||
.iter()
|
||||
.map(|p| p.get_tenant_shard_id().unwrap())
|
||||
.collect();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Lists the tenant shards that have not been scrubbed for some duration.
|
||||
pub(crate) async fn metadata_health_list_outdated(
|
||||
&self,
|
||||
not_scrubbed_for: Duration,
|
||||
) -> Result<Vec<MetadataHealthRecord>, ApiError> {
|
||||
let earlier = chrono::offset::Utc::now() - not_scrubbed_for;
|
||||
let result = self
|
||||
.persistence
|
||||
.list_outdated_metadata_health_records(earlier)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|record| record.into())
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(crate) fn get_leadership_status(&self) -> LeadershipStatus {
|
||||
self.inner.read().unwrap().get_leadership_status()
|
||||
}
|
||||
|
||||
pub(crate) async fn step_down(&self) -> GlobalObservedState {
|
||||
tracing::info!("Received step down request from peer");
|
||||
|
||||
self.inner.write().unwrap().step_down();
|
||||
// TODO: would it make sense to have a time-out for this?
|
||||
self.stop_reconciliations(StopReconciliationsReason::SteppingDown)
|
||||
.await;
|
||||
|
||||
let mut global_observed = GlobalObservedState::default();
|
||||
let locked = self.inner.read().unwrap();
|
||||
for (tid, tenant_shard) in locked.tenants.iter() {
|
||||
global_observed
|
||||
.0
|
||||
.insert(*tid, tenant_shard.observed.clone());
|
||||
}
|
||||
|
||||
global_observed
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ use pageserver_api::{
|
||||
models::{LocationConfig, LocationConfigMode, TenantConfig},
|
||||
shard::{ShardIdentity, TenantShardId},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::Serialize;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{instrument, Instrument};
|
||||
@@ -284,7 +284,7 @@ impl Drop for IntentState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
|
||||
#[derive(Default, Clone, Serialize)]
|
||||
pub(crate) struct ObservedState {
|
||||
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
|
||||
}
|
||||
@@ -298,7 +298,7 @@ pub(crate) struct ObservedState {
|
||||
/// what it is (e.g. we failed partway through configuring it)
|
||||
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
|
||||
/// and that configuration will still be present unless something external interfered.
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(crate) struct ObservedStateLocation {
|
||||
/// If None, it means we do not know the status of this shard's location on this node, but
|
||||
/// we know that we might have some state on this node.
|
||||
|
||||
@@ -87,8 +87,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
.push(format!("index_part.json version: {}", index_part.version()))
|
||||
}
|
||||
|
||||
let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(2);
|
||||
if !newest_versions.any(|ip| ip == &index_part.version()) {
|
||||
if &index_part.version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
|
||||
info!(
|
||||
"index_part.json version is not latest: {}",
|
||||
index_part.version()
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
use std::pin::pin;
|
||||
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use remote_storage::ListingMode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
|
||||
stream_objects_with_retries, BucketConfig, NodeKind,
|
||||
checks::parse_layer_object_name, init_remote, list_objects_with_retries,
|
||||
metadata_stream::stream_tenants, BucketConfig, NodeKind,
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -50,38 +47,45 @@ pub async fn find_large_objects(
|
||||
ignore_deltas: bool,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<LargeObjectListing> {
|
||||
let (remote_client, target) =
|
||||
init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
let tenants = pin!(stream_tenants_generic(&remote_client, &target));
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
|
||||
|
||||
let objects_stream = tenants.map_ok(|tenant_shard_id| {
|
||||
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
||||
let remote_client = remote_client.clone();
|
||||
let s3_client = s3_client.clone();
|
||||
async move {
|
||||
let mut objects = Vec::new();
|
||||
let mut total_objects_ctr = 0u64;
|
||||
// We want the objects and not just common prefixes
|
||||
tenant_root.delimiter.clear();
|
||||
let mut objects_stream = pin!(stream_objects_with_retries(
|
||||
&remote_client,
|
||||
ListingMode::NoDelimiter,
|
||||
&tenant_root
|
||||
));
|
||||
while let Some(listing) = objects_stream.next().await {
|
||||
let listing = listing?;
|
||||
for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
|
||||
let key = obj.key.to_string();
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
|
||||
.await?;
|
||||
for obj in fetch_response.contents().iter().filter(|o| {
|
||||
if let Some(obj_size) = o.size {
|
||||
min_size as i64 <= obj_size
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}) {
|
||||
let key = obj.key().expect("couldn't get key").to_owned();
|
||||
let kind = LargeObjectKind::from_key(&key);
|
||||
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
|
||||
continue;
|
||||
}
|
||||
objects.push(LargeObject {
|
||||
key,
|
||||
size: obj.size,
|
||||
size: obj.size.unwrap() as u64,
|
||||
kind,
|
||||
})
|
||||
}
|
||||
total_objects_ctr += listing.keys.len() as u64;
|
||||
total_objects_ctr += fetch_response.contents().len() as u64;
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok((tenant_shard_id, objects, total_objects_ctr))
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -19,7 +18,7 @@ use utils::id::TenantId;
|
||||
|
||||
use crate::{
|
||||
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
|
||||
init_remote, init_remote_generic, list_objects_with_retries,
|
||||
init_remote, init_remote_generic,
|
||||
metadata_stream::{stream_tenant_timelines, stream_tenants},
|
||||
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
|
||||
};
|
||||
@@ -28,11 +27,6 @@ use crate::{
|
||||
enum GarbageReason {
|
||||
DeletedInConsole,
|
||||
MissingInConsole,
|
||||
|
||||
// The remaining data relates to a known deletion issue, and we're sure that purging this
|
||||
// will not delete any real data, for example https://github.com/neondatabase/neon/pull/7928 where
|
||||
// there is nothing in a tenant path apart from a heatmap file.
|
||||
KnownBug,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@@ -78,15 +72,6 @@ impl GarbageList {
|
||||
}
|
||||
}
|
||||
|
||||
/// If an entity has been identified as requiring purge due to a known bug, e.g.
|
||||
/// a particular type of object left behind after an incomplete deletion.
|
||||
fn append_buggy(&mut self, entity: GarbageEntity) {
|
||||
self.items.push(GarbageItem {
|
||||
entity,
|
||||
reason: GarbageReason::KnownBug,
|
||||
});
|
||||
}
|
||||
|
||||
/// Return true if appended, false if not. False means the result was not garbage.
|
||||
fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
|
||||
where
|
||||
@@ -234,71 +219,6 @@ async fn find_garbage_inner(
|
||||
assert!(project.tenant == tenant_shard_id.tenant_id);
|
||||
}
|
||||
|
||||
// Special case: If it's missing in console, check for known bugs that would enable us to conclusively
|
||||
// identify it as purge-able anyway
|
||||
if console_result.is_none() {
|
||||
let timelines = stream_tenant_timelines(&s3_client, &target, tenant_shard_id)
|
||||
.await?
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
if timelines.is_empty() {
|
||||
// No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
|
||||
let tenant_objects = list_objects_with_retries(
|
||||
&s3_client,
|
||||
&target.tenant_root(&tenant_shard_id),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let object = tenant_objects.contents.as_ref().unwrap().first().unwrap();
|
||||
if object.key.as_ref().unwrap().ends_with("heatmap-v1.json") {
|
||||
tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
|
||||
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
|
||||
continue;
|
||||
} else {
|
||||
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key.as_ref().unwrap());
|
||||
}
|
||||
} else {
|
||||
// A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
|
||||
// rollout of WAL DR in which we never deleted these.
|
||||
let mut any_non_initdb = false;
|
||||
|
||||
for timeline_r in timelines {
|
||||
let timeline = timeline_r?;
|
||||
let timeline_objects = list_objects_with_retries(
|
||||
&s3_client,
|
||||
&target.timeline_root(&timeline),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
if timeline_objects
|
||||
.common_prefixes
|
||||
.as_ref()
|
||||
.map(|v| v.len())
|
||||
.unwrap_or(0)
|
||||
> 0
|
||||
{
|
||||
// Sub-paths? Unexpected
|
||||
any_non_initdb = true;
|
||||
} else {
|
||||
let object = timeline_objects.contents.as_ref().unwrap().first().unwrap();
|
||||
if object.key.as_ref().unwrap().ends_with("initdb.tar.zst") {
|
||||
tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
|
||||
} else {
|
||||
any_non_initdb = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if any_non_initdb {
|
||||
tracing::info!("Tenant {tenant_shard_id}: is missing in console and contains timelines, one or more of which are more than just initdb");
|
||||
} else {
|
||||
tracing::info!("Tenant {tenant_shard_id}: is missing in console and contains only timelines that only contain initdb");
|
||||
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
|
||||
tracing::debug!("Tenant {tenant_shard_id} is garbage");
|
||||
} else {
|
||||
@@ -429,6 +349,9 @@ pub async fn get_timeline_objects(
|
||||
tracing::debug!("Listing objects in timeline {ttid}");
|
||||
let timeline_root = super::remote_timeline_path_id(&ttid);
|
||||
|
||||
// TODO: apply extra validation based on object modification time. Don't purge
|
||||
// timelines whose index_part.json has been touched recently.
|
||||
|
||||
let list = s3_client
|
||||
.list(
|
||||
Some(&timeline_root),
|
||||
@@ -499,7 +422,6 @@ impl DeletionProgressTracker {
|
||||
pub async fn purge_garbage(
|
||||
input_path: String,
|
||||
mode: PurgeMode,
|
||||
min_age: Duration,
|
||||
dry_run: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let list_bytes = tokio::fs::read(&input_path).await?;
|
||||
@@ -510,7 +432,7 @@ pub async fn purge_garbage(
|
||||
input_path
|
||||
);
|
||||
|
||||
let (remote_client, _target) =
|
||||
let remote_client =
|
||||
init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
|
||||
|
||||
assert_eq!(
|
||||
@@ -537,7 +459,6 @@ pub async fn purge_garbage(
|
||||
.filter(|i| match (&mode, &i.reason) {
|
||||
(PurgeMode::DeletedAndMissing, _) => true,
|
||||
(PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
|
||||
(PurgeMode::DeletedOnly, GarbageReason::KnownBug) => true,
|
||||
(PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
|
||||
});
|
||||
|
||||
@@ -566,37 +487,6 @@ pub async fn purge_garbage(
|
||||
let mut progress_tracker = DeletionProgressTracker::default();
|
||||
while let Some(result) = get_objects_results.next().await {
|
||||
let mut object_list = result?;
|
||||
|
||||
// Extra safety check: even if a collection of objects is garbage, check max() of modification
|
||||
// times before purging, so that if we incorrectly marked a live tenant as garbage then we would
|
||||
// notice that its index has been written recently and would omit deleting it.
|
||||
if object_list.is_empty() {
|
||||
// Simplify subsequent code by ensuring list always has at least one item
|
||||
                // Usually, this only occurs if there are parallel deletions racing us, as there are no empty prefixes
|
||||
continue;
|
||||
}
|
||||
let max_mtime = object_list.iter().map(|o| o.last_modified).max().unwrap();
|
||||
let age = max_mtime.elapsed();
|
||||
match age {
|
||||
Err(_) => {
|
||||
tracing::warn!("Bad last_modified time");
|
||||
continue;
|
||||
}
|
||||
Ok(a) if a < min_age => {
|
||||
// Failed age check. This doesn't mean we did something wrong: a tenant might really be garbage and recently
|
||||
// written, but out of an abundance of caution we still don't purge it.
|
||||
tracing::info!(
|
||||
"Skipping tenant with young objects {}..{}",
|
||||
object_list.first().as_ref().unwrap().key,
|
||||
object_list.last().as_ref().unwrap().key
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Ok(_) => {
|
||||
// Passed age check
|
||||
}
|
||||
}
|
||||
|
||||
objects_to_delete.append(&mut object_list);
|
||||
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
|
||||
do_delete(
|
||||
|
||||
@@ -22,18 +22,16 @@ use aws_sdk_s3::Client;
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::ValueEnum;
|
||||
use futures::{Stream, StreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
|
||||
use pageserver::tenant::TENANTS_SEGMENT_NAME;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{
|
||||
GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig, RemoteStorageKind,
|
||||
S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
|
||||
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
@@ -321,35 +319,27 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
|
||||
}
|
||||
}
|
||||
|
||||
fn make_root_target(
|
||||
bucket_name: String,
|
||||
prefix_in_bucket: String,
|
||||
node_kind: NodeKind,
|
||||
) -> RootTarget {
|
||||
let s3_target = S3Target {
|
||||
bucket_name,
|
||||
prefix_in_bucket,
|
||||
delimiter: "/".to_string(),
|
||||
};
|
||||
match node_kind {
|
||||
NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
|
||||
NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
|
||||
}
|
||||
}
|
||||
|
||||
async fn init_remote(
|
||||
bucket_config: BucketConfig,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
|
||||
let bucket_region = Region::new(bucket_config.region);
|
||||
let delimiter = "/".to_string();
|
||||
let s3_client = Arc::new(init_s3_client(bucket_region).await);
|
||||
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
|
||||
|
||||
let s3_root = make_root_target(
|
||||
bucket_config.bucket,
|
||||
bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
|
||||
node_kind,
|
||||
);
|
||||
let s3_root = match node_kind {
|
||||
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
|
||||
bucket_name: bucket_config.bucket,
|
||||
prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
|
||||
delimiter,
|
||||
}),
|
||||
NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
|
||||
bucket_name: bucket_config.bucket,
|
||||
prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
|
||||
delimiter,
|
||||
}),
|
||||
};
|
||||
|
||||
Ok((s3_client, s3_root))
|
||||
}
|
||||
@@ -357,12 +347,12 @@ async fn init_remote(
|
||||
async fn init_remote_generic(
|
||||
bucket_config: BucketConfig,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
|
||||
) -> anyhow::Result<GenericRemoteStorage> {
|
||||
let endpoint = env::var("AWS_ENDPOINT_URL").ok();
|
||||
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
|
||||
let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
|
||||
let storage = S3Config {
|
||||
bucket_name: bucket_config.bucket.clone(),
|
||||
bucket_name: bucket_config.bucket,
|
||||
bucket_region: bucket_config.region,
|
||||
prefix_in_bucket,
|
||||
endpoint,
|
||||
@@ -376,13 +366,7 @@ async fn init_remote_generic(
|
||||
storage: RemoteStorageKind::AwsS3(storage),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
};
|
||||
|
||||
// We already pass the prefix to the remote client above
|
||||
let prefix_in_root_target = String::new();
|
||||
let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
|
||||
|
||||
let client = GenericRemoteStorage::from_config(&storage_config).await?;
|
||||
Ok((client, s3_root))
|
||||
GenericRemoteStorage::from_config(&storage_config).await
|
||||
}
|
||||
|
||||
async fn list_objects_with_retries(
|
||||
@@ -420,44 +404,6 @@ async fn list_objects_with_retries(
|
||||
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
|
||||
}
|
||||
|
||||
fn stream_objects_with_retries<'a>(
|
||||
storage_client: &'a GenericRemoteStorage,
|
||||
listing_mode: ListingMode,
|
||||
s3_target: &'a S3Target,
|
||||
) -> impl Stream<Item = Result<Listing, anyhow::Error>> + 'a {
|
||||
async_stream::stream! {
|
||||
let mut trial = 0;
|
||||
let cancel = CancellationToken::new();
|
||||
let prefix_str = &s3_target
|
||||
.prefix_in_bucket
|
||||
.strip_prefix("/")
|
||||
.unwrap_or(&s3_target.prefix_in_bucket);
|
||||
let prefix = RemotePath::from_string(prefix_str)?;
|
||||
let mut list_stream =
|
||||
storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
|
||||
while let Some(res) = list_stream.next().await {
|
||||
if let Err(err) = res {
|
||||
let yield_err = if err.is_permanent() {
|
||||
true
|
||||
} else {
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
trial += 1;
|
||||
trial == MAX_RETRIES - 1
|
||||
};
|
||||
if yield_err {
|
||||
yield Err(err)
|
||||
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
trial = 0;
|
||||
yield res.map_err(anyhow::Error::from);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn download_object_with_retries(
|
||||
s3_client: &Client,
|
||||
bucket_name: &str,
|
||||
|
||||
@@ -50,8 +50,6 @@ enum Command {
|
||||
input_path: String,
|
||||
#[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
|
||||
mode: PurgeMode,
|
||||
#[arg(long = "min-age")]
|
||||
min_age: humantime::Duration,
|
||||
},
|
||||
#[command(verbatim_doc_comment)]
|
||||
ScanMetadata {
|
||||
@@ -198,11 +196,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
let console_config = ConsoleConfig::from_env()?;
|
||||
find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
|
||||
}
|
||||
Command::PurgeGarbage {
|
||||
input_path,
|
||||
mode,
|
||||
min_age,
|
||||
} => purge_garbage(input_path, mode, min_age.into(), !cli.delete).await,
|
||||
Command::PurgeGarbage { input_path, mode } => {
|
||||
purge_garbage(input_path, mode, !cli.delete).await
|
||||
}
|
||||
Command::TenantSnapshot {
|
||||
tenant_id,
|
||||
output_path,
|
||||
|
||||
@@ -1,41 +1,12 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use anyhow::Context;
|
||||
use async_stream::{stream, try_stream};
|
||||
use aws_sdk_s3::{types::ObjectIdentifier, Client};
|
||||
use futures::StreamExt;
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode};
|
||||
use tokio_stream::Stream;
|
||||
|
||||
use crate::{
|
||||
list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
|
||||
TenantShardTimelineId,
|
||||
};
|
||||
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
|
||||
pub fn stream_tenants_generic<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
|
||||
try_stream! {
|
||||
let tenants_target = target.tenants_root();
|
||||
let mut tenants_stream =
|
||||
std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
|
||||
while let Some(chunk) = tenants_stream.next().await {
|
||||
let chunk = chunk?;
|
||||
let entry_ids = chunk.prefixes.iter()
|
||||
.map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
|
||||
for dir_name_res in entry_ids {
|
||||
let dir_name = dir_name_res?;
|
||||
let id = TenantShardId::from_str(dir_name)?;
|
||||
yield id;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
|
||||
pub fn stream_tenants<'a>(
|
||||
s3_client: &'a Client,
|
||||
|
||||
@@ -81,7 +81,7 @@ should go.
|
||||
Useful parameters and commands:
|
||||
|
||||
`--preserve-database-files` to preserve pageserver (layer) and safekeeper (segment) timeline files on disk
|
||||
after running a test suite. Such files might be large, so removed by default; but might be useful for debugging or creation of svg images with layer file contents. If `NeonEnvBuilder#preserve_database_files` set to `True` for a particular test, the whole `repo` directory will be attached to Allure report (thus uploaded to S3) as `everything.tar.zst` for this test.
|
||||
after running a test suite. Such files might be large, so removed by default; but might be useful for debugging or creation of svg images with layer file contents.
|
||||
|
||||
Let stdout, stderr and `INFO` log messages go to the terminal instead of capturing them:
|
||||
`./scripts/pytest -s --log-cli-level=INFO ...`
|
||||
|
||||
@@ -222,8 +222,6 @@ class NeonBenchmarker:
|
||||
function by the zenbenchmark fixture
|
||||
"""
|
||||
|
||||
PROPERTY_PREFIX = "neon_benchmarker_"
|
||||
|
||||
def __init__(self, property_recorder: Callable[[str, object], None]):
|
||||
# property recorder here is a pytest fixture provided by junitxml module
|
||||
# https://docs.pytest.org/en/6.2.x/reference.html#pytest.junitxml.record_property
|
||||
@@ -240,7 +238,7 @@ class NeonBenchmarker:
|
||||
Record a benchmark result.
|
||||
"""
|
||||
# just to namespace the value
|
||||
name = f"{self.PROPERTY_PREFIX}_{metric_name}"
|
||||
name = f"neon_benchmarker_{metric_name}"
|
||||
self.property_recorder(
|
||||
name,
|
||||
{
|
||||
@@ -251,18 +249,6 @@ class NeonBenchmarker:
|
||||
},
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def records(
|
||||
cls, user_properties: list[tuple[str, object]]
|
||||
) -> Iterator[tuple[str, dict[str, object]]]:
|
||||
"""
|
||||
Yield all records related to benchmarks
|
||||
"""
|
||||
for property_name, recorded_property in user_properties:
|
||||
if property_name.startswith(cls.PROPERTY_PREFIX):
|
||||
assert isinstance(recorded_property, dict)
|
||||
yield recorded_property["name"], recorded_property
|
||||
|
||||
@contextmanager
|
||||
def record_duration(self, metric_name: str) -> Iterator[None]:
|
||||
"""
|
||||
@@ -439,11 +425,10 @@ def zenbenchmark(
|
||||
yield benchmarker
|
||||
|
||||
results = {}
|
||||
for _, recorded_property in NeonBenchmarker.records(request.node.user_properties):
|
||||
for _, recorded_property in request.node.user_properties:
|
||||
name = recorded_property["name"]
|
||||
value = str(recorded_property["value"])
|
||||
unit = str(recorded_property["unit"]).strip()
|
||||
if unit != "":
|
||||
if (unit := recorded_property["unit"].strip()) != "":
|
||||
value += f" {unit}"
|
||||
results[name] = value
|
||||
|
||||
@@ -492,7 +477,7 @@ def pytest_terminal_summary(
|
||||
for test_report in terminalreporter.stats.get("passed", []):
|
||||
result_entry = []
|
||||
|
||||
for _, recorded_property in NeonBenchmarker.records(test_report.user_properties):
|
||||
for _, recorded_property in test_report.user_properties:
|
||||
if not is_header_printed:
|
||||
terminalreporter.section("Benchmark results", "-")
|
||||
is_header_printed = True
|
||||
|
||||
@@ -449,7 +449,6 @@ class TokenScope(str, Enum):
|
||||
GENERATIONS_API = "generations_api"
|
||||
SAFEKEEPER_DATA = "safekeeperdata"
|
||||
TENANT = "tenant"
|
||||
SCRUBBER = "scrubber"
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
@@ -543,6 +542,21 @@ class NeonEnvBuilder:
|
||||
f"Overriding pageserver default compaction algorithm to {self.pageserver_default_tenant_config_compaction_algorithm}"
|
||||
)
|
||||
|
||||
self.pageserver_get_vectored_impl: Optional[str] = None
|
||||
if os.getenv("PAGESERVER_GET_VECTORED_IMPL", "") == "vectored":
|
||||
self.pageserver_get_vectored_impl = "vectored"
|
||||
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
|
||||
|
||||
self.pageserver_get_impl: Optional[str] = None
|
||||
if os.getenv("PAGESERVER_GET_IMPL", "") == "vectored":
|
||||
self.pageserver_get_impl = "vectored"
|
||||
log.debug('Overriding pageserver get_impl config to "vectored"')
|
||||
|
||||
self.pageserver_validate_vectored_get: Optional[bool] = None
|
||||
if (validate := os.getenv("PAGESERVER_VALIDATE_VEC_GET")) is not None:
|
||||
self.pageserver_validate_vectored_get = bool(validate)
|
||||
log.debug(f'Overriding pageserver validate_vectored_get config to "{validate}"')
|
||||
|
||||
self.pageserver_aux_file_policy = pageserver_aux_file_policy
|
||||
|
||||
self.safekeeper_extra_opts = safekeeper_extra_opts
|
||||
@@ -1143,6 +1157,12 @@ class NeonEnv:
|
||||
}
|
||||
if self.pageserver_virtual_file_io_engine is not None:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
if config.pageserver_get_vectored_impl is not None:
|
||||
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
|
||||
if config.pageserver_get_impl is not None:
|
||||
ps_cfg["get_impl"] = config.pageserver_get_impl
|
||||
if config.pageserver_validate_vectored_get is not None:
|
||||
ps_cfg["validate_vectored_get"] = config.pageserver_validate_vectored_get
|
||||
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
|
||||
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
||||
tenant_config[
|
||||
@@ -1395,7 +1415,7 @@ def _shared_simple_env(
|
||||
pg_distrib_dir=pg_distrib_dir,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
|
||||
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
|
||||
test_name=request.node.name,
|
||||
test_output_dir=test_output_dir,
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
@@ -1442,7 +1462,6 @@ def neon_env_builder(
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
record_property: Callable[[str, object], None],
|
||||
) -> Iterator[NeonEnvBuilder]:
|
||||
"""
|
||||
Fixture to create a Neon environment for test.
|
||||
@@ -1471,7 +1490,7 @@ def neon_env_builder(
|
||||
pg_version=pg_version,
|
||||
broker=default_broker,
|
||||
run_id=run_id,
|
||||
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
|
||||
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
test_name=request.node.name,
|
||||
test_output_dir=test_output_dir,
|
||||
@@ -1480,9 +1499,6 @@ def neon_env_builder(
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
) as builder:
|
||||
yield builder
|
||||
        # Propagate `preserve_database_files` to make it possible to use in other fixtures,
|
||||
# like `test_output_dir` fixture for attaching all database files to Allure report.
|
||||
record_property("preserve_database_files", builder.preserve_database_files)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -2587,62 +2603,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
|
||||
time.sleep(backoff)
|
||||
|
||||
def metadata_health_update(self, healthy: List[TenantShardId], unhealthy: List[TenantShardId]):
|
||||
body: Dict[str, Any] = {
|
||||
"healthy_tenant_shards": [str(t) for t in healthy],
|
||||
"unhealthy_tenant_shards": [str(t) for t in unhealthy],
|
||||
}
|
||||
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.storage_controller_api}/control/v1/metadata_health/update",
|
||||
json=body,
|
||||
headers=self.headers(TokenScope.SCRUBBER),
|
||||
)
|
||||
|
||||
def metadata_health_list_unhealthy(self):
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return response.json()
|
||||
|
||||
def metadata_health_list_outdated(self, duration: str):
|
||||
body: Dict[str, Any] = {"not_scrubbed_for": duration}
|
||||
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated",
|
||||
json=body,
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return response.json()
|
||||
|
||||
def metadata_health_is_healthy(self, outdated_duration: str = "1h") -> bool:
|
||||
"""Metadata is healthy if there is no unhealthy or outdated health records."""
|
||||
|
||||
unhealthy = self.metadata_health_list_unhealthy()
|
||||
outdated = self.metadata_health_list_outdated(outdated_duration)
|
||||
|
||||
healthy = (
|
||||
len(unhealthy["unhealthy_tenant_shards"]) == 0 and len(outdated["health_records"]) == 0
|
||||
)
|
||||
if not healthy:
|
||||
log.info(f"{unhealthy=}, {outdated=}")
|
||||
return healthy
|
||||
|
||||
def step_down(self):
|
||||
log.info("Asking storage controller to step down")
|
||||
response = self.request(
|
||||
"PUT",
|
||||
f"{self.env.storage_controller_api}/control/v1/step_down",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
@@ -4528,16 +4488,7 @@ def test_output_dir(
|
||||
|
||||
yield test_dir
|
||||
|
||||
preserve_database_files = False
|
||||
for k, v in request.node.user_properties:
|
||||
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
|
||||
# So, neon_env_builder's cleanup runs before here.
|
||||
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
|
||||
if k == "preserve_database_files":
|
||||
assert isinstance(v, bool)
|
||||
preserve_database_files = v
|
||||
|
||||
allure_attach_from_dir(test_dir, preserve_database_files)
|
||||
allure_attach_from_dir(test_dir)
|
||||
|
||||
|
||||
class FileAndThreadLock:
|
||||
|
||||
@@ -240,18 +240,9 @@ ATTACHMENT_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
|
||||
)
|
||||
|
||||
|
||||
def allure_attach_from_dir(dir: Path, preserve_database_files: bool = False):
|
||||
def allure_attach_from_dir(dir: Path):
|
||||
"""Attach all non-empty files from `dir` that matches `ATTACHMENT_NAME_REGEX` to Allure report"""
|
||||
|
||||
if preserve_database_files:
|
||||
zst_file = dir.with_suffix(".tar.zst")
|
||||
with zst_file.open("wb") as zst:
|
||||
cctx = zstandard.ZstdCompressor()
|
||||
with cctx.stream_writer(zst) as compressor:
|
||||
with tarfile.open(fileobj=compressor, mode="w") as tar:
|
||||
tar.add(dir, arcname="")
|
||||
allure.attach.file(zst_file, "everything.tar.zst", "application/zstd", "tar.zst")
|
||||
|
||||
for attachment in Path(dir).glob("**/*"):
|
||||
if ATTACHMENT_NAME_REGEX.fullmatch(attachment.name) and attachment.stat().st_size > 0:
|
||||
name = str(attachment.relative_to(dir))
|
||||
|
||||
@@ -1,88 +0,0 @@
|
||||
"""
|
||||
Test the logical replication in Neon with the different consumers
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
import clickhouse_connect
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import RemotePostgres
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def query_clickhouse(
|
||||
client,
|
||||
query: str,
|
||||
digest: str,
|
||||
) -> None:
|
||||
"""
|
||||
Run the query on the client
|
||||
return answer if successful, raise an exception otherwise
|
||||
"""
|
||||
log.debug("Query: %s", query)
|
||||
res = client.query(query)
|
||||
log.debug(res.result_rows)
|
||||
m = hashlib.sha1()
|
||||
m.update(repr(tuple(res.result_rows)).encode())
|
||||
hash_res = m.hexdigest()
|
||||
log.debug("Hash: %s", hash_res)
|
||||
if hash_res == digest:
|
||||
return
|
||||
raise ValueError("Hash mismatch")
|
||||
|
||||
|
||||
@pytest.mark.remote_cluster
|
||||
def test_clickhouse(remote_pg: RemotePostgres):
|
||||
"""
|
||||
Test the logical replication having ClickHouse as a client
|
||||
"""
|
||||
conn_options = remote_pg.conn_options()
|
||||
for _ in range(5):
|
||||
try:
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
except psycopg2.OperationalError as perr:
|
||||
log.debug(perr)
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
raise TimeoutError
|
||||
cur = conn.cursor()
|
||||
cur.execute("DROP TABLE IF EXISTS table1")
|
||||
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
|
||||
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
|
||||
conn.commit()
|
||||
client = clickhouse_connect.get_client(host="clickhouse")
|
||||
client.command("SET allow_experimental_database_materialized_postgresql=1")
|
||||
client.command(
|
||||
"CREATE DATABASE db1_postgres ENGINE = "
|
||||
f"MaterializedPostgreSQL('{conn_options['host']}', "
|
||||
f"'{conn_options['dbname']}', "
|
||||
f"'{conn_options['user']}', '{conn_options['password']}') "
|
||||
"SETTINGS materialized_postgresql_tables_list = 'table1';"
|
||||
)
|
||||
wait_until(
|
||||
120,
|
||||
0.5,
|
||||
lambda: query_clickhouse(
|
||||
client,
|
||||
"select * from db1_postgres.table1 order by 1",
|
||||
"ee600d8f7cd05bd0b169fa81f44300a9dd10085a",
|
||||
),
|
||||
)
|
||||
cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');")
|
||||
conn.commit()
|
||||
wait_until(
|
||||
120,
|
||||
0.5,
|
||||
lambda: query_clickhouse(
|
||||
client,
|
||||
"select * from db1_postgres.table1 order by 1",
|
||||
"9eba2daaf7e4d7d27ac849525f68b562ab53947d",
|
||||
),
|
||||
)
|
||||
log.debug("Sleeping before final checking if Neon is still alive")
|
||||
time.sleep(3)
|
||||
cur.execute("SELECT 1")
|
||||
@@ -17,7 +17,6 @@ from fixtures.pg_version import PgVersion
|
||||
# 3. Disk space used
|
||||
# 4. Peak memory usage
|
||||
#
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/7124")
|
||||
def test_bulk_insert(neon_with_baseline: PgCompare):
|
||||
env = neon_with_baseline
|
||||
|
||||
|
||||
@@ -389,11 +389,6 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):
|
||||
repeat_result = ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, success_timeline, timeout=60
|
||||
)
|
||||
# remote_consistent_lsn_visible will be published only after we've
|
||||
# confirmed the generation, which is not part of what we await during
|
||||
# timeline creation (uploads). mask it out here to avoid flakyness.
|
||||
del success_result["remote_consistent_lsn_visible"]
|
||||
del repeat_result["remote_consistent_lsn_visible"]
|
||||
assert repeat_result == success_result
|
||||
finally:
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
@@ -17,17 +17,22 @@ from fixtures.pg_version import PgVersion
|
||||
# Test restarting page server, while safekeeper and compute node keep
|
||||
# running.
|
||||
def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
if neon_env_builder.pageserver_get_impl == "vectored":
|
||||
reconstruct_function_name = "get_values_reconstruct_data"
|
||||
else:
|
||||
reconstruct_function_name = "get_value_reconstruct_data"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*get_values_reconstruct_data for layer .*",
|
||||
f".*{reconstruct_function_name} for layer .*",
|
||||
".*could not find data for key.*",
|
||||
".*is not active. Current state: Broken.*",
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*: layer load failed, assuming permanent failure:.*",
|
||||
".*layer loading failed permanently: load layer: .*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -74,7 +79,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
# (We don't check layer file contents on startup, when loading the timeline)
|
||||
#
|
||||
# This will change when we implement checksums for layers
|
||||
with pytest.raises(Exception, match="get_values_reconstruct_data for layer ") as err:
|
||||
with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
|
||||
pg1.start()
|
||||
log.info(
|
||||
f"As expected, compute startup failed for timeline {tenant1}/{timeline1} with corrupt layers: {err}"
|
||||
|
||||
@@ -227,6 +227,12 @@ def test_forward_compatibility(
|
||||
)
|
||||
|
||||
try:
|
||||
# Previous version neon_local and pageserver are not aware
|
||||
# of the new config.
|
||||
# TODO: remove these once the previous version of neon local supports them
|
||||
neon_env_builder.pageserver_get_impl = None
|
||||
neon_env_builder.pageserver_validate_vectored_get = None
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
# Use previous version's production binaries (pageserver, safekeeper, pg_distrib_dir, etc.).
|
||||
|
||||
@@ -48,6 +48,8 @@ def test_sharding_smoke(
|
||||
# that the scrubber doesn't barf when it sees a sharded tenant.
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
|
||||
)
|
||||
@@ -370,6 +372,8 @@ def test_sharding_split_smoke(
|
||||
# that the scrubber doesn't barf when it sees a sharded tenant.
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
|
||||
|
||||
env = neon_env_builder.init_configs(True)
|
||||
|
||||
@@ -3,7 +3,7 @@ import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple, Union
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
@@ -1783,198 +1783,3 @@ def test_storage_controller_node_deletion(
|
||||
assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
|
||||
env.storage_controller.reconcile_all() # FIXME: workaround for optimizations happening on startup, see FIXME above.
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("shard_count", [None, 2])
|
||||
def test_storage_controller_metadata_health(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
shard_count: Optional[int],
|
||||
):
|
||||
"""
|
||||
Create three tenants A, B, C.
|
||||
|
||||
Phase 1:
|
||||
- A: Post healthy status.
|
||||
- B: Post unhealthy status.
|
||||
- C: No updates.
|
||||
|
||||
Phase 2:
|
||||
- B: Post healthy status.
|
||||
- C: Post healthy status.
|
||||
|
||||
Phase 3:
|
||||
- A: Post unhealthy status.
|
||||
|
||||
Phase 4:
|
||||
- Delete tenant A, metadata health status should be deleted as well.
|
||||
"""
|
||||
|
||||
    def update_and_query_metadata_health(
        env: NeonEnv,
        healthy: List[TenantShardId],
        unhealthy: List[TenantShardId],
        outdated_duration: str = "1h",
    ) -> Tuple[Set[str], Set[str]]:
        """
        Update metadata health. Then list tenant shards with unhealthy and
        outdated metadata health status.
        """
        if healthy or unhealthy:
            env.storage_controller.metadata_health_update(healthy, unhealthy)
        result = env.storage_controller.metadata_health_list_unhealthy()
        unhealthy_res = set(result["unhealthy_tenant_shards"])
        result = env.storage_controller.metadata_health_list_outdated(outdated_duration)
        outdated_res = set(record["tenant_shard_id"] for record in result["health_records"])

        return unhealthy_res, outdated_res

    neon_env_builder.enable_pageserver_remote_storage(s3_storage())

    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_start()

    # Mock tenant (`initial_tenant`) with healthy scrubber scan result
    tenant_a_shard_ids = (
        env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=shard_count)
        if shard_count is not None
        else [TenantShardId(env.initial_tenant, 0, 0)]
    )

    # Mock tenant with unhealthy scrubber scan result
    tenant_b, _ = env.neon_cli.create_tenant(shard_count=shard_count)
    tenant_b_shard_ids = (
        env.storage_controller.tenant_shard_split(tenant_b, shard_count=shard_count)
        if shard_count is not None
        else [TenantShardId(tenant_b, 0, 0)]
    )

    # Mock tenant that never gets a health update from scrubber
    tenant_c, _ = env.neon_cli.create_tenant(shard_count=shard_count)

    tenant_c_shard_ids = (
        env.storage_controller.tenant_shard_split(tenant_c, shard_count=shard_count)
        if shard_count is not None
        else [TenantShardId(tenant_c, 0, 0)]
    )

    # Metadata health table also updated as tenant shards are created.
    assert env.storage_controller.metadata_health_is_healthy()

    # post "fake" updates to storage controller db

    unhealthy, outdated = update_and_query_metadata_health(
        env, healthy=tenant_a_shard_ids, unhealthy=tenant_b_shard_ids
    )

    log.info(f"After Phase 1: {unhealthy=}, {outdated=}")
    assert len(unhealthy) == len(tenant_b_shard_ids)
    for t in tenant_b_shard_ids:
        assert str(t) in unhealthy
    assert len(outdated) == 0

    unhealthy, outdated = update_and_query_metadata_health(
        env, healthy=tenant_b_shard_ids + tenant_c_shard_ids, unhealthy=[]
    )

    log.info(f"After Phase 2: {unhealthy=}, {outdated=}")
    assert len(unhealthy) == 0
    assert len(outdated) == 0

    unhealthy, outdated = update_and_query_metadata_health(
        env, healthy=[], unhealthy=tenant_a_shard_ids
    )

    log.info(f"After Phase 3: {unhealthy=}, {outdated=}")
    assert len(unhealthy) == len(tenant_a_shard_ids)
    for t in tenant_a_shard_ids:
        assert str(t) in unhealthy
    assert len(outdated) == 0

    # Phase 4: Delete A
    env.storage_controller.pageserver_api().tenant_delete(env.initial_tenant)

    # A's unhealthy metadata health status should be deleted as well.
    assert env.storage_controller.metadata_health_is_healthy()

    # All shards from B and C are not fresh if the outdated duration is set to 0 seconds.
    unhealthy, outdated = update_and_query_metadata_health(
        env, healthy=[], unhealthy=tenant_a_shard_ids, outdated_duration="0s"
    )
    assert len(unhealthy) == 0
    for t in tenant_b_shard_ids + tenant_c_shard_ids:
        assert str(t) in outdated


def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
    """
    Test the `/control/v1/step_down` storage controller API. Upon receiving such
    a request, the storage controller cancels any on-going reconciles and replies
    with 503 to all requests apart from `/control/v1/step_down`, `/status` and `/metrics`.
    """
    env = neon_env_builder.init_configs()
    env.start()

    tid = TenantId.generate()
    tsid = str(TenantShardId(tid, shard_number=0, shard_count=0))
    env.storage_controller.tenant_create(tid)

    env.storage_controller.reconcile_until_idle()
    env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))

    # Make a change to the tenant config to trigger a slow reconcile
    virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
    virtual_ps_http.patch_tenant_config_client_side(tid, {"compaction_threshold": 5}, None)
    env.storage_controller.allowed_errors.append(
        ".*Accepted configuration update but reconciliation failed.*"
    )

    observed_state = env.storage_controller.step_down()
    log.info(f"Storage controller stepped down with {observed_state=}")

    # Validate that we waited for the slow reconcile to complete
    # and updated the observed state in the storcon before stepping down.
    node_id = str(env.pageserver.id)
    assert tsid in observed_state
    assert node_id in observed_state[tsid]["locations"]
    assert "conf" in observed_state[tsid]["locations"][node_id]
    assert "tenant_conf" in observed_state[tsid]["locations"][node_id]["conf"]

    tenant_conf = observed_state[tsid]["locations"][node_id]["conf"]["tenant_conf"]
    assert "compaction_threshold" in tenant_conf
    assert tenant_conf["compaction_threshold"] == 5

    # Validate that we propagated the change to the pageserver
    ps_tenant_conf = env.pageserver.http_client().tenant_config(tid)
    assert "compaction_threshold" in ps_tenant_conf.effective_config
    assert ps_tenant_conf.effective_config["compaction_threshold"] == 5

    # Validate that the storcon is not replying to the usual requests
    # once it has stepped down.
    with pytest.raises(StorageControllerApiException, match="stepped_down"):
        env.storage_controller.tenant_list()

    # Validate that we can step down multiple times and the observed state
    # doesn't change.
    observed_state_again = env.storage_controller.step_down()
    assert observed_state == observed_state_again

    assert (
        env.storage_controller.get_metric_value(
            "storage_controller_leadership_status", filter={"status": "leader"}
        )
        == 0
    )

    assert (
        env.storage_controller.get_metric_value(
            "storage_controller_leadership_status", filter={"status": "stepped_down"}
        )
        == 1
    )

    assert (
        env.storage_controller.get_metric_value(
            "storage_controller_leadership_status", filter={"status": "candidate"}
        )
        == 0
    )

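The three assertions above read the same gauge with different status labels. A small helper along these lines could factor that out; this is a sketch only, `leadership_status` is not an existing fixture, and it assumes `get_metric_value` returns None when no sample matches the filter:

    # Hypothetical helper: read the leadership-status gauge for one label value,
    # treating a missing sample as 0.
    def leadership_status(storage_controller, status: str) -> float:
        value = storage_controller.get_metric_value(
            "storage_controller_leadership_status", filter={"status": status}
        )
        return 0.0 if value is None else value

With it, the checks above would read, for example, `assert leadership_status(env.storage_controller, "leader") == 0`.
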
@@ -404,7 +404,7 @@ files:
x::text as duration_seconds,
neon.approximate_working_set_size_seconds(x) as size
from
(select generate_series * 60 as x from generate_series(1, 60)) as t (x);
(select generate_series * 60 as x from generate_series(1, 60));
build: |
# Build cgroup-tools
#

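The query in this hunk samples `neon.approximate_working_set_size_seconds` for windows of 60 to 3600 seconds. A hedged sketch of running the same query from Python, assuming psycopg2, a reachable compute DSN, and the neon extension being installed; the aliased subquery form is used here because PostgreSQL versions before 16 require an alias in FROM:

    # Sketch: fetch the approximate working set size for 1..60 minute windows.
    import psycopg2

    def working_set_sizes(dsn: str):
        with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
            cur.execute(
                """
                select x::text as duration_seconds,
                       neon.approximate_working_set_size_seconds(x) as size
                from (select generate_series * 60 as x from generate_series(1, 60)) as t (x)
                """
            )
            return cur.fetchall()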