Compare commits

..

18 Commits

Author SHA1 Message Date
Heikki Linnakangas
fb4b6ce8dc Add debug information to hunt down port collisions.
We've been seeing a lot of sporadic test failures with "Cannot assign
requested address" lately. Add some debug information to help us find
the cause:

- When server startup fails, print "netstat -tnlap" output to the test
  log. If the failure was caused by "Cannot assign requested address",
  this will hopefully tell us which process was occupying the port.
- In pageserver and safekeeper startup, print its PID. This way, we can
  correlate the PID from netstat output with the test that launched it.
- In safekeeper startup, print the HTTP port it's using to the log, in
  addition to the libpq port. The pageserver was already doing it.
2022-11-30 14:36:19 +02:00
Alexander Bayandin
136b029d7a neon-project-create: fix project creation (#2954)
Update api/v2 call to support changes from
https://github.com/neondatabase/cloud/pull/2929
2022-11-30 09:19:59 +00:00
Heikki Linnakangas
33834c01ec Rename Paused states to Stopping.
I'm not a fan of "Paused", for two reasons:

- Paused implies that the tenant/timeline has no activity on it. That's
  not true; the tenant/timeline can still have active tasks working on it.

- Paused implies that it can be resumed later. It cannot. A tenant or
  timeline in this state cannot be switched back to the Active state anymore.
  A completely new Tenant or Timeline struct can be constructed for the
  same tenant or timeline later, e.g. if you detach and later re-attach
  the same tenant, but that's a different thing.

Stopping describes the state better. I also considered "ShuttingDown",
but Stopping is simpler as it's a single word.
2022-11-30 01:10:16 +02:00
Heikki Linnakangas
9a6c0be823 storage_sync2
The code in this change was extracted from PR #2595, i.e., Heikki’s draft
PR for on-demand download.

High-Level Changes

- storage_sync module rewrite
- Changes to Tenant Loading
- Changes to Tenant States
- Changes to Timeline States
- Crash-safe & Resumable Tenant Attach

There are several follow-up work items planned.
Refer to the Epic issue on GitHub:
https://github.com/neondatabase/neon/issues/2029

Metadata:

closes https://github.com/neondatabase/neon/pull/2785

unsquashed history of this patch: archive/pr-2785-storage-sync2/pre-squash

Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>

===============================================================================

storage_sync module rewrite
===========================

The storage_sync code is rewritten. The new module is named storage_sync2,
mostly to produce a more readable git diff.

The updated block comment in storage_sync2.rs describes the changes quite well,
so we will not reproduce that comment here. TL;DR:
- The global sync queue and RemoteIndex are replaced with a per-timeline
  `RemoteTimelineClient` structure that contains a queue of UploadOperations,
  to ensure proper ordering, plus the necessary metadata (a rough sketch of
  this structure follows after these bullets).
- Before deleting local layer files, we wait for ongoing UploadOps to finish
  (wait_completion()).
- Download operations are not queued; they are executed immediately.
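A rough, self-contained sketch of that per-timeline upload queue is below. The
variant names loosely mirror the UploadOp variants visible in the diff further
down, but everything else here (RemoteTimelineClientSketch, the schedule_*
methods, the payloads) is made up for illustration; the real
RemoteTimelineClient is considerably more involved.

use tokio::sync::{mpsc, oneshot};

// Operations are queued per timeline and executed strictly in FIFO order.
enum UploadOp {
    UploadLayer(String),          // hypothetical layer file name
    UploadMetadata(u64),          // hypothetical metadata "generation"
    Barrier(oneshot::Sender<()>), // completes once all earlier ops are done
}

#[derive(Clone)]
struct RemoteTimelineClientSketch {
    tx: mpsc::UnboundedSender<UploadOp>,
}

impl RemoteTimelineClientSketch {
    fn new() -> Self {
        let (tx, mut rx) = mpsc::unbounded_channel();
        // One background task per timeline drains the queue in order, which
        // is what gives the ordering guarantee between uploads and deletions.
        tokio::spawn(async move {
            while let Some(op) = rx.recv().await {
                match op {
                    UploadOp::UploadLayer(name) => println!("uploading layer {name}"),
                    UploadOp::UploadMetadata(generation) => {
                        println!("uploading index part, generation {generation}")
                    }
                    UploadOp::Barrier(done) => {
                        let _ = done.send(());
                    }
                }
            }
        });
        Self { tx }
    }

    fn schedule_layer_upload(&self, name: &str) {
        let _ = self.tx.send(UploadOp::UploadLayer(name.to_owned()));
    }

    fn schedule_index_upload(&self, generation: u64) {
        let _ = self.tx.send(UploadOp::UploadMetadata(generation));
    }

    /// Wait until every operation scheduled before this call has completed.
    async fn wait_completion(&self) {
        let (done_tx, done_rx) = oneshot::channel();
        let _ = self.tx.send(UploadOp::Barrier(done_tx));
        let _ = done_rx.await;
    }
}

#[tokio::main]
async fn main() {
    let client = RemoteTimelineClientSketch::new();
    client.schedule_layer_upload("layer-0001");
    client.schedule_index_upload(1);
    // Before deleting local layer files, wait for pending uploads to finish.
    client.wait_completion().await;
    println!("safe to delete local layer files now");
}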

Changes to Tenant Loading
=========================

The initial sync part was rewritten as well; it is the other major change and
serves as a foundation for on-demand downloads. The routines for attaching and
loading moved directly into the Tenant struct and are now asynchronous and
spawned into the background.

Since this patch doesn’t introduce on-demand download of layers, we fully
synchronize with the remote during pageserver startup. See details in
`Timeline::reconcile_with_remote` and `Timeline::download_missing`.
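As a simplified stand-in for that startup-time full synchronization (the
helper below is an assumption for illustration, not the real
`Timeline::download_missing` signature): every layer named in the remote index
but missing locally is downloaded before the timeline counts as loaded.

use std::collections::HashSet;

// Which layer files from the remote index still need to be downloaded?
fn layers_to_download(local: &HashSet<String>, remote_index: &[String]) -> Vec<String> {
    remote_index
        .iter()
        .filter(|name| !local.contains(name.as_str()))
        .cloned()
        .collect()
}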

Changes to Tenant States
========================

The “Active” state has lost its “background_jobs_running: bool” member. That
variable indicated whether the GC & Compaction background loops were spawned or
not. With this patch, they are always spawned. Unit tests (#[test]) use
TenantConf::{gc_period,compaction_period} to disable their effect (15db566).

This patch introduces a new tenant state, “Attaching”. A tenant that is being
attached starts in this state and transitions to “Active” once it finishes
downloading.

The `GET /tenant` endpoint returns `TenantInfo::has_in_progress_downloads`. We
now derive the value for that field from the tenant state, to remain
backwards-compatible with cloud.git. We will remove that field when we switch
to on-demand downloads.
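A hedged sketch of that derivation (the enum mirrors the states described in
this commit series; it is not copied from the real code):

#[derive(Clone, Copy)]
enum TenantState {
    Loading,
    Attaching,
    Active,
    Stopping, // called "Paused" at the time of this commit; renamed in 33834c01ec above
    Broken,
}

// Previously tracked separately; now downloads are considered in progress
// exactly while the tenant is attaching.
fn has_in_progress_downloads(state: TenantState) -> bool {
    matches!(state, TenantState::Attaching)
}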

Changes to Timeline States
==========================

The TimelineInfo::awaits_download field is now equivalent to the tenant being
in Attaching state. Previously, download progress was tracked per timeline.
With this change, it’s only tracked per tenant. When on-demand downloads
arrive, the field will be completely obsolete. Deprecation is tracked in
issue #2930.

Crash-safe & Resumable Tenant Attach
====================================

Previously, the attach operation was not persistent: when tenant attach was
interrupted by a crash, the pageserver would not continue attaching after
pageserver restart. In fact, the half-finished tenant directory on disk would
simply be skipped by tenant_mgr because it lacked the metadata file (it’s
written last). This patch introduces an “attaching” marker file that is
present inside the tenant directory while the tenant is attaching. During
pageserver startup, tenant_mgr will resume the attach if that file is present.
If not, it assumes that the local tenant state is consistent and tries to load
the tenant. If that fails, the tenant transitions into Broken state.
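A minimal sketch of that marker-file protocol, with assumed file and function
names (the real logic lives in tenant_mgr and the Tenant attach path):

use std::fs;
use std::io;
use std::path::Path;

const ATTACHING_MARKER: &str = "attaching"; // assumed marker file name

// Create the marker before any remote downloads start; remove it only after
// the attach has fully completed. A crash in between leaves the marker on
// disk, so the next startup knows the directory holds a half-finished attach.
fn start_attach(tenant_dir: &Path) -> io::Result<()> {
    fs::create_dir_all(tenant_dir)?;
    fs::File::create(tenant_dir.join(ATTACHING_MARKER))?;
    // ... download index parts and layer files from remote storage ...
    Ok(())
}

fn finish_attach(tenant_dir: &Path) -> io::Result<()> {
    // Removing the marker is the last step of a successful attach.
    fs::remove_file(tenant_dir.join(ATTACHING_MARKER))
}

// Called for every tenant directory found during pageserver startup.
fn startup_action(tenant_dir: &Path) -> &'static str {
    if tenant_dir.join(ATTACHING_MARKER).exists() {
        "resume the attach from remote storage"
    } else {
        "load local state; transition to Broken if that fails"
    }
}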
2022-11-29 18:55:20 +01:00
Heikki Linnakangas
baa8d5a16a Test that physical size is the same before and after re-attaching tenant. 2022-11-29 14:32:01 +02:00
Heikki Linnakangas
fbd5f65938 Misc cosmetic fixes in comments, messages.
Most of these were extracted from PR #2785.
2022-11-29 14:10:45 +02:00
Heikki Linnakangas
1f1324ebed Require tenant to be active when calculating tenant size.
It's not clear whether the calculation would work or make sense if the
tenant is only partially loaded. Let's play it safe and require it to
be Active.
2022-11-29 14:10:45 +02:00
Alexander Bayandin
fb633b16ac neon-project-create: change default region for staging (#2951)
Change the default region for staging from `us-east-1` to `us-east-2`
for project creation.
Remove REGION_ID from `neon-branch-create` since we don't use it.
2022-11-29 11:38:24 +00:00
Joonas Koivunen
f277140234 Small fixes (#2949)
Nothing interesting in these changes. Passing RUST_BACKTRACE=full
through will hopefully save someone else panic reproduction time.

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2022-11-29 10:29:25 +02:00
Arseny Sher
52166799bd Put .proto compilation result to $OUT_DIR/
Sometimes the CI build fails with

error: couldn't read storage_broker/src/../proto/storage_broker.rs: No such file or directory (os error 2)
  --> storage_broker/src/lib.rs:14:5
   |
14 |     include!("../proto/storage_broker.rs");
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The root cause is not clear, but it looks like interference with cachepot. Per
the Cargo docs, build scripts shouldn't output anywhere but OUT_DIR; let's
follow this and see if it helps.
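For illustration, the usual Cargo pattern looks roughly like this, assuming a
prost/tonic-build style codegen and a proto package named storage_broker (not
necessarily the exact storage_broker build script):

// build.rs: generate code into $OUT_DIR (tonic-build's default), never into
// the source tree.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/storage_broker.proto")?;
    Ok(())
}

and the crate then includes the generated file from OUT_DIR instead of a
checked-in path inside the source tree:

// src/lib.rs
include!(concat!(env!("OUT_DIR"), "/storage_broker.rs"));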
2022-11-28 20:27:43 +04:00
Sergey Melnikov
0a4e5f8aa3 Setup legacy scram proxy in us-east-2 (#2943) 2022-11-28 17:21:35 +01:00
MMeent
0c1195c30d Fix #2937 (#2940)
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2022-11-28 15:34:07 +01:00
Alexander Bayandin
3ba92d238e Nightly Benchmarks: Fix default db name and clickbench-compare trigger (#2938)
- Fix database name: `main` -> `neondb`
- Fix `clickbench-compare` trigger; the job should be triggered even if
`pgbench-compare` fails
2022-11-28 12:08:04 +00:00
Heikki Linnakangas
67469339fa When new timeline is created, don't wait for compaction. (#2931)
When a new root timeline is created, we want to flush all the data to
disk before we return success to the caller. We were using
checkpoint(CheckpointConfig::Forced) for that, but that also performs
compaction. With the default settings, compaction will have no work to do
after we have imported an empty database, as the resulting image is too
small to require compaction. However, with very small checkpoint_distance
and compaction_target_size, compaction will run, and it can take a while.

PR #2785 adds new tests that use very small checkpoint_distance and
compaction_target_size settings, and the test sometimes failed with an
"operation timed out" error in the client when the create_timeline step
took too long.
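Roughly, the distinction is as follows; the Flush variant appears in the diff
further down, while the sketch itself is an assumption based on this message,
not the real checkpoint implementation:

// Sketch: a flush-only checkpoint writes in-memory data to disk; a forced
// checkpoint additionally runs compaction, which is what made timeline
// creation slow with tiny checkpoint_distance/compaction_target_size.
enum CheckpointConfig {
    Flush,
    Forced,
}

fn checkpoint(cfg: &CheckpointConfig) {
    flush_in_memory_layers();
    if matches!(cfg, CheckpointConfig::Forced) {
        run_compaction();
    }
}

fn flush_in_memory_layers() { /* write frozen in-memory layers to layer files */ }
fn run_compaction() { /* reorganize layer files; can take a while */ }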
2022-11-28 11:05:20 +02:00
Heikki Linnakangas
0205a44265 Remove obsolete TODO and settings in test
The GC and compaction loops have reacted quickly to shutdown requests
since commit 40c845e57d.
2022-11-28 11:04:25 +02:00
Alexander Bayandin
480175852f Nightly Benchmarks: add OLAP-style benchmark (clickbench) (#2855)
Add ClickBench benchmark, an OLAP-style benchmark, to Nightly
Benchmarks.

The full run of 43 queries on the original dataset takes more than 6h
(only 34 queries got processed in 6h) on our default-sized compute.
Running it as-is would currently mean some really unstable tests,
because of our regular deployments to the staging/captest environments (see
https://github.com/neondatabase/cloud/issues/1872).

I've reduced the dataset size to the first 10^7 rows from the original
10^8 rows. Now it takes ~30-40 minutes to pass.

Ref https://github.com/ClickHouse/ClickBench/tree/main/aurora-postgresql
Ref https://benchmark.clickhouse.com/
2022-11-25 18:41:26 +00:00
Alexander Bayandin
9fdd228dee GitHub Actions: Add branch related actions (#2877)
Add `neon-branch-create` / `neon-branch-delete` to allow using branches
in tests.
I have a couple of use cases in mind:
- For destructive tests with a big DB, we can create the DB once in
advance and then use branches without the need to recreate the DB itself
after tests change it.
- We can run tests in parallel (if they're compute-bound).

Also migrate `neon-project-create` / `neon-project-delete` to API v2.
2022-11-25 18:18:08 +00:00
Heikki Linnakangas
15db566420 Allow setting gc/compaction_period to 0, to disable automatic GC/compaction
Many Python tests were setting the GC/compaction period to large
values, to effectively disable GC/compaction. Reserve the value 0 to
mean "explicitly disabled". We also set them to 0 in unit tests now,
although currently unit tests don't launch the background jobs at
all, so it won't have any effect there.
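A tiny sketch of the convention (the helper name is made up):

use std::time::Duration;

// 0 now means "automatic GC/compaction explicitly disabled": a background
// loop only gets a tick interval when the configured period is non-zero.
fn period_if_enabled(period: Duration) -> Option<Duration> {
    if period.is_zero() {
        None
    } else {
        Some(period)
    }
}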

Fixes https://github.com/neondatabase/neon/issues/2917
2022-11-25 20:14:06 +02:00
43 changed files with 1358 additions and 754 deletions

View File

@@ -32,8 +32,8 @@ runs:
exit 2
fi
- name: Calculate key
id: calculate-key
- name: Calculate variables
id: calculate-vars
shell: bash -euxo pipefail {0}
run: |
# TODO: for manually triggered workflows (via workflow_dispatch) we need to have a separate key
@@ -41,14 +41,22 @@ runs:
pr_number=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true)
if [ "${pr_number}" != "null" ]; then
key=pr-${pr_number}
elif [ "${GITHUB_REF}" = "refs/heads/main" ]; then
elif [ "${GITHUB_REF_NAME}" = "main" ]; then
# Shortcut for a special branch
key=main
elif [ "${GITHUB_REF_NAME}" = "release" ]; then
# Shortcut for a special branch
key=release
else
key=branch-$(echo ${GITHUB_REF#refs/heads/} | tr -c "[:alnum:]._-" "-")
key=branch-$(printf "${GITHUB_REF_NAME}" | tr -c "[:alnum:]._-" "-")
fi
echo "KEY=${key}" >> $GITHUB_OUTPUT
# Sanitize test selection to remove `/` and any other special characters
# Use printf instead of echo to avoid having `\n` at the end of the string
test_selection=$(printf "${{ inputs.test_selection }}" | tr -c "[:alnum:]._-" "-" )
echo "TEST_SELECTION=${test_selection}" >> $GITHUB_OUTPUT
- uses: actions/setup-java@v3
if: ${{ inputs.action == 'generate' }}
with:
@@ -74,10 +82,11 @@ runs:
- name: Upload Allure results
if: ${{ inputs.action == 'store' }}
env:
REPORT_PREFIX: reports/${{ steps.calculate-key.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-key.outputs.KEY }}/${{ inputs.build_type }}
REPORT_PREFIX: reports/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
TEST_OUTPUT: /tmp/test_output
BUCKET: neon-github-public-dev
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
shell: bash -euxo pipefail {0}
run: |
# Add metadata
@@ -98,7 +107,7 @@ runs:
BUILD_TYPE=${{ inputs.build_type }}
EOF
ARCHIVE="${GITHUB_RUN_ID}-${{ inputs.test_selection }}-${GITHUB_RUN_ATTEMPT}-$(date +%s).tar.zst"
ARCHIVE="${GITHUB_RUN_ID}-${TEST_SELECTION}-${GITHUB_RUN_ATTEMPT}-$(date +%s).tar.zst"
ZSTD_NBTHREADS=0
tar -C ${TEST_OUTPUT}/allure/results -cf ${ARCHIVE} --zstd .
@@ -109,8 +118,9 @@ runs:
if: ${{ inputs.action == 'generate' }}
shell: bash -euxo pipefail {0}
env:
LOCK_FILE: reports/${{ steps.calculate-key.outputs.KEY }}/lock.txt
LOCK_FILE: reports/${{ steps.calculate-vars.outputs.KEY }}/lock.txt
BUCKET: neon-github-public-dev
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
run: |
LOCK_TIMEOUT=300 # seconds
@@ -123,12 +133,12 @@ runs:
fi
sleep 1
done
echo "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${{ inputs.test_selection }}" > lock.txt
echo "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${TEST_SELECTION}" > lock.txt
aws s3 mv --only-show-errors lock.txt "s3://${BUCKET}/${LOCK_FILE}"
# A double-check that exactly WE have acquired the lock
aws s3 cp --only-show-errors "s3://${BUCKET}/${LOCK_FILE}" ./lock.txt
if [ "$(cat lock.txt)" = "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${{ inputs.test_selection }}" ]; then
if [ "$(cat lock.txt)" = "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${TEST_SELECTION}" ]; then
break
fi
done
@@ -137,8 +147,8 @@ runs:
if: ${{ inputs.action == 'generate' }}
id: generate-report
env:
REPORT_PREFIX: reports/${{ steps.calculate-key.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-key.outputs.KEY }}/${{ inputs.build_type }}
REPORT_PREFIX: reports/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
RAW_PREFIX: reports-raw/${{ steps.calculate-vars.outputs.KEY }}/${{ inputs.build_type }}
TEST_OUTPUT: /tmp/test_output
BUCKET: neon-github-public-dev
shell: bash -euxo pipefail {0}
@@ -192,12 +202,13 @@ runs:
if: ${{ inputs.action == 'generate' && always() }}
shell: bash -euxo pipefail {0}
env:
LOCK_FILE: reports/${{ steps.calculate-key.outputs.KEY }}/lock.txt
LOCK_FILE: reports/${{ steps.calculate-vars.outputs.KEY }}/lock.txt
BUCKET: neon-github-public-dev
TEST_SELECTION: ${{ steps.calculate-vars.outputs.TEST_SELECTION }}
run: |
aws s3 cp --only-show-errors "s3://${BUCKET}/${LOCK_FILE}" ./lock.txt || exit 0
if [ "$(cat lock.txt)" = "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${{ inputs.test_selection }}" ]; then
if [ "$(cat lock.txt)" = "${GITHUB_RUN_ID}-${GITHUB_RUN_ATTEMPT}-${TEST_SELECTION}" ]; then
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
fi

View File

@@ -0,0 +1,154 @@
name: 'Create Branch'
description: 'Create Branch using API'
inputs:
api_key:
desctiption: 'Neon API key'
required: true
environment:
desctiption: 'dev (aka captest) or staging'
required: true
project_id:
desctiption: 'ID of the Project to create Branch in'
required: true
outputs:
dsn:
description: 'Created Branch DSN (for main database)'
value: ${{ steps.change-password.outputs.dsn }}
branch_id:
description: 'Created Branch ID'
value: ${{ steps.create-branch.outputs.branch_id }}
runs:
using: "composite"
steps:
- name: Parse Input
id: parse-input
shell: bash -euxo pipefail {0}
run: |
case "${ENVIRONMENT}" in
dev)
API_HOST=console.dev.neon.tech
;;
staging)
API_HOST=console.stage.neon.tech
;;
*)
echo 2>&1 "Unknown environment=${ENVIRONMENT}. Allowed 'dev' or 'staging' only"
exit 1
;;
esac
echo "api_host=${API_HOST}" >> $GITHUB_OUTPUT
env:
ENVIRONMENT: ${{ inputs.environment }}
- name: Create New Branch
id: create-branch
shell: bash -euxo pipefail {0}
run: |
for i in $(seq 1 10); do
branch=$(curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}/branches" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}" \
--data "{
\"branch\": {
\"name\": \"Created by actions/neon-branch-create; GITHUB_RUN_ID=${GITHUB_RUN_ID} at $(date +%s)\"
}
}")
if [ -z "${branch}" ]; then
sleep 1
continue
fi
branch_id=$(echo $branch | jq --raw-output '.branch.id')
if [ "${branch_id}" == "null" ]; then
sleep 1
continue
fi
break
done
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
echo 2>&1 "Failed to create branch after 10 attempts, the latest response was: ${branch}"
exit 1
fi
branch_id=$(echo $branch | jq --raw-output '.branch.id')
echo "branch_id=${branch_id}" >> $GITHUB_OUTPUT
host=$(echo $branch | jq --raw-output '.endpoints[0].host')
echo "host=${host}" >> $GITHUB_OUTPUT
env:
API_KEY: ${{ inputs.api_key }}
API_HOST: ${{ steps.parse-input.outputs.api_host }}
PROJECT_ID: ${{ inputs.project_id }}
- name: Get Role name
id: role-name
shell: bash -euxo pipefail {0}
run: |
roles=$(curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}/branches/${BRANCH_ID}/roles" \
--fail \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
)
role_name=$(echo $roles | jq --raw-output '.roles[] | select(.protected == false) | .name')
echo "role_name=${role_name}" >> $GITHUB_OUTPUT
env:
API_KEY: ${{ inputs.api_key }}
API_HOST: ${{ steps.parse-input.outputs.api_host }}
PROJECT_ID: ${{ inputs.project_id }}
BRANCH_ID: ${{ steps.create-branch.outputs.branch_id }}
- name: Change Password
id: change-password
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
for i in $(seq 1 10); do
reset_password=$(curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}/branches/${BRANCH_ID}/roles/${ROLE_NAME}/reset_password" \
--request POST \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
)
if [ -z "${reset_password}" ]; then
sleep 1
continue
fi
password=$(echo $reset_password | jq --raw-output '.role.password')
if [ "${password}" == "null" ]; then
sleep 1
continue
fi
echo "::add-mask::${password}"
break
done
if [ -z "${password}" ] || [ "${password}" == "null" ]; then
echo 2>&1 "Failed to reset password after 10 attempts, the latest response was: ${reset_password}"
exit 1
fi
dsn="postgres://${ROLE_NAME}:${password}@${HOST}/neondb"
echo "::add-mask::${dsn}"
echo "dsn=${dsn}" >> $GITHUB_OUTPUT
env:
API_KEY: ${{ inputs.api_key }}
API_HOST: ${{ steps.parse-input.outputs.api_host }}
PROJECT_ID: ${{ inputs.project_id }}
BRANCH_ID: ${{ steps.create-branch.outputs.branch_id }}
ROLE_NAME: ${{ steps.role-name.outputs.role_name }}
HOST: ${{ steps.create-branch.outputs.host }}

View File

@@ -0,0 +1,79 @@
name: 'Delete Branch'
description: 'Delete Branch using API'
inputs:
api_key:
desctiption: 'Neon API key'
required: true
environment:
desctiption: 'dev (aka captest) or staging'
required: true
project_id:
desctiption: 'ID of the Project which should be deleted'
required: true
branch_id:
desctiption: 'ID of the branch to delete'
required: true
runs:
using: "composite"
steps:
- name: Parse Input
id: parse-input
shell: bash -euxo pipefail {0}
run: |
case "${ENVIRONMENT}" in
dev)
API_HOST=console.dev.neon.tech
;;
staging)
API_HOST=console.stage.neon.tech
;;
*)
echo 2>&1 "Unknown environment=${ENVIRONMENT}. Allowed 'dev' or 'staging' only"
exit 1
;;
esac
echo "api_host=${API_HOST}" >> $GITHUB_OUTPUT
env:
ENVIRONMENT: ${{ inputs.environment }}
- name: Delete Branch
# Do not try to delete a branch if .github/actions/neon-project-create
# or .github/actions/neon-branch-create failed before
if: ${{ inputs.project_id != '' && inputs.branch_id != '' }}
shell: bash -euxo pipefail {0}
run: |
for i in $(seq 1 10); do
deleted_branch=$(curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}/branches/${BRANCH_ID}" \
--request DELETE \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
)
if [ -z "${deleted_branch}" ]; then
sleep 1
continue
fi
branch_id=$(echo $deleted_branch | jq --raw-output '.branch.id')
if [ "${branch_id}" == "null" ]; then
sleep 1
continue
fi
break
done
if [ -z "${branch_id}" ] || [ "${branch_id}" == "null" ]; then
echo 2>&1 "Failed to delete branch after 10 attempts, the latest response was: ${deleted_branch}"
exit 1
fi
env:
API_KEY: ${{ inputs.api_key }}
PROJECT_ID: ${{ inputs.project_id }}
BRANCH_ID: ${{ inputs.branch_id }}
API_HOST: ${{ steps.parse-input.outputs.api_host }}

View File

@@ -6,7 +6,7 @@ inputs:
desctiption: 'Neon API key'
required: true
environment:
desctiption: 'dev (aka captest) or stage'
desctiption: 'dev (aka captest) or staging'
required: true
region_id:
desctiption: 'Region ID, if not set the project will be created in the default region'
@@ -29,11 +29,11 @@ runs:
case "${ENVIRONMENT}" in
dev)
API_HOST=console.dev.neon.tech
REGION_ID=${REGION_ID:-eu-west-1}
REGION_ID=${REGION_ID:-aws-eu-west-1}
;;
staging)
API_HOST=console.stage.neon.tech
REGION_ID=${REGION_ID:-us-east-1}
REGION_ID=${REGION_ID:-aws-us-east-2}
;;
*)
echo 2>&1 "Unknown environment=${ENVIRONMENT}. Allowed 'dev' or 'staging' only"
@@ -53,7 +53,7 @@ runs:
shell: bash -euo pipefail {0}
run: |
project=$(curl \
"https://${API_HOST}/api/v1/projects" \
"https://${API_HOST}/api/v2/projects" \
--fail \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
@@ -61,7 +61,6 @@ runs:
--data "{
\"project\": {
\"name\": \"Created by actions/neon-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"platform_id\": \"aws\",
\"region_id\": \"${REGION_ID}\",
\"settings\": { }
}
@@ -70,11 +69,11 @@ runs:
# Mask password
echo "::add-mask::$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .password')"
dsn=$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .dsn')/main
dsn=$(echo $project | jq --raw-output '.connection_uris[0].connection_uri')
echo "::add-mask::${dsn}"
echo "dsn=${dsn}" >> $GITHUB_OUTPUT
project_id=$(echo $project | jq --raw-output '.id')
project_id=$(echo $project | jq --raw-output '.project.id')
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
env:
API_KEY: ${{ inputs.api_key }}

View File

@@ -6,7 +6,7 @@ inputs:
desctiption: 'Neon API key'
required: true
environment:
desctiption: 'dev (aka captest) or stage'
desctiption: 'dev (aka captest) or staging'
required: true
project_id:
desctiption: 'ID of the Project to delete'
@@ -37,17 +37,17 @@ runs:
ENVIRONMENT: ${{ inputs.environment }}
- name: Delete Neon Project
# Do not try to delete a project if .github/actions/neon-project-create failed before
if: ${{ inputs.project_id != '' }}
shell: bash -euxo pipefail {0}
run: |
# Allow PROJECT_ID to be empty/null for cases when .github/actions/neon-project-create failed
if [ -n "${PROJECT_ID}" ]; then
curl -X "POST" \
"https://${API_HOST}/api/v1/projects/${PROJECT_ID}/delete" \
--fail \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
fi
curl \
"https://${API_HOST}/api/v2/projects/${PROJECT_ID}" \
--fail \
--request DELETE \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer ${API_KEY}"
env:
API_KEY: ${{ inputs.api_key }}
PROJECT_ID: ${{ inputs.project_id }}

View File

@@ -0,0 +1,31 @@
# Helm chart values for neon-proxy-scram.
# This is a YAML-formatted file.
image:
repository: neondatabase/neon
settings:
authBackend: "console"
authEndpoint: "http://console-staging.local/management/api/v2"
domain: "*.cloud.stage.neon.tech"
# -- Additional labels for neon-proxy pods
podLabels:
zenith_service: proxy-scram-legacy
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: neon-proxy-scram-legacy.beta.us-east-2.aws.neon.build
#metrics:
# enabled: true
# serviceMonitor:
# enabled: true
# selector:
# release: kube-prometheus-stack

View File

@@ -110,8 +110,14 @@ jobs:
rm -rf perf-report-staging
mkdir -p perf-report-staging
# Set --sparse-ordering option of pytest-order plugin to ensure tests are running in order of appears in the file,
# it's important for test_perf_pgbench.py::test_pgbench_remote_* tests
./scripts/pytest test_runner/performance/ -v -m "remote_cluster" --sparse-ordering --out-dir perf-report-staging --timeout 5400
# it's important for test_perf_pgbench.py::test_pgbench_remote_* tests.
# Do not run tests from test_runner/performance/test_perf_olap.py because they require a prepared DB. We run them separately in `clickbench-compare` job.
./scripts/pytest test_runner/performance/ -v \
-m "remote_cluster" \
--sparse-ordering \
--out-dir perf-report-staging \
--timeout 5400 \
--ignore test_runner/performance/test_perf_olap.py
- name: Submit result
env:
@@ -207,7 +213,7 @@ jobs:
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
CONNSTR=${{ secrets.BENCHMARK_RDS_CONNSTR }}
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CONNSTR }}
;;
rds-postgres)
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
@@ -225,8 +231,8 @@ jobs:
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET seqscan_prefetch_buffers=10"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE neondb SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE neondb SET seqscan_prefetch_buffers=10"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
@@ -292,3 +298,112 @@ jobs:
slack-message: "Periodic perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
clickbench-compare:
# ClichBench DB for rds-aurora and rds-Postgres deployed to the same clusters
# we use for performance testing in pgbench-compare.
# Run this job only when pgbench-compare is finished to avoid the intersection.
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: success() || failure()
needs: [ pgbench-compare ]
strategy:
fail-fast: false
matrix:
# neon-captest-prefetch: We have pre-created projects with prefetch enabled
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-prefetch, rds-postgres, rds-aurora ]
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, dev, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:pinned
options: --init
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Set up Connection String
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-prefetch)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
;;
rds-aurora)
CONNSTR=${{ secrets.BENCHMARK_RDS_AURORA_CLICKBENCH_10M_CONNSTR }}
;;
rds-postgres)
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE main SET seqscan_prefetch_buffers=10"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Benchmark clickbench
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_perf_olap.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_clickbench
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Create Allure report
if: success() || failure()
uses: ./.github/actions/allure-report
with:
action: generate
build_type: ${{ env.BUILD_TYPE }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic OLAP perf testing ${{ matrix.platform }}: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -905,9 +905,11 @@ jobs:
- target_region: us-east-2
target_cluster: dev-us-east-2-beta
deploy_link_proxy: true
deploy_legacy_scram_proxy: true
- target_region: eu-west-1
target_cluster: dev-eu-west-1-zeta
deploy_link_proxy: false
deploy_legacy_scram_proxy: false
steps:
- name: Checkout
uses: actions/checkout@v3
@@ -931,6 +933,12 @@ jobs:
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-link neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-link.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
- name: Re-deploy legacy scram proxy
if: matrix.deploy_legacy_scram_proxy
run: |
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram-legacy neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram-legacy.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
deploy-proxy-prod-new:
runs-on: prod
container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest

View File

@@ -209,7 +209,14 @@ pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> any
}
fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
// If RUST_BACKTRACE is set, pass it through. But if it's not set, default
// to RUST_BACKTRACE=1.
let backtrace_setting = std::env::var_os("RUST_BACKTRACE");
let backtrace_setting = backtrace_setting
.as_deref()
.unwrap_or_else(|| OsStr::new("1"));
let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
// Pass through these environment variables to the command
for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {

View File

@@ -23,7 +23,7 @@ pub enum TenantState {
Active,
/// A tenant is recognized by pageserver, but it is being detached or the
/// system is being shut down.
Paused,
Stopping,
/// A tenant is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken,
@@ -35,7 +35,7 @@ impl TenantState {
Self::Loading => true,
Self::Attaching => true,
Self::Active => false,
Self::Paused => false,
Self::Stopping => false,
Self::Broken => false,
}
}
@@ -53,7 +53,7 @@ pub enum TimelineState {
Suspended,
/// A timeline is recognized by pageserver, but not yet ready to operate and not allowed to
/// automatically become Active after certain events: only a management call can change this status.
Paused,
Stopping,
/// A timeline is recognized by the pageserver, but can no longer be used for
/// any operations, because it failed to be activated.
Broken,

View File

@@ -239,6 +239,8 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
// we need to release the lock file only when the current process is gone
let _ = Box::leak(Box::new(lock_file));
info!("Created PID file with PID {}", Pid::this().to_string());
// TODO: Check that it looks like a valid repository before going further
// bind sockets before daemonizing so we report errors early and do not return until we are listening

View File

@@ -10,7 +10,8 @@ pub mod page_service;
pub mod pgdatadir_mapping;
pub mod profiling;
pub mod repository;
pub mod storage_sync;
pub mod storage_sync2;
pub use storage_sync2 as storage_sync;
pub mod task_mgr;
pub mod tenant;
pub mod tenant_config;

View File

@@ -315,6 +315,7 @@ impl PageServerHandler {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}

View File

@@ -79,6 +79,13 @@
//! - We rely on read-after write consistency in the remote storage.
//! - Layer files are immutable
//!
//! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
//! storage. Different tenants can be attached to different pageservers, but if the
//! same tenant is attached to two pageservers at the same time, they will overwrite
//! each other's index file updates, and confusion will ensue. There's no interlock or
//! mechanism to detect that in the pageserver, we rely on the control plane to ensure
//! that that doesn't happen.
//!
//! ## Implementation Note
//!
//! The *actual* remote state lags behind the *desired* remote state while
@@ -145,6 +152,10 @@
//!
//! # Downloads (= Tenant Attach)
//!
//! In addition to the upload queue, [`RemoteTimelineClient`] has functions for
//! downloading files from the remote storage. Downloads are performed immediately,
//! independently of the uploads.
//!
//! When we attach a tenant, we perform the following steps:
//! - create `Tenant` object in `TenantState::Attaching` state
//! - List timelines that are present in remote storage, and download their remote [`IndexPart`]s
@@ -174,60 +185,6 @@
//! in remote storage.
//! But note that we don't test any of this right now.
//!
//!
//! # RANDOM NOTES FROM THE PAST (TODO: DELETE / DEDUP WITH CONTENT ABOVE)
//!
//! * pageserver assumes it has exclusive write access to the remote storage. If supported, the way multiple pageservers can be separated in the same storage
//! (i.e. using different directories in the local filesystem external storage), but totally up to the storage implementation and not covered with the trait API.
//!
//! * the sync tasks may not processed immediately after the submission: if they error and get re-enqueued, their execution might be backed off to ensure error cap is not exceeded too fast.
//! The sync queue processing also happens in batches, so the sync tasks can wait in the queue for some time.
//!
//! Uploads are queued and executed in the background and in parallel, enforcing the ordering rules.
//! Downloads are performed immediately, and independently of the uploads.
//!
//! Deletion happens only after a successful upload only, otherwise the compaction output might make the timeline inconsistent until both tasks are fully processed without errors.
//! Upload and download update the remote data (inmemory index and S3 json index part file) only after every layer is successfully synchronized, while the deletion task
//! does otherwise: it requires to have the remote data updated first successfully: blob files will be invisible to pageserver this way.
//!
//! FIXME: how is the initial list of remote files created now? Update this paragraph
//! During the loop startup, an initial [`RemoteTimelineIndex`] state is constructed via downloading and merging the index data for all timelines,
//! present locally.
//! It's enough to poll such timelines' remote state once on startup only, due to an agreement that only one pageserver at a time has an exclusive
//! write access to remote portion of timelines that are attached to the pagegserver.
//! The index state is used to issue initial sync tasks, if needed:
//! * all timelines with local state behind the remote gets download tasks scheduled.
//! Such timelines are considered "remote" before the download succeeds, so a number of operations (gc, checkpoints) on that timeline are unavailable
//! before up-to-date layers and metadata file are downloaded locally.
//! * all newer local state gets scheduled for upload, such timelines are "local" and fully operational
//! * remote timelines not present locally are unknown to pageserver, but can be downloaded on a separate request
//!
//! Then, the index is shared across pageserver under [`RemoteIndex`] guard to ensure proper synchronization.
//! The remote index gets updated after very remote storage change (after an upload), same as the index part files remotely.
//!
//! Remote timeline contains a set of layer files, created during checkpoint(s) and the serialized [`IndexPart`] file with timeline metadata and all remote layer paths inside.
//! Those paths are used instead of `S3 list` command to avoid its slowliness and expenciveness for big amount of files.
//! If the index part does not contain some file path but it's present remotely, such file is invisible to pageserver and ignored.
//! Among other tasks, the index is used to prevent invalid uploads and non-existing downloads on demand, refer to [`index`] for more details.
//!
//! FIXME: update this paragraph
//! Index construction is currently the only place where the storage sync can return an [`Err`] to the user.
//! New sync tasks are accepted via [`schedule_layer_upload`], [`schedule_layer_download`] and [`schedule_layer_delete`] functions.
//! After the initial state is loaded into memory and the loop starts, any further [`Err`] results do not stop the loop, but rather
//! reschedule the same task, with possibly less files to sync:
//! * download tasks currently never replace existing local file with metadata file as an exception
//! (but this is a subject to change when checksum checks are implemented: all files could get overwritten on a checksum mismatch)
//! * download tasks carry the information of skipped acrhives, so resubmissions are not downloading successfully processed layers again
//! * downloads do not contain any actual files to download, so that "external", sync pageserver code is able to schedule the timeline download
//! without accessing any extra information about its files.
//!
//! FIXME: update this paragraph
//! Uploads and downloads sync layer files in arbitrary order, but only after all layer files are synched the local metadada (for download) and remote index part (for upload) are updated,
//! to avoid having a corrupt state without the relevant layer files.
//! Refer to [`upload`] and [`download`] for more details.
//!
//! Synchronization never removes any local files from pageserver workdir or remote files from the remote storage, yet there could be overwrites of the same files (index part and metadata file updates, future checksum mismatch fixes).
//! NOTE: No real contents or checksum check happens right now and is a subject to improve later.
mod delete;
mod download;
@@ -999,7 +956,8 @@ impl RemoteTimelineClient {
UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload),
UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete),
UploadOp::Barrier(_) => {
unreachable!("we execute barriers synchronously")
// we do not account these
return;
}
};
REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS

View File

@@ -441,8 +441,6 @@ struct RemoteStartupData {
remote_metadata: TimelineMetadata,
}
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Tenant {
/// Yet another helper for timeline initialization.
/// Contains common part for `load_local_timeline` and `load_remote_timeline`
@@ -1203,10 +1201,12 @@ impl Tenant {
// compaction runs.
let timelines_to_compact = {
let timelines = self.timelines.lock().unwrap();
timelines
let timelines_to_compact = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
.collect::<Vec<_>>()
.collect::<Vec<_>>();
drop(timelines);
timelines_to_compact
};
for (timeline_id, timeline) in &timelines_to_compact {
@@ -1247,42 +1247,87 @@ impl Tenant {
}
/// Removes timeline-related in-memory data
pub fn delete_timeline(&self, timeline_id: TimelineId) -> anyhow::Result<()> {
// in order to be retriable detach needs to be idempotent
// (or at least to a point that each time the detach is called it can make progress)
let mut timelines = self.timelines.lock().unwrap();
pub async fn delete_timeline(&self, timeline_id: TimelineId) -> anyhow::Result<()> {
// Transition the timeline into TimelineState::Stopping.
// This should prevent new operations from starting.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
// Ensure that there are no child timelines **attached to that pageserver**,
// because detach removes files, which will break child branches
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// Ensure that there are no child timelines **attached to that pageserver**,
// because detach removes files, which will break child branches
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
anyhow::ensure!(
!children_exist,
"Cannot delete timeline which has child timelines"
);
let timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(e) => e,
Entry::Vacant(_) => bail!("timeline not found"),
anyhow::ensure!(
!children_exist,
"Cannot delete timeline which has child timelines"
);
let timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(e) => e,
Entry::Vacant(_) => bail!("timeline not found"),
};
let timeline = Arc::clone(timeline_entry.get());
timeline.set_state(TimelineState::Stopping);
drop(timelines);
timeline
};
let timeline = timeline_entry.get();
timeline.set_state(TimelineState::Paused);
info!("waiting for layer_removal_cs.lock()");
// No timeout here, GC & Compaction should be responsive to the `TimelineState::Stopping` change.
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// FIXME: Wait for all tasks, including GC and compaction, that are working on the
// timeline, to finish.
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
// XXX make this atomic so that, if we crash-mid-way, the timeline won't be picked up
// with some layers missing.
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?;
info!("detach removed files");
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
timeline_entry.remove();
drop(layer_removal_guard);
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
let removed_timeline = timelines.remove(&timeline_id);
if removed_timeline.is_none() {
// This can legitimately happen if there's a concurrent call to this function.
// T1 T2
// lock
// unlock
// lock
// unlock
// remove files
// lock
// remove from map
// unlock
// return
// remove files
// lock
// remove from map observes empty map
// unlock
// return
debug!("concurrent call to this function won the race");
}
drop(timelines);
Ok(())
}
@@ -1310,10 +1355,10 @@ impl Tenant {
"Could not activate tenant because it is in broken state"
));
}
TenantState::Paused => {
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Paused state, skipping activation");
info!("Tenant is already in Stopping state, skipping activation");
}
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Active;
@@ -1339,16 +1384,16 @@ impl Tenant {
result
}
/// Change tenant status to paused, to mark that it is being shut down
pub fn set_paused(&self) {
/// Change tenant status to Stopping, to mark that it is being shut down
pub fn set_stopping(&self) {
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Paused;
*current_state = TenantState::Stopping;
// FIXME: If the tenant is still Loading or Attaching, new timelines
// might be created after this. That's harmless, as the Timelines
// won't be accessible to anyone, when the Tenant is in Paused
// won't be accessible to anyone, when the Tenant is in Stopping
// state.
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
@@ -1359,12 +1404,12 @@ impl Tenant {
}
}
TenantState::Broken => {
info!("Cannot set tenant to Paused state, it is already in Broken state");
info!("Cannot set tenant to Stopping state, it is already in Broken state");
}
TenantState::Paused => {
TenantState::Stopping => {
// The tenant was detached, or system shutdown was requested, while we were
// loading or attaching the tenant.
info!("Tenant is already in Paused state");
info!("Tenant is already in Stopping state");
}
}
});
@@ -1385,10 +1430,10 @@ impl Tenant {
// This shouldn't happen either
warn!("Tenant is already broken");
}
TenantState::Paused => {
TenantState::Stopping => {
// This shouldn't happen either
*current_state = TenantState::Broken;
warn!("Marking Paused tenant as Broken");
warn!("Marking Stopping tenant as Broken");
}
TenantState::Loading | TenantState::Attaching => {
*current_state = TenantState::Broken;
@@ -1413,7 +1458,7 @@ impl Tenant {
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken | TenantState::Paused => {
TenantState::Broken | TenantState::Stopping => {
// There's no chance the tenant can transition back into ::Active
anyhow::bail!(
"Tenant {} will not become active. Current state: {:?}",
@@ -2047,9 +2092,10 @@ impl Tenant {
format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
})?;
// Flush loop needs to be spawned in order for checkpoint to be able to flush.
// We want to run proper checkpoint before we mark timeline as available to outside world
// Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
// Flush the new layer files to disk, before we mark the timeline as available to
// the outside world.
//
// Thus spawn flush loop manually and skip flush_loop setup in initialize_with_lock
unfinished_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -2057,7 +2103,7 @@ impl Tenant {
});
unfinished_timeline
.checkpoint(CheckpointConfig::Forced).await
.checkpoint(CheckpointConfig::Flush).await
.with_context(|| format!("Failed to checkpoint after pgdatadir import for timeline {tenant_id}/{timeline_id}"))?;
let timeline = {
@@ -2555,7 +2601,11 @@ pub mod harness {
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let tenant_conf = TenantConf::dummy_conf();
// Disable automatic GC and compaction to make the unit tests more deterministic.
// The tests perform them manually if needed.
let mut tenant_conf = TenantConf::dummy_conf();
tenant_conf.gc_period = Duration::ZERO;
tenant_conf.compaction_period = Duration::ZERO;
let tenant_id = TenantId::generate();
fs::create_dir_all(conf.tenant_path(&tenant_id))?;

View File

@@ -30,15 +30,14 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::filename::{DeltaFileName, PathOrConf};
use crate::tenant::storage_layer::{
DropNotify, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
@@ -192,8 +191,6 @@ pub struct DeltaLayerInner {
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
drop_watch: Option<DropNotify>,
}
impl Layer for DeltaLayer {
@@ -330,13 +327,10 @@ impl Layer for DeltaLayer {
}
}
fn drop_notify(&self) -> DropNotify {
let mut inner = self.inner.write().unwrap();
inner
.drop_watch
.get_or_insert_with(|| DropNotify::new())
.clone()
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
fn is_incremental(&self) -> bool {
@@ -557,7 +551,6 @@ impl DeltaLayer {
file: None,
index_start_blk: 0,
index_root_blk: 0,
drop_watch: None,
}),
}
}
@@ -585,7 +578,6 @@ impl DeltaLayer {
file: None,
index_start_blk: 0,
index_root_blk: 0,
drop_watch: None,
}),
})
}
@@ -751,7 +743,6 @@ impl DeltaLayerWriterInner {
file: None,
index_start_blk,
index_root_blk,
drop_watch: None,
}),
};

View File

@@ -26,9 +26,7 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::filename::{ImageFileName, PathOrConf};
use crate::tenant::storage_layer::{
DropNotify, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
@@ -36,6 +34,7 @@ use bytes::Bytes;
use hex;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
@@ -118,8 +117,6 @@ pub struct ImageLayerInner {
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
drop_watch: Option<DropNotify>,
}
impl Layer for ImageLayer {
@@ -187,13 +184,10 @@ impl Layer for ImageLayer {
todo!();
}
fn drop_notify(&self) -> DropNotify {
let mut inner = self.inner.write().unwrap();
inner
.drop_watch
.get_or_insert_with(|| DropNotify::new())
.clone()
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
fn is_incremental(&self) -> bool {
@@ -357,7 +351,6 @@ impl ImageLayer {
file: None,
index_start_blk: 0,
index_root_blk: 0,
drop_watch: None,
}),
}
}
@@ -385,7 +378,6 @@ impl ImageLayer {
loaded: false,
index_start_blk: 0,
index_root_blk: 0,
drop_watch: None,
}),
})
}
@@ -540,7 +532,6 @@ impl ImageLayerWriterInner {
file: None,
index_start_blk,
index_root_blk,
drop_watch: None,
}),
};

View File

@@ -10,11 +10,9 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockReader;
use crate::tenant::delta_layer::{DeltaLayer, DeltaLayerWriter};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{
DropNotify, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{ensure, Result};
use anyhow::{bail, ensure, Result};
use std::cell::RefCell;
use std::collections::HashMap;
use tracing::*;
@@ -174,8 +172,8 @@ impl Layer for InMemoryLayer {
/// Nothing to do here. When you drop the last reference to the layer, it will
/// be deallocated.
fn drop_notify(&self) -> DropNotify {
panic!("can't delete an InMemoryLayer")
fn delete(&self) -> Result<()> {
bail!("can't delete an InMemoryLayer")
}
fn is_incremental(&self) -> bool {

View File

@@ -145,31 +145,9 @@ pub trait Layer: Send + Sync {
panic!("Not implemented")
}
fn drop_notify(&self) -> DropNotify;
/// Permanently remove this layer from disk.
fn delete(&self) -> Result<()>;
/// Dump summary of the contents of the layer to stdout
fn dump(&self, verbose: bool) -> Result<()>;
}
#[derive(Clone)]
pub struct DropNotify(std::sync::Arc<tokio::sync::Notify>);
impl DropNotify {
pub fn new() -> Self {
DropNotify(std::sync::Arc::new(tokio::sync::Notify::new()))
}
pub async fn dropped(&self) {
self.0.notified().await
}
pub fn notify_waiters(&self) {
self.0.notify_waiters();
}
}
impl Drop for DropNotify {
fn drop(&mut self) {
self.0.notify_waiters();
}
}

View File

@@ -144,6 +144,12 @@ pub struct Timeline {
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`].
pub(super) layer_removal_cs: tokio::sync::Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -506,7 +512,7 @@ impl Timeline {
pub async fn compact(&self) -> anyhow::Result<()> {
let last_record_lsn = self.get_last_record_lsn();
// Last record Lsn could be zero in case the timelie was just created
// Last record Lsn could be zero in case the timeline was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(());
@@ -546,6 +552,13 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
anyhow::bail!("timeline is Stopping");
}
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
@@ -655,8 +668,8 @@ impl Timeline {
(TimelineState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
}
(TimelineState::Paused, TimelineState::Active) => {
debug!("Not activating a paused timeline");
(TimelineState::Stopping, TimelineState::Active) => {
debug!("Not activating a Stopping timeline");
}
(_, new_state) => {
self.state.send_replace(new_state);
@@ -768,6 +781,7 @@ impl Timeline {
layer_flush_done_tx,
write_lock: Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(),
@@ -1237,7 +1251,7 @@ impl Timeline {
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return Some(new_state),
TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return Some(new_state),
}
}
Err(_sender_dropped_error) => return None,
@@ -1961,316 +1975,327 @@ impl Timeline {
Ok(layer_paths_to_upload)
}
}
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<dyn Layer>>,
}
impl Timeline {
async fn compact_level0_phase1(
&self,
target_file_size: u64,
) -> anyhow::Result<CompactLevel0Phase1Result> {
let layers = self.layers.read().unwrap();
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
// Only compact if enough layers have accumulated.
if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() {
return Ok(Default::default());
}
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
// level 0 files that form a contiguous sequence, such that the end
// LSN of previous file matches the start LSN of the next file.
//
// Note that if the files don't form such a sequence, we might
// "compact" just a single file. That's a bit pointless, but it allows
// us to get rid of the level 0 file, and compact the other files on
// the next iteration. This could probably be made smarter, but such
// "gaps" in the sequence of level 0 files should only happen in case
// of a crash, partial download from cloud storage, or something like
// that, so it's not a big deal in practice.
level0_deltas.sort_by_key(|l| l.get_lsn_range().start);
let mut level0_deltas_iter = level0_deltas.iter();
let first_level0_delta = level0_deltas_iter.next().unwrap();
let mut prev_lsn_end = first_level0_delta.get_lsn_range().end;
let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
for l in level0_deltas_iter {
let lsn_range = l.get_lsn_range();
if lsn_range.start != prev_lsn_end {
break;
}
deltas_to_compact.push(Arc::clone(l));
prev_lsn_end = lsn_range.end;
}
let lsn_range = Range {
start: deltas_to_compact.first().unwrap().get_lsn_range().start,
end: deltas_to_compact.last().unwrap().get_lsn_range().end,
};
info!(
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
lsn_range.start,
lsn_range.end,
deltas_to_compact.len(),
level0_deltas.len()
);
for l in deltas_to_compact.iter() {
info!("compact includes {}", l.filename().display());
}
// We don't need the original list of layers anymore. Drop it so that
// we don't accidentally use it later in the function.
drop(level0_deltas);
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = deltas_to_compact
.iter()
.map(|l| l.iter())
.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
} else {
true
}
});
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = deltas_to_compact
.iter()
.map(|l| l.key_iter())
.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
let (b_key, b_lsn, _) = b;
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
});
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// We split the new delta layers on the key dimension. We iterate through the
// key space, and for each key, check whether adding the next key to the
// current output layer we're building would make the layer too large. If so,
// dump the current output layer and start a new one.
// It's possible that there is a single key with so many page versions that storing all of them in a single layer file
// would be too large. In that case, we also split on the LSN dimension.
//
// LSN
// ^
// |
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
//
//
// If one key (X) has a lot of page versions:
//
// LSN
// ^
// | (X)
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | +--+ |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | +--+ |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
let mut new_layers = Vec::new();
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
let mut key_values_total_size = 0u64;
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
for x in all_values_iter {
let (key, lsn, value) = x?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
let mut next_key_size = 0u64;
let is_dup_layer = dup_end_lsn.is_valid();
dup_start_lsn = Lsn::INVALID;
if !same_key {
dup_end_lsn = Lsn::INVALID;
}
// Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
next_key_size = next_size;
if key != next_key {
if dup_end_lsn.is_valid() {
// We are writing a segment with duplicates:
// place all remaining values of this key in a separate segment
dup_start_lsn = dup_end_lsn; // new segment starts where the old one stops
dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
}
break;
}
key_values_total_size += next_size;
// Check if it is time to split segment: if total keys size is larger than target file size.
// We need to avoid generation of empty segments if next_size > target_file_size.
if key_values_total_size > target_file_size && lsn != next_lsn {
// Split key between multiple layers: such layer can contain only single key
dup_start_lsn = if dup_end_lsn.is_valid() {
dup_end_lsn // new segment with duplicates starts where old one stops
} else {
lsn // start with the first LSN for this key
};
dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
break;
}
}
// handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check if this key causes the layer to overflow...
if is_dup_layer
|| dup_end_lsn.is_valid()
|| written_size + key_values_total_size > target_file_size
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
writer = None;
}
}
// Remember size of key value because at next iteration we will access next item
key_values_total_size = next_key_size;
}
if writer.is_none() {
// Create writer if not initialized yet
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_id,
key,
if dup_end_lsn.is_valid() {
// this is a layer containing slice of values of the same key
debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
dup_start_lsn..dup_end_lsn
} else {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
)?);
}
fail_point!("delta-layer-writer-fail-before-finish", |_| {
anyhow::bail!("failpoint delta-layer-writer-fail-before-finish");
});
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
})
}
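// Illustrative sketch (not pageserver code): the splitting rule described in
// the comments above, in isolation. KeyUsage, Cut, and the sizes below are
// invented for illustration; the real code drives DeltaLayerWriter
// incrementally instead of deciding cuts up front.
struct KeyUsage {
    total_size: u64, // bytes taken by all values of one key in the LSN range
}
enum Cut {
    Continue,      // keep appending this key to the current output layer
    NewLayerAtKey, // close the current layer and start a new one at this key
    SplitKeyByLsn, // the key alone is too large: split its versions by LSN
}
fn plan_cut(current_layer_size: u64, next: &KeyUsage, target_file_size: u64) -> Cut {
    if next.total_size > target_file_size {
        // Even an empty layer could not hold every version of this key.
        Cut::SplitKeyByLsn
    } else if current_layer_size + next.total_size > target_file_size {
        // Adding this key would overflow the current layer: cut here.
        Cut::NewLayerAtKey
    } else {
        Cut::Continue
    }
}
fn main() {
    let target = 128 * 1024 * 1024; // illustrative 128 MiB target layer size
    let huge = KeyUsage { total_size: 300 * 1024 * 1024 };
    let small = KeyUsage { total_size: 1024 };
    assert!(matches!(plan_cut(0, &huge, target), Cut::SplitKeyByLsn));
    assert!(matches!(plan_cut(target - 512, &small, target), Cut::NewLayerAtKey));
    assert!(matches!(plan_cut(0, &small, target), Cut::Continue));
}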
///
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them
/// into Level 1 files.
///
async fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> {
let mut deltas_to_compact;
let mut new_layers;
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = self.compact_level0_phase1(target_file_size).await?;
{
let mut level0_deltas = {
let layers = self.layers.read().unwrap();
layers.get_level0_deltas()?
};
// Only compact if enough layers have accumulated.
if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() {
return Ok(());
}
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
// level 0 files that form a contiguous sequence, such that the end
// LSN of previous file matches the start LSN of the next file.
//
// Note that if the files don't form such a sequence, we might
// "compact" just a single file. That's a bit pointless, but it allows
// us to get rid of the level 0 file, and compact the other files on
// the next iteration. This could probably be made smarter, but such
// "gaps" in the sequence of level 0 files should only happen in case
// of a crash, partial download from cloud storage, or something like
// that, so it's not a big deal in practice.
level0_deltas.sort_by_key(|l| l.get_lsn_range().start);
let mut level0_deltas_iter = level0_deltas.iter();
let first_level0_delta = level0_deltas_iter.next().unwrap();
let mut prev_lsn_end = first_level0_delta.get_lsn_range().end;
deltas_to_compact = vec![Arc::clone(first_level0_delta)];
for l in level0_deltas_iter {
let lsn_range = l.get_lsn_range();
if lsn_range.start != prev_lsn_end {
break;
}
deltas_to_compact.push(Arc::clone(l));
prev_lsn_end = lsn_range.end;
}
let lsn_range = Range {
start: deltas_to_compact.first().unwrap().get_lsn_range().start,
end: deltas_to_compact.last().unwrap().get_lsn_range().end,
};
info!(
"Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
lsn_range.start,
lsn_range.end,
deltas_to_compact.len(),
level0_deltas.len()
);
for l in deltas_to_compact.iter() {
info!("compact includes {}", l.filename().display());
}
// We don't need the original list of layers anymore. Drop it so that
// we don't accidentally use it later in the function.
drop(level0_deltas);
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = deltas_to_compact
.iter()
.map(|l| l.iter())
.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
if let Ok((b_key, b_lsn, _)) = b {
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
} else {
false
}
} else {
true
}
});
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter =
deltas_to_compact
.iter()
.map(|l| l.key_iter())
.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
let (b_key, b_lsn, _) = b;
match a_key.cmp(b_key) {
Ordering::Less => true,
Ordering::Equal => a_lsn <= b_lsn,
Ordering::Greater => false,
}
});
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
//
// We split the new delta layers on the key dimension. We iterate through the
// key space, and for each key, check whether adding the next key to the
// current output layer we're building would make the layer too large. If so,
// dump the current output layer and start a new one.
// It's possible that there is a single key with so many page versions that storing all of them in a single layer file
// would be too large. In that case, we also split on the LSN dimension.
//
// LSN
// ^
// |
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | | | |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
//
//
// If one key (X) has a lot of page versions:
//
// LSN
// ^
// | (X)
// | +-----------+ +--+--+--+--+
// | | | | | | | |
// | +-----------+ | | +--+ |
// | | | | | | | |
// | +-----------+ ==> | | | | |
// | | | | | +--+ |
// | +-----------+ | | | | |
// | | | | | | | |
// | +-----------+ +--+--+--+--+
// |
// +--------------> key
// TODO: this actually divides the layers into fixed-size chunks, not
// based on the partitioning.
//
// TODO: we should also opportunistically materialize and
// garbage collect what we can.
new_layers = Vec::new();
let mut prev_key: Option<Key> = None;
let mut writer: Option<DeltaLayerWriter> = None;
let mut key_values_total_size = 0u64;
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
for x in all_values_iter {
let (key, lsn, value) = x?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
let mut next_key_size = 0u64;
let is_dup_layer = dup_end_lsn.is_valid();
dup_start_lsn = Lsn::INVALID;
if !same_key {
dup_end_lsn = Lsn::INVALID;
}
// Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
next_key_size = next_size;
if key != next_key {
if dup_end_lsn.is_valid() {
// We are writing a segment with duplicates:
// place all remaining values of this key in a separate segment
dup_start_lsn = dup_end_lsn; // new segment starts where the old one stops
dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
}
break;
}
key_values_total_size += next_size;
// Check if it is time to split segment: if total keys size is larger than target file size.
// We need to avoid generation of empty segments if next_size > target_file_size.
if key_values_total_size > target_file_size && lsn != next_lsn {
// Split key between multiple layers: such layer can contain only single key
dup_start_lsn = if dup_end_lsn.is_valid() {
dup_end_lsn // new segment with duplicates starts where old one stops
} else {
lsn // start with the first LSN for this key
};
dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
break;
}
}
// handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
dup_start_lsn = dup_end_lsn;
dup_end_lsn = lsn_range.end;
}
if writer.is_some() {
let written_size = writer.as_mut().unwrap().size();
// check if this key causes the layer to overflow...
if is_dup_layer
|| dup_end_lsn.is_valid()
|| written_size + key_values_total_size > target_file_size
{
// ... if so, flush previous layer and prepare to write new one
new_layers
.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
writer = None;
}
}
// Remember size of key value because at next iteration we will access next item
key_values_total_size = next_key_size;
}
if writer.is_none() {
// Create writer if not initialized yet
writer = Some(DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_id,
key,
if dup_end_lsn.is_valid() {
// this is a layer containing slice of values of the same key
debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
dup_start_lsn..dup_end_lsn
} else {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
)?);
}
fail_point!("delta-layer-writer-fail-before-finish", |_| {
anyhow::bail!("failpoint delta-layer-writer-fail-before-finish");
});
writer.as_mut().unwrap().put_value(key, lsn, value)?;
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
// Before deleting any layers, we need to wait for their upload ops to finish.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
info!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let files_to_delete = {
let mut layers = self.layers.write().unwrap();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
let new_delta_path = l.path();
let mut layers = self.layers.write().unwrap();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
let new_delta_path = l.path();
let metadata = new_delta_path.metadata()?;
let metadata = new_delta_path.metadata()?;
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload(
&new_delta_path,
&LayerFileMetadata::new(metadata.len()),
)?;
}
// update the timeline's physical size
self.metrics.current_physical_size_gauge.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
layers.insert_historic(Arc::new(l));
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload(
&new_delta_path,
&LayerFileMetadata::new(metadata.len()),
)?;
}
// Now that we have reshuffled the data to set of new delta layers, we can
// delete the old ones XXX
let mut files_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact.into_iter() {
if let Some(path) = l.local_path() {
files_to_delete.push((l.drop_notify(), path));
}
layers.remove_historic(l);
}
drop(layers);
// update the timeline's physical size
self.metrics.current_physical_size_gauge.add(metadata.len());
files_to_delete
};
// Perform the deletions
for (drop_notify, path) in files_to_delete.iter() {
drop_notify.dropped().await;
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
fs::remove_file(path)?;
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
layers.insert_historic(Arc::new(l));
}
// Now that we have reshuffled the data to set of new delta layers, we can
// delete the old ones
let mut layer_paths_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
if let Some(path) = l.local_path() {
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_paths_to_delete.push(path);
}
l.delete()?;
layers.remove_historic(l);
}
drop(layers);
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
// FIXME: This also uploads new index file. If
// flush_frozen_layer() is doing this at the same time, do
// we have a problem?
let paths_to_delete = files_to_delete
.into_iter()
.map(|(_, path)| path)
.collect::<Vec<PathBuf>>();
remote_client.schedule_layer_file_deletion(&paths_to_delete)?;
remote_client.schedule_layer_file_deletion(&layer_paths_to_delete)?;
}
Ok(())
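The ordering above is the consistency-critical part: drain the upload queue, delete the local layer files, and only then schedule the remote deletions, all under layer_removal_cs. Below is a condensed sketch of just that ordering; the RemoteClient trait is a hypothetical stand-in for RemoteTimelineClient, and its wait_completion() is written as a blocking call purely to keep the sketch short (the real method is async).
use std::path::PathBuf;
// Hypothetical stand-in for RemoteTimelineClient; only the calls used by the
// compaction and GC paths above are modelled.
trait RemoteClient {
    /// Wait for all previously scheduled upload operations to finish.
    fn wait_completion(&self) -> anyhow::Result<()>;
    /// Queue deletion of the given layer files in remote storage.
    fn schedule_layer_file_deletion(&self, paths: &[PathBuf]) -> anyhow::Result<()>;
}
/// Sketch of the removal ordering shared by compaction and GC.
fn remove_layers(remote: Option<&dyn RemoteClient>, paths: Vec<PathBuf>) -> anyhow::Result<()> {
    // 1. Drain the upload queue first, so we never delete a file that an
    //    in-flight upload may still need to read.
    if let Some(remote) = remote {
        remote.wait_completion()?;
    }
    // 2. Delete the local files (the real code also drops them from the layer
    //    map and updates the physical-size gauge).
    for path in &paths {
        std::fs::remove_file(path)?;
    }
    // 3. Only then schedule the corresponding deletions in remote storage.
    if let Some(remote) = remote {
        remote.schedule_layer_file_deletion(&paths)?;
    }
    Ok(())
}
fn main() -> anyhow::Result<()> {
    // With no remote client and no paths this is a no-op; included only so the
    // sketch compiles and runs on its own (requires the anyhow crate).
    remove_layers(None, Vec::new())
}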
@@ -2363,28 +2388,42 @@ impl Timeline {
/// obsolete.
///
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
let mut result: GcResult = GcResult::default();
let now = SystemTime::now();
fail_point!("before-timeline-gc");
let horizon_cutoff;
let pitr_cutoff;
let retain_lsns;
{
let _layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
anyhow::bail!("timeline is Stopping");
}
let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
let gc_info = self.gc_info.read().unwrap();
horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
pitr_cutoff = gc_info.pitr_cutoff;
retain_lsns = gc_info.retain_lsns.clone();
}
let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
let pitr_cutoff = gc_info.pitr_cutoff;
let retain_lsns = gc_info.retain_lsns.clone();
(horizon_cutoff, pitr_cutoff, retain_lsns)
};
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
// FIXME
//let _enter =
// info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff)
// .entered();
self.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
.instrument(
info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff),
)
.await
}
async fn gc_timeline(
&self,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
new_gc_cutoff: Lsn,
) -> anyhow::Result<GcResult> {
let now = SystemTime::now();
let mut result: GcResult = GcResult::default();
// Nothing to GC. Return early.
let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
@@ -2418,6 +2457,17 @@ impl Timeline {
debug!("retain_lsns: {:?}", retain_lsns);
// Before deleting any layers, we need to wait for their upload ops to finish.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
info!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let mut layers_to_remove = Vec::new();
// Scan all on-disk layers in the timeline.
@@ -2428,126 +2478,114 @@ impl Timeline {
// 3. it doesn't need to be retained for 'retain_lsns';
// 4. newer on-disk image layers cover the layer's whole key range
//
let mut files_to_delete;
{
let mut layers = self.layers.write().unwrap();
'outer: for l in layers.iter_historic_layers() {
// This layer is in the process of being flushed to disk.
// It will be swapped out of the layer map, replaced with
// on-disk layers containing the same data.
// We can't GC it, as it's not on disk. We can't remove it
// from the layer map yet, as it would make its data
// inaccessible.
if l.is_in_memory() {
continue;
}
let mut layers = self.layers.write().unwrap();
'outer: for l in layers.iter_historic_layers() {
// This layer is in the process of being flushed to disk.
// It will be swapped out of the layer map, replaced with
// on-disk layers containing the same data.
// We can't GC it, as it's not on disk. We can't remove it
// from the layer map yet, as it would make its data
// inaccessible.
if l.is_in_memory() {
continue;
}
result.layers_total += 1;
result.layers_total += 1;
// 1. Is it newer than GC horizon cutoff point?
if l.get_lsn_range().end > horizon_cutoff {
debug!(
"keeping {} because it's newer than horizon_cutoff {}",
l.filename().display(),
horizon_cutoff
);
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// 2. Is it newer than the PiTR cutoff point?
if l.get_lsn_range().end > pitr_cutoff {
debug!(
"keeping {} because it's newer than pitr_cutoff {}",
l.filename().display(),
pitr_cutoff
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
// 3. Is it needed by a child branch?
// NOTE With that we would keep data that
// might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when
// they are no longer needed. This might be complicated with long inheritance chains.
for retain_lsn in retain_lsns.iter() {
// start_lsn is inclusive
if l.get_lsn_range().start <= *retain_lsn {
debug!(
"keeping {} because it might still be referenced by a child branch forked at {} is_dropped: xx is_incremental: {}",
l.filename().display(),
retain_lsn,
l.is_incremental(),
);
result.layers_needed_by_branches += 1;
continue 'outer;
}
}
// 4. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
// OK for a delta layer to have end LSN 101, but if the end LSN
// is 102, then it might not have been fully flushed to disk
// before crash.
//
// For example, imagine that the following layers exist:
//
// 1000 - image (A)
// 1000-2000 - delta (B)
// 2000 - image (C)
// 2000-3000 - delta (D)
// 3000 - image (E)
//
// If GC horizon is at 2500, we can remove layers A and B, but
// we cannot remove C, even though it's older than 2500, because
// the delta layer 2000-3000 depends on it.
if !layers.image_layer_exists(
&l.get_key_range(),
&(l.get_lsn_range().end..new_gc_cutoff),
)? {
debug!(
"keeping {} because it is the latest layer",
l.filename().display()
);
result.layers_not_updated += 1;
continue 'outer;
}
// We didn't find any reason to keep this file, so remove it.
// 1. Is it newer than GC horizon cutoff point?
if l.get_lsn_range().end > horizon_cutoff {
debug!(
"garbage collecting {} is_dropped: xx is_incremental: {}",
"keeping {} because it's newer than horizon_cutoff {}",
l.filename().display(),
l.is_incremental(),
horizon_cutoff
);
layers_to_remove.push(Arc::clone(&l));
result.layers_needed_by_cutoff += 1;
continue 'outer;
}
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
files_to_delete = Vec::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove.into_iter() {
if let Some(path) = doomed_layer.local_path() {
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
files_to_delete.push((doomed_layer.drop_notify(), path));
}
layers.remove_historic(doomed_layer);
result.layers_removed += 1;
// 2. Is it newer than the PiTR cutoff point?
if l.get_lsn_range().end > pitr_cutoff {
debug!(
"keeping {} because it's newer than pitr_cutoff {}",
l.filename().display(),
pitr_cutoff
);
result.layers_needed_by_pitr += 1;
continue 'outer;
}
// 3. Is it needed by a child branch?
// NOTE With that we would keep data that
// might be referenced by child branches forever.
// We can track this in child timeline GC and delete parent layers when
// they are no longer needed. This might be complicated with long inheritance chains.
for retain_lsn in &retain_lsns {
// start_lsn is inclusive
if &l.get_lsn_range().start <= retain_lsn {
debug!(
"keeping {} because it might still be referenced by a child branch forked at {} is_dropped: xx is_incremental: {}",
l.filename().display(),
retain_lsn,
l.is_incremental(),
);
result.layers_needed_by_branches += 1;
continue 'outer;
}
}
// 4. Is there a later on-disk layer for this relation?
//
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
// OK for a delta layer to have end LSN 101, but if the end LSN
// is 102, then it might not have been fully flushed to disk
// before crash.
//
// For example, imagine that the following layers exist:
//
// 1000 - image (A)
// 1000-2000 - delta (B)
// 2000 - image (C)
// 2000-3000 - delta (D)
// 3000 - image (E)
//
// If GC horizon is at 2500, we can remove layers A and B, but
// we cannot remove C, even though it's older than 2500, because
// the delta layer 2000-3000 depends on it.
if !layers
.image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
{
debug!(
"keeping {} because it is the latest layer",
l.filename().display()
);
result.layers_not_updated += 1;
continue 'outer;
}
// We didn't find any reason to keep this file, so remove it.
debug!(
"garbage collecting {} is_dropped: xx is_incremental: {}",
l.filename().display(),
l.is_incremental(),
);
layers_to_remove.push(Arc::clone(&l));
}
// Perform the deletions
for (drop_notify, path) in files_to_delete.iter() {
drop_notify.dropped().await;
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
fs::remove_file(path)?;
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
let mut layer_paths_to_delete = Vec::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove {
if let Some(path) = doomed_layer.local_path() {
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_paths_to_delete.push(path);
}
doomed_layer.delete()?;
layers.remove_historic(doomed_layer);
result.layers_removed += 1;
}
info!(
@@ -2560,11 +2598,7 @@ impl Timeline {
}
if let Some(remote_client) = &self.remote_client {
let paths_to_delete = files_to_delete
.into_iter()
.map(|(_, path)| path)
.collect::<Vec<PathBuf>>();
remote_client.schedule_layer_file_deletion(&paths_to_delete)?;
remote_client.schedule_layer_file_deletion(&layer_paths_to_delete)?;
}
result.elapsed = now.elapsed()?;

View File

@@ -51,6 +51,7 @@ pub struct TenantConf {
// This parameter determines L1 layer file size.
pub compaction_target_size: u64,
// How often to check if there's compaction work to be done.
// Duration::ZERO means automatic compaction is disabled.
#[serde(with = "humantime_serde")]
pub compaction_period: Duration,
// Level0 delta layer threshold for compaction.
@@ -61,6 +62,7 @@ pub struct TenantConf {
// Page versions older than this are garbage collected away.
pub gc_horizon: u64,
// Interval at which garbage collection is triggered.
// Duration::ZERO means automatic GC is disabled
#[serde(with = "humantime_serde")]
pub gc_period: Duration,
// Delta layer churn threshold to create L1 image layers.

View File

@@ -170,7 +170,7 @@ pub async fn shutdown_all_tenants() {
for (_, tenant) in m.drain() {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_paused();
tenant.set_stopping();
tenants_to_shut_down.push(tenant)
}
}
@@ -290,7 +290,7 @@ pub async fn delete_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> an
info!("timeline task shutdown completed");
match get_tenant(tenant_id, true) {
Ok(tenant) => {
tenant.delete_timeline(timeline_id)?;
tenant.delete_timeline(timeline_id).await?;
}
Err(e) => anyhow::bail!("Cannot access tenant {tenant_id} in local tenant state: {e:?}"),
}
@@ -310,7 +310,7 @@ pub async fn detach_tenant(
None => anyhow::bail!("Tenant not found for id {tenant_id}"),
};
tenant.set_paused();
tenant.set_stopping();
// shutdown all tenant and timeline tasks: gc, compaction, page service)
task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;

View File

@@ -7,26 +7,12 @@ use std::time::Duration;
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use crate::tenant_mgr;
use tracing::*;
use utils::id::TenantId;
#[cfg(test)]
pub fn start_background_loops(tenant_id: TenantId) {
// Do not start the background loops.
// Right now, in tests, Tenant is only created by TenantHarness,
// and all tests that use TenantHarness assume that there are
// no background loops that do compaction and GC. If they want it
// to happen, they call the corresponding functions directly.
//
// XXX replace this with a TenantConfigRequest flag that is
// also usable by tests, see https://github.com/neondatabase/neon/issues/2917
}
#[cfg(not(test))]
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
#[cfg(not(test))]
pub fn start_background_loops(tenant_id: TenantId) {
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -80,13 +66,17 @@ async fn compaction_loop(tenant_id: TenantId) {
},
};
// Run blocking part of the task
// Run compaction
let mut sleep_duration = tenant.get_compaction_period();
if let Err(e) = tenant.compaction_iteration().await {
sleep_duration = wait_duration;
error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration);
if sleep_duration == Duration::ZERO {
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
// Run compaction
if let Err(e) = tenant.compaction_iteration().await {
sleep_duration = wait_duration;
error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration);
}
}
// Sleep
@@ -127,15 +117,21 @@ async fn gc_loop(tenant_id: TenantId) {
},
};
// Run gc
let gc_period = tenant.get_gc_period();
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
if gc_horizon > 0 {
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);
if sleep_duration == Duration::ZERO {
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
// Run gc
if gc_horizon > 0 {
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:?}", sleep_duration);
}
}
}

View File

@@ -214,7 +214,7 @@ async fn connection_manager_loop_step(
match new_state {
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
TimelineState::Broken | TimelineState::Paused | TimelineState::Suspended => return ControlFlow::Continue(new_state),
TimelineState::Broken | TimelineState::Stopping | TimelineState::Suspended => return ControlFlow::Continue(new_state),
}
}
Err(_sender_dropped_error) => return ControlFlow::Break(()),

View File

@@ -267,7 +267,7 @@ readahead_buffer_resize(int newsize, void *extra)
nfree = newsize;
PrefetchState *newPState;
Size newprfs_size = offsetof(PrefetchState, prf_buffer) + (
sizeof(PrefetchRequest) * readahead_buffer_size
sizeof(PrefetchRequest) * newsize
);
/* don't try to re-initialize if we haven't initialized yet */

View File

@@ -165,18 +165,27 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
// we need to release the lock file only when the current process is gone
let _ = Box::leak(Box::new(lock_file));
info!("Created PID file with PID {}", Pid::this().to_string());
// Set or read our ID.
set_id(&mut conf, given_id)?;
if init {
return Ok(());
}
info!(
"Starting safekeeper http handler on {}",
conf.listen_http_addr
);
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
})?;
info!("Starting safekeeper on {}", conf.listen_pg_addr);
info!(
"Starting safekeeper pg protocol handler on {}",
conf.listen_pg_addr
);
let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
e

View File

@@ -1,7 +1,11 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Generate code to deterministic location to make finding it easier.
tonic_build::configure()
.out_dir("proto/") // put generated code to proto/
.compile(&["proto/broker.proto"], &["proto/"])?;
// Generate rust code from .proto protobuf.
//
// Note: we previously generated the code to a deterministic location at
// proto/ to make it easy to find, but interference with cachepot sometimes
// broke the build. In any case, per the cargo docs, a build script shouldn't
// write anywhere but $OUT_DIR.
tonic_build::compile_protos("proto/broker.proto")
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
Ok(())
}

View File

@@ -11,7 +11,7 @@ use proto::{
// Code generated by protobuf.
pub mod proto {
include!("../proto/storage_broker.rs");
tonic::include_proto!("storage_broker");
}
pub mod metrics;

View File

@@ -1580,7 +1580,17 @@ class NeonCli(AbstractNeonCli):
s3_env_vars = self.env.remote_storage.access_env_vars()
extra_env_vars = (extra_env_vars or {}) | s3_env_vars
return self.raw_cli(start_args, extra_env_vars=extra_env_vars)
try:
return self.raw_cli(start_args, extra_env_vars=extra_env_vars)
except Exception:
# A common reason for startup failure is that the port is already in use. We
# coordinate port assignment with PortDistributor, but it's a common mistake
# when writing a new test to use a hardcoded port, or assign the port without
# using the distributor, causing races where two tests running concurrently
# sometimes choose the same port. To help debug such cases, get a listing
# of all in-use ports and the processes holding them.
list_inuse_ports()
raise
def pageserver_stop(self, immediate=False) -> "subprocess.CompletedProcess[str]":
cmd = ["pageserver", "stop"]
@@ -1595,7 +1605,11 @@ class NeonCli(AbstractNeonCli):
if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage):
s3_env_vars = self.env.remote_storage.access_env_vars()
return self.raw_cli(["safekeeper", "start", str(id)], extra_env_vars=s3_env_vars)
try:
return self.raw_cli(["safekeeper", "start", str(id)], extra_env_vars=s3_env_vars)
except Exception:
list_inuse_ports() # see comment in pageserver_start
raise
def safekeeper_stop(
self, id: Optional[int] = None, immediate=False
@@ -1761,6 +1775,13 @@ class NeonPageserver(PgProtocol):
".*Removing intermediate uninit mark file.*",
# FIXME: known race condition in TaskHandle: https://github.com/neondatabase/neon/issues/2885
".*sender is dropped while join handle is still alive.*",
# Tenant::delete_timeline() can cause any of the four following errors.
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
]
def start(
@@ -2528,6 +2549,7 @@ class SafekeeperTimelineStatus:
acceptor_epoch: int
pg_version: int
flush_lsn: Lsn
commit_lsn: Lsn
timeline_start_lsn: Lsn
backup_lsn: Lsn
remote_consistent_lsn: Lsn
@@ -2577,6 +2599,7 @@ class SafekeeperHttpClient(requests.Session):
acceptor_epoch=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
flush_lsn=Lsn(resj["flush_lsn"]),
commit_lsn=Lsn(resj["commit_lsn"]),
timeline_start_lsn=Lsn(resj["timeline_start_lsn"]),
backup_lsn=Lsn(resj["backup_lsn"]),
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
@@ -2972,3 +2995,24 @@ def fork_at_current_lsn(
"""
current_lsn = pg.safe_psql("SELECT pg_current_wal_lsn()")[0][0]
return env.neon_cli.create_branch(new_branch_name, ancestor_branch_name, tenant_id, current_lsn)
def list_inuse_ports():
"""
Print "netstat -tnlap" output to the test log. This is useful for debugging
port collisions in tests.
"""
# This won't work on all platforms, because not all platforms have 'netstat',
# and the CLI arguments vary across platforms, too. macOS's netstat doesn't have
# the -p option, for example. So this is just best-effort.
res = subprocess.run(
["netstat", "-tnlap"],
check=False,
universal_newlines=True,
capture_output=True,
)
if res.returncode:
log.info(f"netstat -tnlap failed with return code {res.returncode}")
log.info(f"netstat -tnlap stdout: \n{res.stdout}\n")
log.info(f"netstat -tnlap stderr: \n{res.stderr}\n")

View File

@@ -12,12 +12,12 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
n_iters = 10
n_records = 100000
# We want to have a lot of layer files to exercise the layer map. Make
# gc_horizon and checkpoint_distance very small, so that we get a lot of small layer files.
# We want to have a lot of layer files to exercise the layer map. Disable
# GC, and make checkpoint_distance very small, so that we get a lot of small layer
# files.
tenant, _ = env.neon_cli.create_tenant(
conf={
"gc_period": "100 m",
"gc_horizon": "1048576",
"gc_period": "0s",
"checkpoint_distance": "8192",
"compaction_period": "1 s",
"compaction_threshold": "1",

View File

@@ -0,0 +1,111 @@
from dataclasses import dataclass
from typing import Dict, Tuple
import pytest
from fixtures.compare_fixtures import RemoteCompare
from fixtures.log_helper import log
@dataclass
class LabelledQuery:
"""An SQL query with a label for the test report."""
label: str
query: str
# A list of queries to run.
# Please do not alter the label for the query, as it is used to identify it.
# Labels for ClickBench queries match the labels in ClickBench reports
# on https://benchmark.clickhouse.com/ (the DB size may differ).
QUERIES: Tuple[LabelledQuery, ...] = (
# Disable `black` formatting for the list of queries so that it's easier to read
# fmt: off
### ClickBench queries:
LabelledQuery("Q0", r"SELECT COUNT(*) FROM hits;"),
LabelledQuery("Q1", r"SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0;"),
LabelledQuery("Q2", r"SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits;"),
LabelledQuery("Q3", r"SELECT AVG(UserID) FROM hits;"),
LabelledQuery("Q4", r"SELECT COUNT(DISTINCT UserID) FROM hits;"),
LabelledQuery("Q5", r"SELECT COUNT(DISTINCT SearchPhrase) FROM hits;"),
LabelledQuery("Q6", r"SELECT MIN(EventDate), MAX(EventDate) FROM hits;"),
LabelledQuery("Q7", r"SELECT AdvEngineID, COUNT(*) FROM hits WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC;"),
LabelledQuery("Q8", r"SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10;"),
LabelledQuery("Q9", r"SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM hits GROUP BY RegionID ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q10", r"SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;"),
LabelledQuery("Q11", r"SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM hits WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;"),
LabelledQuery("Q12", r"SELECT SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q13", r"SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM hits WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;"),
LabelledQuery("Q14", r"SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q15", r"SELECT UserID, COUNT(*) FROM hits GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10;"),
LabelledQuery("Q16", r"SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;"),
LabelledQuery("Q17", r"SELECT UserID, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, SearchPhrase LIMIT 10;"),
LabelledQuery("Q18", r"SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM hits GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;"),
LabelledQuery("Q19", r"SELECT UserID FROM hits WHERE UserID = 435090932899640449;"),
LabelledQuery("Q20", r"SELECT COUNT(*) FROM hits WHERE URL LIKE '%google%';"),
LabelledQuery("Q21", r"SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM hits WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q22", r"SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM hits WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q23", r"SELECT * FROM hits WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10;"),
LabelledQuery("Q24", r"SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10;"),
LabelledQuery("Q25", r"SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;"),
LabelledQuery("Q26", r"SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10;"),
LabelledQuery("Q27", r"SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM hits WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;"),
LabelledQuery("Q28", r"SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;"),
LabelledQuery("Q29", r"SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM hits;"),
LabelledQuery("Q30", r"SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q31", r"SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q32", r"SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q33", r"SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q34", r"SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q35", r"SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10;"),
LabelledQuery("Q36", r"SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;"),
LabelledQuery("Q37", r"SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;"),
LabelledQuery("Q38", r"SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;"),
LabelledQuery("Q39", r"SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;"),
LabelledQuery("Q40", r"SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100;"),
LabelledQuery("Q41", r"SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;"),
LabelledQuery("Q42", r"SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000;"),
### Custom Neon queries:
# I suggest using the NQ prefix (which stands for Neon Query) instead of Q
# to not intersect with the original ClickBench queries if their list is extended.
#
# LabelledQuery("NQ0", r"..."),
# LabelledQuery("NQ1", r"..."),
# ...
# fmt: on
)
def run_psql(env: RemoteCompare, labelled_query: LabelledQuery, times: int) -> None:
# prepare connstr:
# - cut out password from connstr to pass it via env
# - add options to connstr
password = env.pg.default_options.get("password", None)
options = f"-cstatement_timeout=0 {env.pg.default_options.get('options', '')}"
connstr = env.pg.connstr(password=None, options=options)
environ: Dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
label, query = labelled_query.label, labelled_query.query
log.info(f"Running query {label} {times} times")
for i in range(times):
run = i + 1
log.info(f"Run {run}/{times}")
with env.zenbenchmark.record_duration(f"{label}/{run}"):
env.pg_bin.run_capture(["psql", connstr, "-c", query], env=environ)
@pytest.mark.parametrize("query", QUERIES)
@pytest.mark.remote_cluster
def test_clickbench(query: LabelledQuery, remote_compare: RemoteCompare):
"""
An OLAP-style ClickHouse benchmark
Based on https://github.com/ClickHouse/ClickBench/tree/c00135ca5b6a0d86fedcdbf998fdaa8ed85c1c3b/aurora-postgresql
The DB is prepared manually in advance
"""
run_psql(remote_compare, query, times=3)

View File

@@ -11,16 +11,12 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# Override defaults, 1M gc_horizon and 4M checkpoint_distance.
# Extend compaction_period and gc_period to disable background compaction and gc.
# Override defaults: 4M checkpoint_distance, disable background compaction and gc.
tenant, _ = env.neon_cli.create_tenant(
conf={
"gc_period": "10 m",
"gc_horizon": "1048576",
"checkpoint_distance": "4194304",
"compaction_period": "10 m",
"compaction_threshold": "2",
"compaction_target_size": "4194304",
"gc_period": "0s",
"compaction_period": "0s",
}
)

View File

@@ -52,8 +52,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
tenant, _ = env.neon_cli.create_tenant(
conf={
# disable background GC
"gc_period": "10 m",
"gc_horizon": f"{10 * 1024 ** 3}",
"gc_period": "0s",
# small checkpoint distance to create more delta layer files
"checkpoint_distance": f"{1024 ** 2}",
# set the target size to be large to allow the image layer to cover the whole key space
@@ -127,8 +126,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
tenant, _ = env.neon_cli.create_tenant(
conf={
# disable background GC
"gc_period": "10 m",
"gc_horizon": f"{10 * 1024 ** 3}",
"gc_period": "0s",
# small checkpoint distance to create more delta layer files
"checkpoint_distance": f"{1024 ** 2}",
# set the target size to be large to allow the image layer to cover the whole key space

View File

@@ -4,6 +4,7 @@
import os
import re
import shutil
import threading
import time
from pathlib import Path
@@ -11,6 +12,7 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverApiException,
RemoteStorageKind,
assert_no_in_progress_downloads_for_tenant,
available_remote_storages,
@@ -108,9 +110,8 @@ def test_remote_storage_backup_and_restore(
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
log.info(f"waiting for checkpoint {checkpoint_number} upload")
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info(f"upload of checkpoint {checkpoint_number} is done")
@@ -199,15 +200,17 @@ def test_remote_storage_upload_queue_retries(
# compaction and gc
tenant_id, timeline_id = env.neon_cli.create_tenant(
conf={
# small checkpointing and compaction targets to ensure we generate many operations
"checkpoint_distance": f"{32 * 1024}",
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{32 * 1024}",
# large horizon to avoid automatic GC (our assert on gc_result below relies on that)
"gc_horizon": f"{1024 ** 4}",
"gc_period": "1h",
# disable PITR so that GC considers just gc_horizon
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# don't create image layers, that causes just noise
"image_creation_threshold": "10000",
}
)
@@ -271,27 +274,47 @@ def test_remote_storage_upload_queue_retries(
# let all future operations queue up
configure_storage_sync_failpoints("return")
# create more churn to generate all upload ops
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
client.timeline_checkpoint(tenant_id, timeline_id)
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
client.timeline_compact(tenant_id, timeline_id)
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
# Create more churn to generate all upload ops.
# The checkpoint / compact / gc ops will block because they call remote_client.wait_completion().
# So, run this in a different thread.
churn_thread_result = [False]
# ensure that all operation types that can be in the upload queue have queued up
assert get_queued_count(file_kind="layer", op_kind="upload") > 0
assert get_queued_count(file_kind="index", op_kind="upload") >= 2
assert get_queued_count(file_kind="layer", op_kind="delete") > 0
def churn_while_failpoints_active(result):
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
client.timeline_checkpoint(tenant_id, timeline_id)
client.timeline_compact(tenant_id, timeline_id)
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
client.timeline_checkpoint(tenant_id, timeline_id)
client.timeline_compact(tenant_id, timeline_id)
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
result[0] = True
# unblock all operations and wait for them to finish
churn_while_failpoints_active_thread = threading.Thread(
target=churn_while_failpoints_active, args=[churn_thread_result]
)
churn_while_failpoints_active_thread.start()
# wait for churn thread's data to get stuck in the upload queue
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0)
# unblock churn operations
configure_storage_sync_failpoints("off")
# ... and wait for them to finish. The upload queue uses exponential back-off, so use generous timeouts.
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
# so, give it some time to wrap up.
churn_while_failpoints_active_thread.join(30)
assert not churn_while_failpoints_active_thread.is_alive()
assert churn_thread_result[0]
# try a restore to verify that the uploads worked
# XXX: should vary this test to selectively fail just layer uploads, index uploads, deletions
# but how do we validate the result after restore?
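The wait_until calls above are plain retry loops. A minimal sketch of the assumed (iterations, interval, callable) semantics; the real fixture may differ in details:

import time


def wait_until_sketch(number_of_iterations, interval, func):
    # Re-evaluate `func` until it succeeds, sleeping `interval` seconds between attempts.
    # "Succeeds" is assumed here to mean: returns a truthy value without raising.
    last_error = AssertionError("condition was never met")
    for _ in range(number_of_iterations):
        try:
            if func():
                return
        except Exception as e:
            last_error = e
        time.sleep(interval)
    raise last_error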
@@ -350,9 +373,20 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
"pitr_interval": "0s",
}
)
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
client = env.pageserver.http_client()
def get_queued_count(file_kind, op_kind):
metrics = client.get_metrics()
matches = re.search(
f'^pageserver_remote_upload_queue_unfinished_tasks{{file_kind="{file_kind}",op_kind="{op_kind}",tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\\S+)$',
metrics,
re.MULTILINE,
)
assert matches
return int(matches[1])
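For reference, the regular expression above matches a single line of the Prometheus text exposition format; the sample below is illustrative only:

# Example of a line get_queued_count() is meant to match (ids and value are illustrative):
#   pageserver_remote_upload_queue_unfinished_tasks{file_kind="index",op_kind="upload",tenant_id="abc",timeline_id="def"} 2
# The single capture group (\S+) is the trailing sample value, converted with int().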
pg = env.postgres.create_start("main", tenant_id=tenant_id)
client.configure_failpoints(("before-upload-layer", "return"))
@@ -364,26 +398,40 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
]
)
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
client.timeline_checkpoint(tenant_id, timeline_id)
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
assert timeline_path.exists()
assert len(list(timeline_path.glob("*"))) >= 8
# Kick off a checkpoint operation.
# It will get stuck in remote_client.wait_completion(), since the select query will have
# generated layer upload ops already.
checkpoint_allowed_to_fail = threading.Event()
def get_queued_count(file_kind, op_kind):
metrics = client.get_metrics()
matches = re.search(
f'^pageserver_remote_upload_queue_unfinished_tasks{{file_kind="{file_kind}",op_kind="{op_kind}",tenant_id="{tenant_id}",timeline_id="{timeline_id}"}} (\\S+)$',
metrics,
re.MULTILINE,
)
assert matches
return int(matches[1])
def checkpoint_thread_fn():
try:
client.timeline_checkpoint(tenant_id, timeline_id)
except PageserverApiException:
assert (
checkpoint_allowed_to_fail.is_set()
), "checkpoint op should only fail in response to timeline deletion"
assert get_queued_count(file_kind="index", op_kind="upload") > 0
checkpoint_thread = threading.Thread(target=checkpoint_thread_fn)
checkpoint_thread.start()
# timeline delete should work despite layer files stuck in upload
# Wait for stuck uploads. NB: if there were earlier layer flushes initiated during `INSERT INTO`,
# these will be their uploads. If there were none, they are the timeline_checkpoint()'s uploads.
def assert_compacted_and_uploads_queued():
assert timeline_path.exists()
assert len(list(timeline_path.glob("*"))) >= 8
assert get_queued_count(file_kind="index", op_kind="upload") > 0
wait_until(20, 0.1, assert_compacted_and_uploads_queued)
# Regardless, give checkpoint some time to block for good.
# Not strictly necessary, but might help uncover failure modes in the future.
time.sleep(2)
# Now delete the timeline. It should take priority over ongoing
# checkpoint operations. Hence, checkpoint is allowed to fail now.
log.info("sending delete request")
checkpoint_allowed_to_fail.set()
client.timeline_delete(tenant_id, timeline_id)
assert not timeline_path.exists()
@@ -391,6 +439,10 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
# timeline deletion should kill ongoing uploads
assert get_queued_count(file_kind="index", op_kind="upload") == 0
# timeline deletion should unblock checkpoint ops
checkpoint_thread.join(2.0)
assert not checkpoint_thread.is_alive()
# Just to be sure, unblock ongoing uploads. If the previous assert was incorrect, or the Prometheus metric broken,
# this would likely generate ERROR-level log entries that the NeonEnvBuilder would detect.
client.configure_failpoints(("before-upload-layer", "off"))
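Conversely, when a test does expect ERROR-level log lines, the convention used elsewhere in this change is to whitelist them explicitly; a sketch with an illustrative pattern:

# Sketch: whitelist an expected, intentionally triggered error so that the
# NeonEnvBuilder log scan does not fail the test (the regex below is illustrative only).
env.pageserver.allowed_errors.append(".*some intentionally triggered error.*")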


@@ -1,13 +1,7 @@
import time
from typing import List, Tuple
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverApiException,
wait_for_last_flush_lsn,
)
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.types import Lsn
@@ -44,8 +38,8 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder):
Operate on a single branch, reading the tenant's size after each transaction.
"""
# gc and compaction is not wanted automatically
# the pitr_interval here is quite problematic, so we cannot really use it.
# Disable automatic gc and compaction.
# The pitr_interval here is quite problematic, so we cannot really use it.
# it'd have to be calibrated per test execution environment.
# there was a bug which was hidden if the create table and first batch of
@@ -53,7 +47,7 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder):
# that the next_gc_cutoff could be smaller than initdb_lsn, which will
# obviously lead to issues when calculating the size.
gc_horizon = 0x30000
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='1h', gc_period='1h', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
env = neon_env_builder.init_start()
@@ -162,7 +156,7 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder
gc_horizon = 128 * 1024
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='1h', gc_period='1h', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
env = neon_env_builder.init_start()
@@ -256,22 +250,7 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder
assert size_after == size_after_thinning_branch
# teardown: delete branches, and the size should go down
deleted = False
for _ in range(10):
try:
http_client.timeline_delete(tenant_id, first_branch_timeline_id)
deleted = True
break
except PageserverApiException as e:
# compaction is ok but just retry if this fails; related to #2442
if "cannot lock compaction critical section" in str(e):
# also ignore it in the log
env.pageserver.allowed_errors.append(".*cannot lock compaction critical section.*")
time.sleep(1)
continue
raise
assert deleted
http_client.timeline_delete(tenant_id, first_branch_timeline_id)
size_after_deleting_first = http_client.tenant_size(tenant_id)
assert size_after_deleting_first < size_after_thinning_branch


@@ -11,13 +11,6 @@ def get_only_element(l): # noqa: E741
# Test that gc and compaction tenant tasks start and stop correctly
def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
# The gc and compaction loops don't bother to watch for tenant state
# changes while sleeping, so we use small periods to make this test
# run faster. With default settings we'd have to wait longer for tasks
# to notice state changes and shut down.
# TODO fix this behavior in the pageserver
tenant_config = "{gc_period = '1 s', compaction_period = '1 s'}"
neon_env_builder.pageserver_config_override = f"tenant_config={tenant_config}"
name = "test_tenant_tasks"
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
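The removed comment describes background loops that sleep for the whole period without checking for state changes. The difference it complains about is roughly this (illustrative Python sketch, not actual pageserver code):

import threading
import time


def do_work_sketch():
    pass  # placeholder for the gc/compaction work


def naive_loop(stop: threading.Event, period: float):
    # Sleeps the full period, so a shutdown request can go unnoticed for up to `period`.
    while not stop.is_set():
        do_work_sketch()
        time.sleep(period)


def responsive_loop(stop: threading.Event, period: float):
    # Waits on the event instead, so a shutdown request is noticed immediately.
    while not stop.is_set():
        do_work_sketch()
        stop.wait(timeout=period)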


@@ -161,6 +161,17 @@ def test_tenants_attached_after_download(
##### Stop the pageserver, erase its layer file to force it to be downloaded from S3
env.postgres.stop_all()
sk_commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn
for sk in env.safekeepers
]
log.info("wait for pageserver to process all the WAL")
wait_for_last_record_lsn(client, tenant_id, timeline_id, max(sk_commit_lsns))
log.info("wait for it to reach remote storage")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, max(sk_commit_lsns))
log.info("latest safekeeper_commit_lsn reached remote storage")
detail_before = client.timeline_detail(
tenant_id, timeline_id, include_non_incremental_physical_size=True
)


@@ -295,7 +295,7 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
# Disable background compaction and GC as we don't want it to happen after `get_physical_size` request
# and before checking the expected size on disk, which would make the assertion fail
neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='10m', gc_period='10m', pitr_interval='1s'}"
neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='0s', gc_period='0s', pitr_interval='1s'}"
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()


@@ -16,8 +16,9 @@ def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
# by image layer generation. So adjust default parameters to make it happen more frequently.
tenant, _ = env.neon_cli.create_tenant(
conf={
"gc_period": "100 m",
"gc_horizon": "1048576",
# disable automatic GC
"gc_period": "0s",
# Compact and create images aggressively
"checkpoint_distance": "1000000",
"compaction_period": "1 s",
"compaction_threshold": "3",