Compare commits

..

8 Commits

Author SHA1 Message Date
Arseny Sher
cfd78950e2 basic sk bench of pgbench init with perf fixtures 2024-01-30 14:24:27 +03:00
Arpad Müller
734755eaca Enable nextest retries for the arm build (#6496)
Also make the NEXTEST_RETRIES declaration more local.

Requested in https://github.com/neondatabase/neon/pull/6493#issuecomment-1912110202
2024-01-27 05:16:11 +01:00
Christian Schwarz
e34166a28f CI: switch back to std-fs io engine for soak time before next release (#6492)
PR #5824 introduced the concept of io engines in pageserver and
implemented `tokio-epoll-uring` in addition to our current method,
`std-fs`.

We used `tokio-epoll-uring` in CI for a day to get more exposure to
the code.  Now it's time to switch CI back so that we test with `std-fs`
as well, because that's what we're (still) using in production.
2024-01-26 22:48:34 +01:00
Christian Schwarz
3a36a0a227 fix(test suite): some tests leak child processes (#6497) 2024-01-26 18:23:53 +00:00
John Spray
58f6cb649e control_plane: database persistence for attachment_service (#6468)
## Problem

Spun off from https://github.com/neondatabase/neon/pull/6394 -- this PR
is just the persistence parts and the changes that enable it to work
nicely


## Summary of changes

- Revert #6444 and #6450
- In neon_local, start a vanilla postgres instance for the attachment
service to use.
- Adopt `diesel` crate for database access in attachment service. This
uses raw SQL migrations as the source of truth for the schema, so it's a
soft dependency: we can switch libraries pretty easily.
- Rewrite persistence.rs to use postgres (via diesel) instead of JSON.
- Preserve JSON read+write at startup and shutdown: this enables using
the JSON format in compatibility tests, so that we don't have to commit
to our DB schema yet.
- In neon_local, run database creation + migrations before starting
attachment service
- Run the initial reconciliation in Service::spawn in the background, so
that the pageserver + attachment service don't get stuck waiting for
each other to start, when restarting both together in a test.
2024-01-26 17:20:44 +00:00
Arpad Müller
dcc7610ad6 Do backoff::retry in s3 timetravel test (#6493)
The top level retries weren't enough, probably because we do so many
network requests. Fine grained retries ensure that there is higher
potential for the entire test to succeed.

To demonstrate this, consider the following example: let's assume that
each request has 5% chance of failing and we do 10 requests. Then
chances of success without any retries is 0.95^10 = 0.6. With 3 top
level retries it is 1-0.4^3 = 0.936. With 3 fine grained retries it is
(1-0.05^3)^10 = 0.9988 (roundings implicit). So chances of failure are
6.4% for the top level retry vs 0.12% for the fine grained retry.

Follow-up of #6155
2024-01-26 16:43:56 +00:00
Alexander Bayandin
4c245b0f5a update_build_tools_image.yml: Push build-tools image to Docker Hub (#6481)
## Problem

- `docker.io/neondatabase/build-tools:pinned` image is frequently
outdated on Docker Hub because there's no automated way to update it.
- `update_build_tools_image.yml` workflow contains legacy roll-back
logic, which is not required anymore because it updates only a single
image.

## Summary of changes
- Make `update_build_tools_image.yml` workflow push images to both ECR
and Docker Hub
- Remove unneeded roll-back logic
2024-01-26 16:12:49 +00:00
John Spray
55b7cde665 tests: add basic coverage for sharding (#6380)
## Problem

The support for sharding in the pageserver was written before
https://github.com/neondatabase/neon/pull/6205 landed, so when it landed
we couldn't directly test sharding.

## Summary of changes

- Add `test_sharding_smoke` which tests the basics of creating a
sharding tenant, creating a timeline within it, checking that data
within it is distributed.
- Add modes to pg_regress tests for running with 4 shards as well as
with 1.
2024-01-26 14:40:47 +00:00
68 changed files with 2129 additions and 1129 deletions

View File

@@ -69,7 +69,15 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --dockerfile ${{ inputs.dockerfile-path }} --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-amd64
run: |
/kaniko/executor \
--reproducible \
--snapshotMode=redo \
--skip-unused-stages \
--dockerfile ${{ inputs.dockerfile-path }} \
--cache=true \
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache \
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-amd64
kaniko-arm:
if: needs.check-if-build-tools-dockerfile-changed.outputs.docker_file_changed == 'true'
@@ -85,7 +93,15 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build
run: /kaniko/executor --reproducible --snapshotMode=redo --skip-unused-stages --dockerfile ${{ inputs.dockerfile-path }} --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-arm64
run: |
/kaniko/executor \
--reproducible \
--snapshotMode=redo \
--skip-unused-stages \
--dockerfile ${{ inputs.dockerfile-path }} \
--cache=true \
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache \
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-arm64
manifest:
if: needs.check-if-build-tools-dockerfile-changed.outputs.docker_file_changed == 'true'
@@ -99,7 +115,10 @@ jobs:
steps:
- name: Create manifest
run: docker manifest create 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }} --amend 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-amd64 --amend 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-arm64
run: |
docker manifest create 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }} \
--amend 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-amd64 \
--amend 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}-arm64
- name: Push manifest
run: docker manifest push 369495373322.dkr.ecr.eu-central-1.amazonaws.com/${{ inputs.image-name }}:${{ needs.tag.outputs.build-tools-tag }}

View File

@@ -21,7 +21,6 @@ env:
COPT: '-Werror'
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
NEXTEST_RETRIES: 3
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
@@ -361,6 +360,8 @@ jobs:
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
- name: Run rust tests
env:
NEXTEST_RETRIES: 3
run: |
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
@@ -471,7 +472,7 @@ jobs:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
- name: Merge and upload coverage data
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'

View File

@@ -124,12 +124,12 @@ jobs:
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
CARGO_FEATURES: --features testing
CARGO_FLAGS: --locked --release
CARGO_FLAGS: --release
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
@@ -210,18 +210,20 @@ jobs:
- name: Run cargo build
run: |
mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
mold -run cargo build --locked $CARGO_FLAGS $CARGO_FEATURES --bins --tests
- name: Run cargo test
env:
NEXTEST_RETRIES: 3
run: |
cargo test $CARGO_FLAGS $CARGO_FEATURES
cargo nextest run $CARGO_FEATURES
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
cargo test $CARGO_FLAGS --package remote_storage --test test_real_s3
cargo nextest run --package remote_storage --test test_real_s3
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
@@ -231,7 +233,7 @@ jobs:
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
cargo test $CARGO_FLAGS --package remote_storage --test test_real_azure
cargo nextest run --package remote_storage --test test_real_azure
check-codestyle-rust-arm:
timeout-minutes: 90

View File

@@ -20,111 +20,51 @@ defaults:
run:
shell: bash -euo pipefail {0}
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
permissions: {}
jobs:
tag-image:
runs-on: [ self-hosted, gen3, small ]
container: golang:1.19-bullseye
env:
IMAGE: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools
FROM_TAG: ${{ inputs.from-tag }}
TO_TAG: ${{ inputs.to-tag }}
outputs:
next-digest-buildtools: ${{ steps.next-digest.outputs.next-digest-buildtools }}
prev-digest-buildtools: ${{ steps.prev-digest.outputs.prev-digest-buildtools }}
steps:
- name: Install Crane & ECR helper
run: |
go install github.com/google/go-containerregistry/cmd/crane@a54d64203cffcbf94146e04069aae4a97f228ee2 # v0.16.1
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@adf1bafd791ae7d4ff098108b1e91f36a4da5404 # v0.7.1
- name: Configure ECR login
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
- name: Get source image digest
id: next-digest
run: |
NEXT_DIGEST=$(crane digest ${IMAGE}:${FROM_TAG} || true)
if [ -z "${NEXT_DIGEST}" ]; then
echo >&2 "Image ${IMAGE}:${FROM_TAG} does not exist"
exit 1
fi
echo "Current ${IMAGE}@${FROM_TAG} image is ${IMAGE}@${NEXT_DIGEST}"
echo "next-digest-buildtools=$NEXT_DIGEST" >> $GITHUB_OUTPUT
- name: Get destination image digest (if already exists)
id: prev-digest
run: |
PREV_DIGEST=$(crane digest ${IMAGE}:${TO_TAG} || true)
if [ -z "${PREV_DIGEST}" ]; then
echo >&2 "Image ${IMAGE}:${TO_TAG} does not exist (it's ok)"
else
echo >&2 "Current ${IMAGE}@${TO_TAG} image is ${IMAGE}@${PREV_DIGEST}"
echo "prev-digest-buildtools=$PREV_DIGEST" >> $GITHUB_OUTPUT
fi
- name: Tag image
run: |
crane tag "${IMAGE}:${FROM_TAG}" "${TO_TAG}"
rollback-tag-image:
needs: tag-image
if: ${{ !success() }}
runs-on: [ self-hosted, gen3, small ]
container: golang:1.19-bullseye
env:
IMAGE: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools
ECR_IMAGE: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools
DOCKER_HUB_IMAGE: docker.io/neondatabase/build-tools
FROM_TAG: ${{ inputs.from-tag }}
TO_TAG: ${{ inputs.to-tag }}
steps:
- name: Install Crane & ECR helper
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
# The default value is ~/.docker
- name: Set custom docker config directory
run: |
go install github.com/google/go-containerregistry/cmd/crane@a54d64203cffcbf94146e04069aae4a97f228ee2 # v0.16.1
go install github.com/awslabs/amazon-ecr-credential-helper/ecr-login/cli/docker-credential-ecr-login@adf1bafd791ae7d4ff098108b1e91f36a4da5404 # v0.7.1
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- name: Configure ECR login
- uses: docker/login-action@v2
with:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
- uses: docker/login-action@v2
with:
registry: 369495373322.dkr.ecr.eu-central-1.amazonaws.com
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- uses: actions/setup-go@v5
with:
go-version: '1.21'
- name: Install crane
run: |
mkdir /github/home/.docker/
echo "{\"credsStore\":\"ecr-login\"}" > /github/home/.docker/config.json
go install github.com/google/go-containerregistry/cmd/crane@a0658aa1d0cc7a7f1bcc4a3af9155335b6943f40 # v0.18.0
- name: Restore previous tag if needed
- name: Copy images
run: |
NEXT_DIGEST="${{ needs.tag-image.outputs.next-digest-buildtools }}"
PREV_DIGEST="${{ needs.tag-image.outputs.prev-digest-buildtools }}"
crane copy "${ECR_IMAGE}:${FROM_TAG}" "${ECR_IMAGE}:${TO_TAG}"
crane copy "${ECR_IMAGE}:${FROM_TAG}" "${DOCKER_HUB_IMAGE}:${TO_TAG}"
if [ -z "${NEXT_DIGEST}" ]; then
echo >&2 "Image ${IMAGE}:${FROM_TAG} does not exist, nothing to rollback"
exit 0
fi
if [ -z "${PREV_DIGEST}" ]; then
# I guess we should delete the tag here/untag the image, but crane does not support it
# - https://github.com/google/go-containerregistry/issues/999
echo >&2 "Image ${IMAGE}:${TO_TAG} did not exist, but it was created by the job, no need to rollback"
exit 0
fi
CURRENT_DIGEST=$(crane digest "${IMAGE}:${TO_TAG}")
if [ "${CURRENT_DIGEST}" == "${NEXT_DIGEST}" ]; then
crane tag "${IMAGE}@${PREV_DIGEST}" "${TO_TAG}"
echo >&2 "Successfully restored ${TO_TAG} tag from ${IMAGE}@${CURRENT_DIGEST} to ${IMAGE}@${PREV_DIGEST}"
else
echo >&2 "Image ${IMAGE}:${TO_TAG}@${CURRENT_DIGEST} is not required to be restored"
fi
- name: Remove custom docker config directory
if: always()
run: |
rm -rf .docker-custom

91
Cargo.lock generated
View File

@@ -278,6 +278,7 @@ dependencies = [
"camino",
"clap",
"control_plane",
"diesel",
"futures",
"git-version",
"hyper",
@@ -286,7 +287,6 @@ dependencies = [
"pageserver_client",
"postgres_backend",
"postgres_connection",
"scopeguard",
"serde",
"serde_json",
"thiserror",
@@ -1328,6 +1328,8 @@ dependencies = [
"clap",
"comfy-table",
"compute_api",
"diesel",
"diesel_migrations",
"futures",
"git-version",
"hex",
@@ -1638,6 +1640,52 @@ dependencies = [
"rusticata-macros",
]
[[package]]
name = "diesel"
version = "2.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62c6fcf842f17f8c78ecf7c81d75c5ce84436b41ee07e03f490fbb5f5a8731d8"
dependencies = [
"bitflags 2.4.1",
"byteorder",
"diesel_derives",
"itoa",
"pq-sys",
"serde_json",
]
[[package]]
name = "diesel_derives"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef8337737574f55a468005a83499da720f20c65586241ffea339db9ecdfd2b44"
dependencies = [
"diesel_table_macro_syntax",
"proc-macro2",
"quote",
"syn 2.0.32",
]
[[package]]
name = "diesel_migrations"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6036b3f0120c5961381b570ee20a02432d7e2d27ea60de9578799cf9156914ac"
dependencies = [
"diesel",
"migrations_internals",
"migrations_macros",
]
[[package]]
name = "diesel_table_macro_syntax"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5"
dependencies = [
"syn 2.0.32",
]
[[package]]
name = "digest"
version = "0.10.7"
@@ -2716,15 +2764,6 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lru"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22"
dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -2796,6 +2835,27 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "migrations_internals"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f23f71580015254b020e856feac3df5878c2c7a8812297edd6c0a485ac9dada"
dependencies = [
"serde",
"toml",
]
[[package]]
name = "migrations_macros"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cce3325ac70e67bbab5bd837a31cae01f1a6db64e0e744a33cb03a543469ef08"
dependencies = [
"migrations_internals",
"proc-macro2",
"quote",
]
[[package]]
name = "mime"
version = "0.3.17"
@@ -3346,7 +3406,6 @@ dependencies = [
"humantime-serde",
"hyper",
"itertools",
"lru",
"md5",
"metrics",
"nix 0.27.1",
@@ -3805,6 +3864,15 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pq-sys"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c0052426df997c0cbd30789eb44ca097e3541717a7b8fa36b1c464ee7edebd"
dependencies = [
"vcpkg",
]
[[package]]
name = "pq_proto"
version = "0.1.0"
@@ -6633,6 +6701,7 @@ dependencies = [
"clap",
"clap_builder",
"crossbeam-utils",
"diesel",
"either",
"fail",
"futures-channel",

View File

@@ -10,6 +10,8 @@ async-trait.workspace = true
camino.workspace = true
clap.workspace = true
comfy-table.workspace = true
diesel = { version = "2.1.4", features = ["postgres"]}
diesel_migrations = { version = "2.1.0", features = ["postgres"]}
futures.workspace = true
git-version.workspace = true
nix.workspace = true

View File

@@ -14,7 +14,6 @@ hyper.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
@@ -26,6 +25,8 @@ tracing.workspace = true
# a parsing function when loading pageservers from neon_local LocalEnv
postgres_backend.workspace = true
diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
utils = { path = "../../libs/utils/" }
metrics = { path = "../../libs/metrics/" }
control_plane = { path = ".." }

View File

@@ -0,0 +1,6 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass);
DROP FUNCTION IF EXISTS diesel_set_updated_at();

View File

@@ -0,0 +1,36 @@
-- This file was automatically created by Diesel to setup helper functions
-- and other internal bookkeeping. This file is safe to edit, any future
-- changes will be added to existing projects as new migrations.
-- Sets up a trigger for the given table to automatically set a column called
-- `updated_at` whenever the row is modified (unless `updated_at` was included
-- in the modified columns)
--
-- # Example
--
-- ```sql
-- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW());
--
-- SELECT diesel_manage_updated_at('users');
-- ```
CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$
BEGIN
EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s
FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$
BEGIN
IF (
NEW IS DISTINCT FROM OLD AND
NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at
) THEN
NEW.updated_at := current_timestamp;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View File

@@ -0,0 +1 @@
DROP TABLE tenant_shards;

View File

@@ -0,0 +1,12 @@
CREATE TABLE tenant_shards (
tenant_id VARCHAR NOT NULL,
shard_number INTEGER NOT NULL,
shard_count INTEGER NOT NULL,
PRIMARY KEY(tenant_id, shard_number, shard_count),
shard_stripe_size INTEGER NOT NULL,
generation INTEGER NOT NULL,
generation_pageserver BIGINT NOT NULL,
placement_policy VARCHAR NOT NULL,
-- config is JSON encoded, opaque to the database.
config TEXT NOT NULL
);

View File

@@ -0,0 +1 @@
DROP TABLE nodes;

View File

@@ -0,0 +1,10 @@
CREATE TABLE nodes (
node_id BIGINT PRIMARY KEY NOT NULL,
scheduling_policy VARCHAR NOT NULL,
listen_http_addr VARCHAR NOT NULL,
listen_http_port INTEGER NOT NULL,
listen_pg_addr VARCHAR NOT NULL,
listen_pg_port INTEGER NOT NULL
);

View File

@@ -1,5 +1,5 @@
use crate::reconciler::ReconcileError;
use crate::service::Service;
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest};
@@ -104,34 +104,34 @@ async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiErr
json_response(StatusCode::OK, state.service.inspect(inspect_req))
}
async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
async fn handle_tenant_create(
service: Arc<Service>,
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state.service.tenant_create(create_req).await?,
)
json_response(StatusCode::OK, service.tenant_create(create_req).await?)
}
async fn handle_tenant_timeline_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
async fn handle_tenant_timeline_create(
service: Arc<Service>,
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
service
.tenant_timeline_create(tenant_id, create_req)
.await?,
)
}
async fn handle_tenant_locate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
async fn handle_tenant_locate(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let state = get_state(&req);
json_response(StatusCode::OK, state.service.tenant_locate(tenant_id)?)
json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
}
async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -154,14 +154,15 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, state.service.node_configure(config_req)?)
}
async fn handle_tenant_shard_migrate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
async fn handle_tenant_shard_migrate(
service: Arc<Service>,
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
service
.tenant_shard_migrate(tenant_shard_id, migrate_req)
.await?,
)
@@ -178,6 +179,35 @@ impl From<ReconcileError> for ApiError {
}
}
/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
/// be allowed to run if Service has finished its initial reconciliation.
async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output
where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
{
let state = get_state(&request);
let service = state.service.clone();
let startup_complete = service.startup_complete.clone();
if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
.await
.is_err()
{
// This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
// timeouts around its remote calls, to bound its runtime.
return Err(ApiError::Timeout(
"Timed out waiting for service readiness".into(),
));
}
request_span(
request,
|request| async move { handler(service, request).await },
)
.await
}
pub fn make_router(
service: Arc<Service>,
auth: Option<Arc<SwappableJwtAuth>>,
@@ -205,14 +235,20 @@ pub fn make_router(
.put("/node/:node_id/config", |r| {
request_span(r, handle_node_configure)
})
.post("/tenant", |r| request_span(r, handle_tenant_create))
.post("/tenant/:tenant_id/timeline", |r| {
request_span(r, handle_tenant_timeline_create)
.post("/v1/tenant", |r| {
tenant_service_handler(r, handle_tenant_create)
})
.post("/v1/tenant/:tenant_id/timeline", |r| {
tenant_service_handler(r, handle_tenant_timeline_create)
})
.get("/tenant/:tenant_id/locate", |r| {
request_span(r, handle_tenant_locate)
tenant_service_handler(r, handle_tenant_locate)
})
.put("/tenant/:tenant_shard_id/migrate", |r| {
request_span(r, handle_tenant_shard_migrate)
tenant_service_handler(r, handle_tenant_shard_migrate)
})
// Path aliases for tests_forward_compatibility
// TODO: remove these in future PR
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
}

View File

@@ -7,6 +7,7 @@ mod node;
pub mod persistence;
mod reconciler;
mod scheduler;
mod schema;
pub mod service;
mod tenant_state;

View File

@@ -12,9 +12,9 @@ use camino::Utf8PathBuf;
use clap::Parser;
use metrics::launch_timestamp::LaunchTimestamp;
use std::sync::Arc;
use tokio::signal::unix::SignalKind;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::logging::{self, LogFormat};
use utils::signals::{ShutdownSignals, Signal};
use utils::{project_build_tag, project_git_version, tcp_listener};
@@ -40,6 +40,10 @@ struct Cli {
/// Path to the .json file to store state (will be created if it doesn't exist)
#[arg(short, long)]
path: Utf8PathBuf,
/// URL to connect to postgres, like postgresql://localhost:1234/attachment_service
#[arg(long)]
database_url: String,
}
#[tokio::main]
@@ -66,9 +70,14 @@ async fn main() -> anyhow::Result<()> {
jwt_token: args.jwt_token,
};
let persistence = Arc::new(Persistence::spawn(&args.path).await);
let json_path = if args.path.as_os_str().is_empty() {
None
} else {
Some(args.path)
};
let persistence = Arc::new(Persistence::new(args.database_url, json_path.clone()));
let service = Service::spawn(config, persistence).await?;
let service = Service::spawn(config, persistence.clone()).await?;
let http_listener = tcp_listener::bind(args.listen)?;
@@ -81,20 +90,31 @@ async fn main() -> anyhow::Result<()> {
let router = make_router(service, auth)
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?.serve(service);
let router_service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?.serve(router_service);
tracing::info!("Serving on {0}", args.listen);
tokio::task::spawn(server);
ShutdownSignals::handle(|signal| match signal {
Signal::Interrupt | Signal::Terminate | Signal::Quit => {
tracing::info!("Got {}. Terminating", signal.name());
// We're just a test helper: no graceful shutdown.
std::process::exit(0);
}
})?;
// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
tokio::select! {
_ = sigint.recv() => {},
_ = sigterm.recv() => {},
_ = sigquit.recv() => {},
}
tracing::info!("Terminating on signal");
Ok(())
if json_path.is_some() {
// Write out a JSON dump on shutdown: this is used in compat tests to avoid passing
// full postgres dumps around.
if let Err(e) = persistence.write_tenants_json().await {
tracing::error!("Failed to write JSON on shutdown: {e}")
}
}
std::process::exit(0);
}

View File

@@ -1,6 +1,8 @@
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use utils::id::NodeId;
use crate::persistence::NodePersistence;
#[derive(Clone)]
pub(crate) struct Node {
pub(crate) id: NodeId,
@@ -34,4 +36,15 @@ impl Node {
NodeSchedulingPolicy::Pause => false,
}
}
pub(crate) fn to_persistent(&self) -> NodePersistence {
NodePersistence {
node_id: self.id.0 as i64,
scheduling_policy: self.scheduling.into(),
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port as i32,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
}
}
}

View File

@@ -1,182 +1,161 @@
use std::{collections::HashMap, str::FromStr};
use std::collections::HashMap;
use std::str::FromStr;
use camino::{Utf8Path, Utf8PathBuf};
use control_plane::{
attachment_service::{NodeAvailability, NodeSchedulingPolicy},
local_env::LocalEnv,
};
use pageserver_api::{
models::TenantConfig,
shard::{ShardCount, ShardNumber, TenantShardId},
};
use camino::Utf8Path;
use camino::Utf8PathBuf;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
use tracing::info;
use utils::{
generation::Generation,
id::{NodeId, TenantId},
};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};
use crate::{node::Node, PlacementPolicy};
use crate::node::Node;
use crate::PlacementPolicy;
/// Placeholder for storage. This will be replaced with a database client.
/// ## What do we store?
///
/// The attachment service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
/// - Tenant's PlacementPolicy and TenantConfig, as the source of truth for these is something external.
/// - Node's scheduling policies, as the source of truth for these is something external.
///
/// Other things we store durably as an implementation detail:
/// - Node's host/port: this could be avoided it we made nodes emit a self-registering heartbeat,
/// but it is operationally simpler to make this service the authority for which nodes
/// it talks to.
///
/// ## Performance/efficiency
///
/// The attachment service does not go via the database for most things: there are
/// a couple of places where we must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
/// before it can attach a tenant, so this acts as a bound on how fast things like
/// failover can happen.
/// - Pageserver re-attach: we will increment many shards' generations when this happens,
/// so it is important to avoid e.g. issuing O(N) queries.
///
/// Database calls relating to nodes have low performance requirements, as they are very rarely
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
inner: std::sync::Mutex<Inner>,
}
struct Inner {
state: PersistentState,
write_queue_tx: tokio::sync::mpsc::UnboundedSender<PendingWrite>,
database_url: String,
// In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
// test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
// compatible just yet.
json_path: Option<Utf8PathBuf>,
}
/// Legacy format, for use in JSON compat objects in test environment
#[derive(Serialize, Deserialize)]
struct PersistentState {
struct JsonPersistence {
tenants: HashMap<TenantShardId, TenantShardPersistence>,
}
struct PendingWrite {
bytes: Vec<u8>,
done_tx: tokio::sync::oneshot::Sender<()>,
#[derive(thiserror::Error, Debug)]
pub(crate) enum DatabaseError {
#[error(transparent)]
Query(#[from] diesel::result::Error),
#[error(transparent)]
Connection(#[from] diesel::result::ConnectionError),
#[error("Logical error: {0}")]
Logical(String),
}
impl PersistentState {
async fn load(path: &Utf8Path) -> anyhow::Result<Self> {
let bytes = tokio::fs::read(path).await?;
let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
for (tenant_id, tenant) in &mut decoded.tenants {
// Backward compat: an old attachments.json from before PR #6251, replace
// empty strings with proper defaults.
if tenant.tenant_id.is_empty() {
tenant.tenant_id = format!("{}", tenant_id);
tenant.config = serde_json::to_string(&TenantConfig::default())?;
tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())?;
}
}
Ok(decoded)
}
async fn load_or_new(path: &Utf8Path) -> Self {
match Self::load(path).await {
Ok(s) => {
tracing::info!("Loaded state file at {}", path);
s
}
Err(e)
if e.downcast_ref::<std::io::Error>()
.map(|e| e.kind() == std::io::ErrorKind::NotFound)
.unwrap_or(false) =>
{
tracing::info!("Will create state file at {}", path);
Self {
tenants: HashMap::new(),
}
}
Err(e) => {
panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path)
}
}
}
}
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
pub async fn spawn(path: &Utf8Path) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let state = PersistentState::load_or_new(path).await;
tokio::spawn(Self::writer_task(rx, path.to_owned()));
pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
Self {
inner: std::sync::Mutex::new(Inner {
state,
write_queue_tx: tx,
}),
database_url,
json_path,
}
}
async fn writer_task(
mut rx: tokio::sync::mpsc::UnboundedReceiver<PendingWrite>,
path: Utf8PathBuf,
) {
scopeguard::defer! {
info!("persistence writer task exiting");
};
loop {
match rx.recv().await {
Some(write) => {
tokio::task::spawn_blocking({
let path = path.clone();
move || {
let tmp_path =
utils::crashsafe::path_with_suffix_extension(&path, "___new");
utils::crashsafe::overwrite(&path, &tmp_path, &write.bytes)
}
})
.await
.expect("spawn_blocking")
.expect("write file");
let _ = write.done_tx.send(()); // receiver may lose interest any time
}
None => {
return;
}
}
}
}
/// Perform a modification on our [`PersistentState`].
/// Return a future that completes once our modification has been persisted.
/// The output of the future is the return value of the `txn`` closure.
async fn mutating_transaction<F, R>(&self, txn: F) -> R
/// Call the provided function in a tokio blocking thread, with a Diesel database connection.
async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
where
F: FnOnce(&mut PersistentState) -> R,
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let (ret, done_rx) = {
let mut inner = self.inner.lock().unwrap();
let ret = txn(&mut inner.state);
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
let write = PendingWrite {
bytes: serde_json::to_vec(&inner.state).expect("Serialization error"),
done_tx,
};
inner
.write_queue_tx
.send(write)
.expect("writer task always outlives self");
(ret, done_rx)
};
// the write task can go away once we start .await'ing
let _: () = done_rx.await.expect("writer task dead, check logs");
ret
let database_url = self.database_url.clone();
tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
// TODO: connection pooling, such as via diesel::r2d2
let mut conn = PgConnection::establish(&database_url)?;
func(&mut conn)
})
.await
.expect("Task panic")
}
/// When registering a node, persist it so that on next start we will be able to
/// iterate over known nodes to synchronize their tenant shard states with our observed state.
pub(crate) async fn insert_node(&self, _node: &Node) -> anyhow::Result<()> {
// TODO: node persitence will come with database backend
Ok(())
/// When a node is first registered, persist it before using it for anything
pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
let np = node.to_persistent();
self.with_conn(move |conn| -> DatabaseResult<()> {
diesel::insert_into(crate::schema::nodes::table)
.values(&np)
.execute(conn)?;
Ok(())
})
.await
}
/// At startup, we populate the service's list of nodes, and use this list to call into
/// each node to do an initial reconciliation of the state of the world with our in-memory
/// observed state.
pub(crate) async fn list_nodes(&self) -> anyhow::Result<Vec<Node>> {
let env = LocalEnv::load_config()?;
// TODO: node persitence will come with database backend
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
let nodes: Vec<Node> = self
.with_conn(move |conn| -> DatabaseResult<_> {
Ok(crate::schema::nodes::table
.load::<NodePersistence>(conn)?
.into_iter()
.map(|n| Node {
id: NodeId(n.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: n.listen_http_addr,
listen_http_port: n.listen_http_port as u16,
listen_pg_addr: n.listen_pg_addr,
listen_pg_port: n.listen_pg_port as u16,
})
.collect::<Vec<Node>>())
})
.await?;
// XXX hack: enable test_backward_compatibility to work by populating our list of
if nodes.is_empty() {
return self.list_nodes_local_env().await;
}
tracing::info!("list_nodes: loaded {} nodes", nodes.len());
Ok(nodes)
}
/// Shim for automated compatibility tests: load nodes from LocalEnv instead of database
pub(crate) async fn list_nodes_local_env(&self) -> DatabaseResult<Vec<Node>> {
// Enable test_backward_compatibility to work by populating our list of
// nodes from LocalEnv when it is not present in persistent storage. Otherwise at
// first startup in the compat test, we may have shards but no nodes.
let mut result = Vec::new();
use control_plane::local_env::LocalEnv;
let env = LocalEnv::load_config().map_err(|e| DatabaseError::Logical(format!("{e}")))?;
tracing::info!(
"Loaded {} pageserver nodes from LocalEnv",
"Loading {} pageserver nodes from LocalEnv",
env.pageservers.len()
);
let mut nodes = Vec::new();
for ps_conf in env.pageservers {
let (pg_host, pg_port) =
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
result.push(Node {
let node = Node {
id: ps_conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
@@ -184,16 +163,96 @@ impl Persistence {
listen_http_port: http_port.unwrap_or(80),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
});
};
// Synchronize database with what we learn from LocalEnv
self.insert_node(&node).await?;
nodes.push(node);
}
Ok(result)
Ok(nodes)
}
/// At startup, we populate our map of tenant shards from persistent storage.
pub(crate) async fn list_tenant_shards(&self) -> anyhow::Result<Vec<TenantShardPersistence>> {
let inner = self.inner.lock().unwrap();
Ok(inner.state.tenants.values().cloned().collect())
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
let loaded = self
.with_conn(move |conn| -> DatabaseResult<_> {
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
})
.await?;
if loaded.is_empty() {
if let Some(path) = &self.json_path {
if tokio::fs::try_exists(path)
.await
.map_err(|e| DatabaseError::Logical(format!("Error stat'ing JSON file: {e}")))?
{
tracing::info!("Importing from legacy JSON format at {path}");
return self.list_tenant_shards_json(path).await;
}
}
}
Ok(loaded)
}
/// Shim for automated compatibility tests: load tenants from a JSON file instead of database
pub(crate) async fn list_tenant_shards_json(
&self,
path: &Utf8Path,
) -> DatabaseResult<Vec<TenantShardPersistence>> {
let bytes = tokio::fs::read(path)
.await
.map_err(|e| DatabaseError::Logical(format!("Failed to load JSON: {e}")))?;
let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
.map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
for (tenant_id, tenant) in &mut decoded.tenants {
// Backward compat: an old attachments.json from before PR #6251, replace
// empty strings with proper defaults.
if tenant.tenant_id.is_empty() {
tenant.tenant_id = tenant_id.to_string();
tenant.config = serde_json::to_string(&TenantConfig::default())
.map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
tenant.placement_policy = serde_json::to_string(&PlacementPolicy::default())
.map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
}
}
let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
// Synchronize database with what is in the JSON file
self.insert_tenant_shards(tenants.clone()).await?;
Ok(tenants)
}
/// For use in testing environments, where we dump out JSON on shutdown.
pub async fn write_tenants_json(&self) -> anyhow::Result<()> {
let Some(path) = &self.json_path else {
anyhow::bail!("Cannot write JSON if path isn't set (test environment bug)");
};
tracing::info!("Writing state to {path}...");
let tenants = self.list_tenant_shards().await?;
let mut tenants_map = HashMap::new();
for tsp in tenants {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount(tsp.shard_count as u8),
};
tenants_map.insert(tenant_shard_id, tsp);
}
let json = serde_json::to_string(&JsonPersistence {
tenants: tenants_map,
})?;
tokio::fs::write(path, &json).await?;
tracing::info!("Wrote {} bytes to {path}...", json.len());
Ok(())
}
/// Tenants must be persisted before we schedule them for the first time. This enables us
@@ -201,22 +260,79 @@ impl Persistence {
pub(crate) async fn insert_tenant_shards(
&self,
shards: Vec<TenantShardPersistence>,
) -> anyhow::Result<()> {
self.mutating_transaction(|locked| {
for shard in shards {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(shard.tenant_id.as_str())?,
shard_number: ShardNumber(shard.shard_number as u8),
shard_count: ShardCount(shard.shard_count as u8),
};
locked.tenants.insert(tenant_shard_id, shard);
}
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> QueryResult<()> {
for tenant in &shards {
diesel::insert_into(tenant_shards)
.values(tenant)
.execute(conn)?;
}
Ok(())
})?;
Ok(())
})
.await
}
/// Ordering: call this _after_ deleting the tenant on pageservers, but _before_ dropping state for
/// the tenant from memory on this server.
#[allow(unused)]
pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
diesel::delete(tenant_shards)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.execute(conn)?;
Ok(())
})
.await
}
/// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient
/// batched increment of the generations of all tenants whose generation_pageserver is equal to
/// the node that called /re-attach.
#[tracing::instrument(skip_all, fields(node_id))]
pub(crate) async fn re_attach(
&self,
node_id: NodeId,
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_conn(move |conn| {
let rows_updated = diesel::update(tenant_shards)
.filter(generation_pageserver.eq(node_id.0 as i64))
.set(generation.eq(generation + 1))
.execute(conn)?;
tracing::info!("Incremented {} tenants' generations", rows_updated);
// TODO: UPDATE+SELECT in one query
let updated = tenant_shards
.filter(generation_pageserver.eq(node_id.0 as i64))
.select(TenantShardPersistence::as_select())
.load(conn)?;
Ok(updated)
})
.await?;
let mut result = HashMap::new();
for tsp in updated {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())
.map_err(|e| DatabaseError::Logical(format!("Malformed tenant id: {e}")))?,
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount(tsp.shard_count as u8),
};
result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
}
Ok(result)
}
/// Reconciler calls this immediately before attaching to a new pageserver, to acquire a unique, monotonically
/// advancing generation number. We also store the NodeId for which the generation was issued, so that in
/// [`Self::re_attach`] we can do a bulk UPDATE on the generations for that node.
@@ -225,47 +341,46 @@ impl Persistence {
tenant_shard_id: TenantShardId,
node_id: NodeId,
) -> anyhow::Result<Generation> {
self.mutating_transaction(|locked| {
let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
anyhow::bail!("Tried to increment generation of unknown shard");
};
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_conn(move |conn| {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
.set((
generation.eq(generation + 1),
generation_pageserver.eq(node_id.0 as i64),
))
// TODO: only returning() the generation column
.returning(TenantShardPersistence::as_returning())
.get_result(conn)?;
shard.generation += 1;
shard.generation_pageserver = Some(node_id);
Ok(updated)
})
.await?;
let gen = Generation::new(shard.generation);
Ok(gen)
})
.await
Ok(Generation::new(updated.generation as u32))
}
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
self.mutating_transaction(|locked| {
let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
anyhow::bail!("Tried to increment generation of unknown shard");
};
shard.generation_pageserver = None;
shard.placement_policy = serde_json::to_string(&PlacementPolicy::Detached).unwrap();
Ok(())
})
.await
}
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.0 as i32))
.set((
generation_pageserver.eq(i64::MAX),
placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
))
.execute(conn)?;
pub(crate) async fn re_attach(
&self,
node_id: NodeId,
) -> anyhow::Result<HashMap<TenantShardId, Generation>> {
self.mutating_transaction(|locked| {
let mut result = HashMap::new();
for (tenant_shard_id, shard) in locked.tenants.iter_mut() {
if shard.generation_pageserver == Some(node_id) {
shard.generation += 1;
result.insert(*tenant_shard_id, Generation::new(shard.generation));
}
}
Ok(result)
Ok(updated)
})
.await
.await?;
Ok(())
}
// TODO: when we start shard splitting, we must durably mark the tenant so that
@@ -285,7 +400,8 @@ impl Persistence {
}
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
#[derive(Serialize, Deserialize, Clone)]
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
#[serde(default)]
pub(crate) tenant_id: String,
@@ -296,16 +412,28 @@ pub(crate) struct TenantShardPersistence {
#[serde(default)]
pub(crate) shard_stripe_size: i32,
// Currently attached pageserver
#[serde(rename = "pageserver")]
pub(crate) generation_pageserver: Option<NodeId>,
// Latest generation number: next time we attach, increment this
// and use the incremented number when attaching
pub(crate) generation: u32,
pub(crate) generation: i32,
// Currently attached pageserver
#[serde(rename = "pageserver")]
pub(crate) generation_pageserver: i64,
#[serde(default)]
pub(crate) placement_policy: String,
#[serde(default)]
pub(crate) config: String,
}
/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
pub(crate) node_id: i64,
pub(crate) scheduling_policy: String,
pub(crate) listen_http_addr: String,
pub(crate) listen_http_port: i32,
pub(crate) listen_pg_addr: String,
pub(crate) listen_pg_port: i32,
}

View File

@@ -0,0 +1,27 @@
// @generated automatically by Diesel CLI.
diesel::table! {
nodes (node_id) {
node_id -> Int8,
scheduling_policy -> Varchar,
listen_http_addr -> Varchar,
listen_http_port -> Int4,
listen_pg_addr -> Varchar,
listen_pg_port -> Int4,
}
}
diesel::table! {
tenant_shards (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
shard_number -> Int4,
shard_count -> Int4,
shard_stripe_size -> Int4,
generation -> Int4,
generation_pageserver -> Int8,
placement_policy -> Varchar,
config -> Text,
}
}
diesel::allow_tables_to_appear_in_same_query!(nodes, tenant_shards,);

View File

@@ -11,6 +11,7 @@ use control_plane::attachment_service::{
TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use diesel::result::DatabaseErrorKind;
use hyper::StatusCode;
use pageserver_api::{
control_api::{
@@ -26,6 +27,7 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api;
use utils::{
completion::Barrier,
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId},
@@ -35,7 +37,7 @@ use utils::{
use crate::{
compute_hook::ComputeHook,
node::Node,
persistence::{Persistence, TenantShardPersistence},
persistence::{DatabaseError, Persistence, TenantShardPersistence},
scheduler::Scheduler,
tenant_state::{
IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
@@ -46,6 +48,10 @@ use crate::{
const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
/// How long [`Service::startup_reconcile`] is allowed to take before it should give
/// up on unresponsive pageservers and proceed.
pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
// Top level state available to all HTTP handlers
struct ServiceState {
tenants: BTreeMap<TenantShardId, TenantState>,
@@ -79,10 +85,27 @@ pub struct Config {
pub jwt_token: Option<String>,
}
impl From<DatabaseError> for ApiError {
fn from(err: DatabaseError) -> ApiError {
match err {
DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
// FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
DatabaseError::Connection(_e) => ApiError::ShuttingDown,
DatabaseError::Logical(reason) => {
ApiError::InternalServerError(anyhow::anyhow!(reason))
}
}
}
}
pub struct Service {
inner: Arc<std::sync::RwLock<ServiceState>>,
config: Config,
persistence: Arc<Persistence>,
/// This waits for initial reconciliation with pageservers to complete. Until this barrier
/// passes, it isn't safe to do any actions that mutate tenants.
pub(crate) startup_complete: Barrier,
}
impl From<ReconcileWaitError> for ApiError {
@@ -96,77 +119,32 @@ impl From<ReconcileWaitError> for ApiError {
}
impl Service {
pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
tracing::info!("Loading nodes from database...");
let mut nodes = persistence.list_nodes().await?;
tracing::info!("Loaded {} nodes from database.", nodes.len());
tracing::info!("Loading shards from database...");
let tenant_shard_persistence = persistence.list_tenant_shards().await?;
tracing::info!(
"Loaded {} shards from database.",
tenant_shard_persistence.len()
);
let mut tenants = BTreeMap::new();
for tsp in tenant_shard_persistence {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount(tsp.shard_count as u8),
};
let shard_identity = if tsp.shard_count == 0 {
ShardIdentity::unsharded()
} else {
ShardIdentity::new(
ShardNumber(tsp.shard_number as u8),
ShardCount(tsp.shard_count as u8),
ShardStripeSize(tsp.shard_stripe_size as u32),
)?
};
let new_tenant = TenantState {
tenant_shard_id,
shard: shard_identity,
sequence: Sequence::initial(),
// Note that we load generation, but don't care about generation_pageserver. We will either end up finding
// our existing attached location and it will match generation_pageserver, or we will attach somewhere new
// and update generation_pageserver in the process.
generation: Generation::new(tsp.generation),
policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
intent: IntentState::new(),
observed: ObservedState::new(),
config: serde_json::from_str(&tsp.config).unwrap(),
reconciler: None,
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
};
tenants.insert(tenant_shard_id, new_tenant);
}
pub fn get_config(&self) -> &Config {
&self.config
}
/// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping
/// until this is done.
async fn startup_reconcile(&self) {
// For all tenant shards, a vector of observed states on nodes (where None means
// indeterminate, same as in [`ObservedStateLocation`])
let mut observed = HashMap::new();
let nodes = {
let locked = self.inner.read().unwrap();
locked.nodes.clone()
};
// TODO: issue these requests concurrently
for node in &mut nodes {
let client = mgmt_api::Client::new(node.base_url(), config.jwt_token.as_deref());
for node in nodes.values() {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
tracing::info!("Scanning shards on node {}...", node.id);
match client.list_location_config().await {
Err(e) => {
tracing::warn!("Could not contact pageserver {} ({e})", node.id);
// TODO: be more tolerant, apply a generous 5-10 second timeout
// TODO: setting a node to Offline is a dramatic thing to do, and can
// prevent neon_local from starting up (it starts this service before
// any pageservers are running). It may make sense to give nodes
// a Pending state to accomodate this situation, and allow (but deprioritize)
// scheduling on Pending nodes.
//node.availability = NodeAvailability::Offline;
// TODO: be more tolerant, apply a generous 5-10 second timeout with retries, in case
// pageserver is being restarted at the same time as we are
}
Ok(listing) => {
tracing::info!(
@@ -174,7 +152,6 @@ impl Service {
listing.tenant_shards.len(),
node.id
);
node.availability = NodeAvailability::Active;
for (tenant_shard_id, conf_opt) in listing.tenant_shards {
observed.insert(tenant_shard_id, (node.id, conf_opt));
@@ -186,41 +163,46 @@ impl Service {
let mut cleanup = Vec::new();
// Populate intent and observed states for all tenants, based on reported state on pageservers
for (tenant_shard_id, (node_id, observed_loc)) in observed {
let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
continue;
};
let shard_count = {
let mut locked = self.inner.write().unwrap();
for (tenant_shard_id, (node_id, observed_loc)) in observed {
let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
continue;
};
tenant_state
.observed
.locations
.insert(node_id, ObservedStateLocation { conf: observed_loc });
}
// State of nodes is now frozen, transform to a HashMap.
let mut nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
// Populate each tenant's intent state
let mut scheduler = Scheduler::new(&tenants, &nodes);
for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
tenant_state.intent_from_observed();
if let Err(e) = tenant_state.schedule(&mut scheduler) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
// to clients.
tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
tenant_state
.observed
.locations
.insert(node_id, ObservedStateLocation { conf: observed_loc });
}
}
// Populate each tenant's intent state
let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
tenant_state.intent_from_observed();
if let Err(e) = tenant_state.schedule(&mut scheduler) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
// to clients.
tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
}
}
locked.tenants.len()
};
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
// generation_pageserver in the database.
// Clean up any tenants that were found on pageservers but are not known to us.
for (tenant_shard_id, node_id) in cleanup {
// A node reported a tenant_shard_id which is unknown to us: detach it.
let node = nodes
.get_mut(&node_id)
.get(&node_id)
.expect("Always exists: only known nodes are scanned");
let client = mgmt_api::Client::new(node.base_url(), config.jwt_token.as_deref());
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
match client
.location_config(
tenant_shard_id,
@@ -252,13 +234,80 @@ impl Service {
}
}
let shard_count = tenants.len();
// Finally, now that the service is up and running, launch reconcile operations for any tenants
// which require it: under normal circumstances this should only include tenants that were in some
// transient state before we restarted.
let reconcile_tasks = self.reconcile_all();
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
}
pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
tracing::info!("Loading nodes from database...");
let nodes = persistence.list_nodes().await?;
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
tracing::info!("Loaded {} nodes from database.", nodes.len());
tracing::info!("Loading shards from database...");
let tenant_shard_persistence = persistence.list_tenant_shards().await?;
tracing::info!(
"Loaded {} shards from database.",
tenant_shard_persistence.len()
);
let mut tenants = BTreeMap::new();
for tsp in tenant_shard_persistence {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount(tsp.shard_count as u8),
};
let shard_identity = if tsp.shard_count == 0 {
ShardIdentity::unsharded()
} else {
ShardIdentity::new(
ShardNumber(tsp.shard_number as u8),
ShardCount(tsp.shard_count as u8),
ShardStripeSize(tsp.shard_stripe_size as u32),
)?
};
// We will populate intent properly later in [`Self::startup_reconcile`], initially populate
// it with what we can infer: the node for which a generation was most recently issued.
let mut intent = IntentState::new();
if tsp.generation_pageserver != i64::MAX {
intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
}
let new_tenant = TenantState {
tenant_shard_id,
shard: shard_identity,
sequence: Sequence::initial(),
generation: Generation::new(tsp.generation as u32),
policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
intent,
observed: ObservedState::new(),
config: serde_json::from_str(&tsp.config).unwrap(),
reconciler: None,
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
};
tenants.insert(tenant_shard_id, new_tenant);
}
let (startup_completion, startup_complete) = utils::completion::channel();
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
result_tx, nodes, tenants,
))),
config,
persistence,
startup_complete,
});
let result_task_this = this.clone();
@@ -316,11 +365,13 @@ impl Service {
}
});
// Finally, now that the service is up and running, launch reconcile operations for any tenants
// which require it: under normal circumstances this should only include tenants that were in some
// transient state before we restarted.
let reconcile_tasks = this.reconcile_all();
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
let startup_reconcile_this = this.clone();
tokio::task::spawn(async move {
// Block the [`Service::startup_complete`] barrier until we're done
let _completion = startup_completion;
startup_reconcile_this.startup_reconcile().await
});
Ok(this)
}
@@ -336,7 +387,6 @@ impl Service {
let locked = self.inner.write().unwrap();
!locked.tenants.contains_key(&attach_req.tenant_shard_id)
};
if insert {
let tsp = TenantShardPersistence {
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
@@ -344,22 +394,39 @@ impl Service {
shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
shard_stripe_size: 0,
generation: 0,
generation_pageserver: None,
generation_pageserver: i64::MAX,
placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
};
self.persistence.insert_tenant_shards(vec![tsp]).await?;
match self.persistence.insert_tenant_shards(vec![tsp]).await {
Err(e) => match e {
DatabaseError::Query(diesel::result::Error::DatabaseError(
DatabaseErrorKind::UniqueViolation,
_,
)) => {
tracing::info!(
"Raced with another request to insert tenant {}",
attach_req.tenant_shard_id
)
}
_ => return Err(e.into()),
},
Ok(()) => {
tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(
attach_req.tenant_shard_id,
TenantState::new(
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Single,
),
);
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(
attach_req.tenant_shard_id,
TenantState::new(
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Single,
),
);
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
}
}
}
let new_generation = if let Some(req_node_id) = attach_req.node_id {
@@ -506,6 +573,14 @@ impl Service {
id: req_tenant.id,
valid,
});
} else {
// After tenant deletion, we may approve any validation. This avoids
// spurious warnings on the pageserver if it has pending LSN updates
// at the point a deletion happens.
response.tenants.push(ValidateResponseTenant {
id: req_tenant.id,
valid: true,
});
}
}
response
@@ -561,7 +636,7 @@ impl Service {
shard_count: tenant_shard_id.shard_count.0 as i32,
shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
generation: 0,
generation_pageserver: None,
generation_pageserver: i64::MAX,
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
config: serde_json::to_string(&create_req.config).unwrap(),
})
@@ -967,10 +1042,7 @@ impl Service {
availability: NodeAvailability::Active,
};
// TODO: idempotency if the node already exists in the database
self.persistence
.insert_node(&new_node)
.await
.map_err(ApiError::InternalServerError)?;
self.persistence.insert_node(&new_node).await?;
let mut locked = self.inner.write().unwrap();
let mut new_nodes = (*locked.nodes).clone();

View File

@@ -1,5 +1,11 @@
use crate::{background_process, local_env::LocalEnv};
use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};
use diesel::{
backend::Backend,
query_builder::{AstPass, QueryFragment, QueryId},
Connection, PgConnection, QueryResult, RunQueryDsl,
};
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
use hyper::Method;
use pageserver_api::{
models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
@@ -7,9 +13,9 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{path::PathBuf, str::FromStr};
use std::{env, str::FromStr};
use tokio::process::Command;
use tracing::instrument;
use utils::{
auth::{Claims, Scope},
@@ -19,14 +25,17 @@ use utils::{
pub struct AttachmentService {
env: LocalEnv,
listen: String,
path: PathBuf,
path: Utf8PathBuf,
jwt_token: Option<String>,
public_key_path: Option<Utf8PathBuf>,
postgres_port: u16,
client: reqwest::Client,
}
const COMMAND: &str = "attachment_service";
const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
pub tenant_shard_id: TenantShardId,
@@ -169,7 +178,9 @@ pub struct TenantShardMigrateResponse {}
impl AttachmentService {
pub fn from_env(env: &LocalEnv) -> Self {
let path = env.base_data_dir.join("attachments.json");
let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
.unwrap()
.join("attachments.json");
// Makes no sense to construct this if pageservers aren't going to use it: assume
// pageservers have control plane API set
@@ -181,6 +192,13 @@ impl AttachmentService {
listen_url.port().unwrap()
);
// Convention: NeonEnv in python tests reserves the next port after the control_plane_api
// port, for use by our captive postgres.
let postgres_port = listen_url
.port()
.expect("Control plane API setting should always have a port")
+ 1;
// Assume all pageservers have symmetric auth configuration: this service
// expects to use one JWT token to talk to all of them.
let ps_conf = env
@@ -209,6 +227,7 @@ impl AttachmentService {
listen,
jwt_token,
public_key_path,
postgres_port,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
@@ -220,13 +239,214 @@ impl AttachmentService {
.expect("non-Unicode path")
}
pub async fn start(&self) -> anyhow::Result<()> {
let path_str = self.path.to_string_lossy();
/// PIDFile for the postgres instance used to store attachment service state
fn postgres_pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(
self.env
.base_data_dir
.join("attachment_service_postgres.pid"),
)
.expect("non-Unicode path")
}
let mut args = vec!["-l", &self.listen, "-p", &path_str]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
/// In order to access database migrations, we need to find the Neon source tree
async fn find_source_root(&self) -> anyhow::Result<Utf8PathBuf> {
// We assume that either prd or our binary is in the source tree. The former is usually
// true for automated test runners, the latter is usually true for developer workstations. Often
// both are true, which is fine.
let candidate_start_points = [
// Current working directory
Utf8PathBuf::from_path_buf(std::env::current_dir()?).unwrap(),
// Directory containing the binary we're running inside
Utf8PathBuf::from_path_buf(env::current_exe()?.parent().unwrap().to_owned()).unwrap(),
];
// For each candidate start point, search through ancestors looking for a neon.git source tree root
for start_point in &candidate_start_points {
// Start from the build dir: assumes we are running out of a built neon source tree
for path in start_point.ancestors() {
// A crude approximation: the root of the source tree is whatever contains a "control_plane"
// subdirectory.
let control_plane = path.join("control_plane");
if tokio::fs::try_exists(&control_plane).await? {
return Ok(path.to_owned());
}
}
}
// Fall-through
Err(anyhow::anyhow!(
"Could not find control_plane src dir, after searching ancestors of {candidate_start_points:?}"
))
}
/// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
///
/// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
/// to other versions if that one isn't found. Some automated tests create circumstances
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
for v in prefer_versions {
let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
if tokio::fs::try_exists(&path).await? {
return Ok(path);
}
}
// Fall through
anyhow::bail!(
"Postgres binaries not found in {}",
self.env.pg_distrib_dir.display()
);
}
/// Readiness check for our postgres process
async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result<bool> {
let bin_path = pg_bin_dir.join("pg_isready");
let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)];
let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?;
Ok(exitcode.success())
}
/// Create our database if it doesn't exist, and run migrations.
///
/// This function is equivalent to the `diesel setup` command in the diesel CLI. We implement
/// the same steps by hand to avoid imposing a dependency on installing diesel-cli for developers
/// who just want to run `cargo neon_local` without knowing about diesel.
///
/// Returns the database url
pub async fn setup_database(&self) -> anyhow::Result<String> {
let database_url = format!(
"postgresql://localhost:{}/attachment_service",
self.postgres_port
);
println!("Running attachment service database setup...");
fn change_database_of_url(database_url: &str, default_database: &str) -> (String, String) {
let base = ::url::Url::parse(database_url).unwrap();
let database = base.path_segments().unwrap().last().unwrap().to_owned();
let mut new_url = base.join(default_database).unwrap();
new_url.set_query(base.query());
(database, new_url.into())
}
#[derive(Debug, Clone)]
pub struct CreateDatabaseStatement {
db_name: String,
}
impl CreateDatabaseStatement {
pub fn new(db_name: &str) -> Self {
CreateDatabaseStatement {
db_name: db_name.to_owned(),
}
}
}
impl<DB: Backend> QueryFragment<DB> for CreateDatabaseStatement {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DB>) -> QueryResult<()> {
out.push_sql("CREATE DATABASE ");
out.push_identifier(&self.db_name)?;
Ok(())
}
}
impl<Conn> RunQueryDsl<Conn> for CreateDatabaseStatement {}
impl QueryId for CreateDatabaseStatement {
type QueryId = ();
const HAS_STATIC_QUERY_ID: bool = false;
}
if PgConnection::establish(&database_url).is_err() {
let (database, postgres_url) = change_database_of_url(&database_url, "postgres");
println!("Creating database: {database}");
let mut conn = PgConnection::establish(&postgres_url)?;
CreateDatabaseStatement::new(&database).execute(&mut conn)?;
}
let mut conn = PgConnection::establish(&database_url)?;
let migrations_dir = self
.find_source_root()
.await?
.join("control_plane/attachment_service/migrations");
let migrations = diesel_migrations::FileBasedMigrations::from_path(migrations_dir)?;
println!("Running migrations in {}", migrations.path().display());
HarnessWithOutput::write_to_stdout(&mut conn)
.run_pending_migrations(migrations)
.map(|_| ())
.map_err(|e| anyhow::anyhow!(e))?;
println!("Migrations complete");
Ok(database_url)
}
pub async fn start(&self) -> anyhow::Result<()> {
// Start a vanilla Postgres process used by the attachment service for persistence.
let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
.unwrap()
.join("attachment_service_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_log_path = pg_data_path.join("postgres.log");
if !tokio::fs::try_exists(&pg_data_path).await? {
// Initialize empty database
let initdb_path = pg_bin_dir.join("initdb");
let mut child = Command::new(&initdb_path)
.args(["-D", pg_data_path.as_ref()])
.spawn()
.expect("Failed to spawn initdb");
let status = child.wait().await?;
if !status.success() {
anyhow::bail!("initdb failed with status {status}");
}
tokio::fs::write(
&pg_data_path.join("postgresql.conf"),
format!("port = {}", self.postgres_port),
)
.await?;
};
println!("Starting attachment service database...");
let db_start_args = [
"-w",
"-D",
pg_data_path.as_ref(),
"-l",
pg_log_path.as_ref(),
"start",
];
background_process::start_process(
"attachment_service_db",
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
[],
background_process::InitialPidFile::Create(self.postgres_pid_file()),
|| self.pg_isready(&pg_bin_dir),
)
.await?;
// Run migrations on every startup, in case something changed.
let database_url = self.setup_database().await?;
let mut args = vec![
"-l",
&self.listen,
"-p",
self.path.as_ref(),
"--database-url",
&database_url,
]
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();
if let Some(jwt_token) = &self.jwt_token {
args.push(format!("--jwt-token={jwt_token}"));
}
@@ -235,7 +455,7 @@ impl AttachmentService {
args.push(format!("--public-key={public_key_path}"));
}
let result = background_process::start_process(
background_process::start_process(
COMMAND,
&self.env.base_data_dir,
&self.env.attachment_service_bin(),
@@ -252,30 +472,46 @@ impl AttachmentService {
}
},
)
.await;
.await?;
// TODO: shouldn't we bail if we fail to spawn the process?
for ps_conf in &self.env.pageservers {
let (pg_host, pg_port) =
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
self.node_register(NodeRegisterRequest {
node_id: ps_conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
Ok(())
}
pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
println!("Stopping attachment service database...");
let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_stop_args)
.spawn()?
.wait()
.await?;
if !stop_status.success() {
let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"];
let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_status_args)
.spawn()?
.wait()
.await?;
// pg_ctl status returns this exit code if postgres is not running: in this case it is
// fine that stop failed. Otherwise it is an error that stop failed.
const PG_STATUS_NOT_RUNNING: i32 = 3;
if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
println!("Attachment service data base is already stopped");
return Ok(());
} else {
anyhow::bail!("Failed to stop attachment service database: {stop_status}")
}
}
result
Ok(())
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())
}
/// Simple HTTP request wrapper for calling into attachment service
async fn dispatch<RQ, RS>(
&self,
@@ -357,7 +593,7 @@ impl AttachmentService {
&self,
req: TenantCreateRequest,
) -> anyhow::Result<TenantCreateResponse> {
self.dispatch(Method::POST, "tenant".to_string(), Some(req))
self.dispatch(Method::POST, "v1/tenant".to_string(), Some(req))
.await
}
@@ -414,7 +650,7 @@ impl AttachmentService {
) -> anyhow::Result<TimelineInfo> {
self.dispatch(
Method::POST,
format!("tenant/{tenant_id}/timeline"),
format!("v1/tenant/{tenant_id}/timeline"),
Some(req),
)
.await

View File

@@ -135,7 +135,7 @@ fn main() -> Result<()> {
"tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
"timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
"start" => rt.block_on(handle_start_all(sub_args, &env)),
"stop" => handle_stop_all(sub_args, &env),
"stop" => rt.block_on(handle_stop_all(sub_args, &env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
@@ -1056,8 +1056,9 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
if let Err(e) = get_pageserver(env, subcommand_args)?
.start(&pageserver_config_overrides(subcommand_args))
.start(&pageserver_config_overrides(subcommand_args), *register)
.await
{
eprintln!("pageserver start failed: {e}");
@@ -1086,24 +1087,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
}
if let Err(e) = pageserver
.start(&pageserver_config_overrides(subcommand_args))
.await
{
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("migrate", subcommand_args)) => {
let pageserver = get_pageserver(env, subcommand_args)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver
.start(&pageserver_config_overrides(subcommand_args))
.start(&pageserver_config_overrides(subcommand_args), false)
.await
{
eprintln!("pageserver start failed: {e}");
@@ -1161,7 +1145,7 @@ async fn handle_attachment_service(
.map(|s| s.as_str())
== Some("immediate");
if let Err(e) = svc.stop(immediate) {
if let Err(e) = svc.stop(immediate).await {
eprintln!("stop failed: {}", e);
exit(1);
}
@@ -1257,7 +1241,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start().await {
eprintln!("attachment_service start failed: {:#}", e);
try_stop_all(env, true);
try_stop_all(env, true).await;
exit(1);
}
}
@@ -1265,11 +1249,11 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver
.start(&pageserver_config_overrides(sub_match))
.start(&pageserver_config_overrides(sub_match), true)
.await
{
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
try_stop_all(env, true);
try_stop_all(env, true).await;
exit(1);
}
}
@@ -1278,23 +1262,23 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start(vec![]).await {
eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
try_stop_all(env, false);
try_stop_all(env, false).await;
exit(1);
}
}
Ok(())
}
fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let immediate =
sub_match.get_one::<String>("stop-mode").map(|s| s.as_str()) == Some("immediate");
try_stop_all(env, immediate);
try_stop_all(env, immediate).await;
Ok(())
}
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
@@ -1329,7 +1313,7 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
if env.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.stop(immediate) {
if let Err(e) = attachment_service.stop(immediate).await {
eprintln!("attachment service stop failed: {e:#}");
}
}
@@ -1549,7 +1533,11 @@ fn cli() -> Command {
.subcommand(Command::new("status"))
.subcommand(Command::new("start")
.about("Start local pageserver")
.arg(pageserver_config_args.clone())
.arg(pageserver_config_args.clone()).arg(Arg::new("register")
.long("register")
.default_value("true").required(false)
.value_parser(value_parser!(bool))
.value_name("register"))
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")

View File

@@ -438,7 +438,7 @@ impl Endpoint {
}
fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
// TODO use background_process::stop_process instead
// TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
let pid = nix::unistd::Pid::from_raw(pid as i32);
@@ -583,9 +583,21 @@ impl Endpoint {
}
let child = cmd.spawn()?;
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let child = scopeguard::guard(child, |mut child| {
println!("SIGKILL & wait the started process");
(|| {
// TODO: use another signal that can be caught by the child so it can clean up any children it spawned
child.kill().context("SIGKILL child")?;
child.wait().context("wait() for child process")?;
anyhow::Ok(())
})()
.with_context(|| format!("scopeguard kill&wait child {child:?}"))
.unwrap();
});
// Write down the pid so we can wait for it when we want to stop
// TODO use background_process::start_process instead
// TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
let pid = child.id();
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
std::fs::write(pidfile_path, pid.to_string())?;
@@ -634,6 +646,9 @@ impl Endpoint {
std::thread::sleep(ATTEMPT_INTERVAL);
}
// disarm the scopeguard, let the child outlive this function (and neon_local invoction)
drop(scopeguard::ScopeGuard::into_inner(child));
Ok(())
}

View File

@@ -223,7 +223,11 @@ impl LocalEnv {
}
pub fn attachment_service_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("attachment_service")
// Irrespective of configuration, attachment service binary is always
// run from the same location as neon_local. This means that for compatibility
// tests that run old pageserver/safekeeper, they still run latest attachment service.
let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned();
neon_local_bin_dir.join("attachment_service")
}
pub fn safekeeper_bin(&self) -> PathBuf {

View File

@@ -30,6 +30,7 @@ use utils::{
lsn::Lsn,
};
use crate::attachment_service::{AttachmentService, NodeRegisterRequest};
use crate::local_env::PageServerConf;
use crate::{background_process, local_env::LocalEnv};
@@ -161,8 +162,8 @@ impl PageServerNode {
.expect("non-Unicode path")
}
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
self.start_node(config_overrides, false).await
pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> {
self.start_node(config_overrides, false, register).await
}
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
@@ -207,6 +208,7 @@ impl PageServerNode {
&self,
config_overrides: &[&str],
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
@@ -244,7 +246,26 @@ impl PageServerNode {
}
},
)
.await
.await?;
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
Ok(())
}
fn pageserver_basic_args<'a>(

9
diesel.toml Normal file
View File

@@ -0,0 +1,9 @@
# For documentation on how to configure this file,
# see https://diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "control_plane/attachment_service/src/schema.rs"
custom_type_derives = ["diesel::query_builder::QueryId"]
[migrations_directory]
dir = "control_plane/attachment_service/migrations"

View File

@@ -1,4 +1,5 @@
use std::env;
use std::fmt::{Debug, Display};
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
@@ -8,6 +9,7 @@ use std::{collections::HashSet, time::SystemTime};
use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use futures_util::Future;
use remote_storage::{
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
};
@@ -22,6 +24,7 @@ mod common;
mod tests_s3;
use common::{cleanup, ensure_logging_ready, upload_remote_data, upload_simple_remote_data};
use utils::backoff;
const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
@@ -39,6 +42,25 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// to take the time from S3 response headers.
const WAIT_TIME: Duration = Duration::from_millis(3_000);
async fn retry<T, O, F, E>(op: O) -> Result<T, E>
where
E: Display + Debug + 'static,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
{
let warn_threshold = 3;
let max_retries = 10;
backoff::retry(
op,
|_e| false,
warn_threshold,
max_retries,
"test retry",
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await
}
async fn time_point() -> SystemTime {
tokio::time::sleep(WAIT_TIME).await;
let ret = SystemTime::now();
@@ -47,8 +69,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
}
async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
Ok(client
.list_files(None)
Ok(retry(|| client.list_files(None))
.await
.context("list root files failure")?
.into_iter()
@@ -64,16 +85,23 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
ctx.client.upload(data, len, &path1, None).await?;
retry(|| {
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
ctx.client.upload(data, len, &path1, None)
})
.await?;
let t0_files = list_files(&ctx.client).await?;
let t0 = time_point().await;
println!("at t0: {t0_files:?}");
let old_data = "remote blob data2";
let (data, len) = upload_stream(old_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None).await?;
retry(|| {
let (data, len) = upload_stream(old_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None)
})
.await?;
let t1_files = list_files(&ctx.client).await?;
let t1 = time_point().await;
@@ -81,7 +109,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// A little check to ensure that our clock is not too far off from the S3 clock
{
let dl = ctx.client.download(&path2).await?;
let dl = retry(|| ctx.client.download(&path2)).await?;
let last_modified = dl.last_modified.unwrap();
let half_wt = WAIT_TIME.mul_f32(0.5);
let t0_hwt = t0 + half_wt;
@@ -92,15 +120,21 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
}
}
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
ctx.client.upload(data, len, &path3, None).await?;
retry(|| {
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
ctx.client.upload(data, len, &path3, None)
})
.await?;
let new_data = "new remote blob data2";
let (data, len) = upload_stream(new_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None).await?;
ctx.client.delete(&path1).await?;
retry(|| {
let (data, len) = upload_stream(new_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None)
})
.await?;
retry(|| ctx.client.delete(&path1)).await?;
let t2_files = list_files(&ctx.client).await?;
let t2 = time_point().await;
println!("at t2: {t2_files:?}");
@@ -137,7 +171,9 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
assert_eq!(t0_files, t0_files_recovered);
// cleanup
ctx.client.delete_objects(&[path1, path2, path3]).await?;
let paths = &[path1, path2, path3];
retry(|| ctx.client.delete_objects(paths)).await?;
Ok(())
}

View File

@@ -1,7 +1,7 @@
use std::{
borrow::Cow,
fs::{self, File},
io::{self, Write},
io,
};
use camino::{Utf8Path, Utf8PathBuf};
@@ -112,48 +112,6 @@ pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Erro
tokio::fs::File::open(path.as_ref()).await?.sync_all().await
}
/// Writes a file to the specified `final_path` in a crash safe fasion
///
/// The file is first written to the specified tmp_path, and in a second
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
///
/// NB: an async variant of this code exists in Pageserver's VirtualFile.
pub fn overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
nix::errno::Errno::EINVAL as i32,
));
};
std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true)
.open(tmp_path)?;
file.write_all(content)?;
file.sync_all()?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd = std::fs::OpenOptions::new()
.read(true)
.open(final_path_parent)?;
final_parent_dirfd.sync_all()?;
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -86,7 +86,6 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
lru = "0.12.2"
[dev-dependencies]
criterion.workspace = true

View File

@@ -275,22 +275,4 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn set_io_engine(&self, engine_str: &str) -> Result<()> {
let uri = format!("{}/v1/set_io_engine", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, engine_str)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn set_request_lru_size(&self, size: usize) -> Result<()> {
let uri = format!("{}/v1/set_req_lru_size", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, size)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -51,10 +51,6 @@ pub(crate) struct Args {
/// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
#[clap(long)]
keyspace_cache: Option<Utf8PathBuf>,
#[clap(long)]
set_io_engine: Option<String>,
#[clap(long)]
set_req_lru_size: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -107,14 +103,6 @@ async fn main_impl(
args.pageserver_jwt.as_deref(),
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.set_io_engine(engine_str).await?;
}
if let Some(req_lru_size) = &args.set_req_lru_size {
mgmt_api_client.set_request_lru_size(*req_lru_size).await?;
}
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,

View File

@@ -66,12 +66,12 @@ impl serde::Serialize for LatencyPercentiles {
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
for p in LATENCY_PERCENTILES {
ser.serialize_entry(
&format!("p{p}"),
&format!(
"{}",
&humantime::format_duration(self.latency_percentiles[i])
&humantime::format_duration(self.latency_percentiles[0])
),
)?;
}

View File

@@ -1,91 +0,0 @@
use std::cell::RefCell;
use crate::tenant::disk_btree::PAGE_SZ;
#[repr(C, align(8192))]
struct BufferContent([u8; PAGE_SZ]);
impl BufferContent {
fn empty() -> Self {
BufferContent(std::array::from_fn(|_| 0))
}
}
pub struct Buffer(Option<Box<BufferContent>>);
// Thread-local list of re-usable buffers.
thread_local! {
static POOL: RefCell<Vec<Box<BufferContent>>> = RefCell::new(Vec::new());
}
pub(crate) fn get() -> Buffer {
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
match maybe {
Some(buf) => Buffer(Some(buf)),
None => Buffer(Some(Box::new(BufferContent::empty()))),
}
}
impl Drop for Buffer {
fn drop(&mut self) {
let buf = self.0.take().unwrap();
POOL.with(|rc| rc.borrow_mut().push(buf))
}
}
impl std::ops::Deref for Buffer {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
&self.0.as_ref().unwrap().as_ref().0
}
}
impl std::ops::DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0.as_mut().unwrap().as_mut().0
}
}
pub(crate) struct PageWriteGuardBuf {
page: Buffer,
init_up_to: usize,
}
impl PageWriteGuardBuf {
pub fn new(buf: Buffer) -> Self {
PageWriteGuardBuf {
page: buf,
init_up_to: 0,
}
}
pub fn assume_init(self) -> Buffer {
assert_eq!(self.init_up_to, PAGE_SZ);
self.page
}
}
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.page.as_ptr()
}
fn bytes_init(&self) -> usize {
self.init_up_to
}
fn bytes_total(&self) -> usize {
self.page.len()
}
}
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
// hence it's safe to hand out the `stable_mut_ptr()`.
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.page.as_mut_ptr()
}
unsafe fn set_init(&mut self, pos: usize) {
assert!(pos <= self.page.len());
self.init_up_to = pos;
}
}

View File

@@ -86,9 +86,7 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use std::sync::{Arc, Mutex};
use crate::{buffer_pool, page_cache, task_mgr::TaskKind};
use crate::task_mgr::TaskKind;
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
@@ -97,8 +95,6 @@ pub struct RequestContext {
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub(crate) buf_cache:
Option<Arc<Mutex<lru::LruCache<page_cache::CacheKey, Arc<buffer_pool::Buffer>>>>>,
}
/// The kind of access to the page cache.
@@ -154,7 +150,6 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
buf_cache: None,
},
}
}
@@ -168,7 +163,6 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
buf_cache: original.buf_cache.as_ref().map(Arc::clone),
},
}
}

View File

@@ -2052,28 +2052,5 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
)
.put("/v1/set_io_engine", |r| {
async fn set_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
crate::virtual_file::io_engine::set(kind);
json_response(StatusCode::OK, ())
}
api_handler(r, set_io_engine_handler)
})
.put("/v1/set_req_lru_size", |r| {
async fn set_req_lru_size_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let size: usize = json_request(&mut r).await?;
crate::tenant::timeline::REQ_LRU_SIZE
.store(size, std::sync::atomic::Ordering::Relaxed);
json_response(StatusCode::OK, ())
}
api_handler(r, set_req_lru_size_handler)
})
.any(handler_404))
}

View File

@@ -12,7 +12,6 @@ pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub use pageserver_api::keyspace;
pub(crate) mod buffer_pool;
pub mod metrics;
pub mod page_cache;
pub mod page_service;

View File

@@ -125,6 +125,14 @@ impl ReconstructTimeMetrics {
}
}
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_direct_total",
"Number of cache hits from materialized page cache without redo",
)
.expect("failed to define a metric")
});
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_getpage_get_reconstruct_data_seconds",
@@ -134,6 +142,14 @@ pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
)
.expect("failed to define a metric")
});
pub(crate) struct GetVectoredLatency {
map: EnumMap<TaskKind, Option<Histogram>>,
}
@@ -172,8 +188,12 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
});
pub(crate) struct PageCacheMetricsForTaskKind {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_immutable: IntCounter,
pub read_hits_immutable: IntCounter,
pub read_hits_materialized_page_exact: IntCounter,
pub read_hits_materialized_page_older_lsn: IntCounter,
}
pub(crate) struct PageCacheMetrics {
@@ -206,6 +226,16 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
let content_kind: &'static str = content_kind.into();
PageCacheMetricsForTaskKind {
read_accesses_materialized_page: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[task_kind, "immutable", content_kind])
@@ -217,6 +247,28 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"])
.unwrap()
},
read_hits_materialized_page_exact: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"exact",
])
.unwrap()
},
read_hits_materialized_page_older_lsn: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"older_lsn",
])
.unwrap()
},
}
}))
})),
@@ -334,6 +386,7 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
#[derive(IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
AcquirePinnedSlotTimeout,
EvictIterLimit,
}
@@ -2349,6 +2402,8 @@ pub fn preinitialize_metrics() {
// counters
[
&MATERIALIZED_PAGE_CACHE_HIT,
&MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
&UNEXPECTED_ONDEMAND_DOWNLOADS,
&WALRECEIVER_STARTED_CONNECTIONS,
&WALRECEIVER_BROKER_UPDATES,

View File

@@ -74,13 +74,17 @@
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
},
time::Duration,
};
use anyhow::Context;
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use utils::id::TimelineId;
use utils::{id::TimelineId, lsn::Lsn};
use crate::{
context::RequestContext,
@@ -133,10 +137,17 @@ pub fn next_file_id() -> FileId {
///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum CacheKey {
ImmutableFilePage { file_id: FileId, blkno: u32 },
enum CacheKey {
MaterializedPage {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
ImmutableFilePage {
file_id: FileId,
blkno: u32,
},
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
@@ -152,6 +163,12 @@ struct MaterializedPageHashKey {
key: Key,
}
#[derive(Clone)]
struct Version {
lsn: Lsn,
slot_idx: usize,
}
struct Slot {
inner: tokio::sync::RwLock<SlotInner>,
usage_count: AtomicU8,
@@ -160,17 +177,8 @@ struct Slot {
struct SlotInner {
key: Option<CacheKey>,
// for `coalesce_readers_permit`
buf: &'static mut SlotContents,
}
#[derive(Clone)]
#[repr(C, align(8192))]
struct SlotContents([u8; PAGE_SZ]);
impl SlotContents {
fn empty() -> Self {
Self(std::array::from_fn(|_| 0))
}
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
buf: &'static mut [u8; PAGE_SZ],
}
impl Slot {
@@ -212,12 +220,41 @@ impl Slot {
}
}
impl SlotInner {
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
let mut guard = self.permit.lock().unwrap();
if let Some(existing_permit) = guard.upgrade() {
drop(guard);
drop(permit);
existing_permit
} else {
let permit = Arc::new(permit);
*guard = Arc::downgrade(&permit);
permit
}
}
}
pub struct PageCache {
/// This contains the mapping from the cache key to buffer slot that currently
/// contains the page, if any.
///
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
/// this HashMap can be replaced with a more concurrent version, there are
/// plenty of such crates around.
///
/// If you add support for caching different kinds of objects, each object kind
/// can have a separate mapping map, next to this field.
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
pinned_slots: Arc<tokio::sync::Semaphore>,
/// Index of the next candidate to evict, for the Clock replacement algorithm.
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
@@ -225,11 +262,14 @@ pub struct PageCache {
size_metrics: &'static PageCacheSizeMetrics,
}
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i> {
_permit: Arc<PinnedSlotsPermit>,
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}
@@ -237,13 +277,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
&self.slot_guard.buf.0
self.slot_guard.buf
}
}
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
&self.slot_guard.buf.0
self.slot_guard.buf
}
}
@@ -261,6 +301,7 @@ pub struct PageWriteGuard<'i> {
enum PageWriteGuardState<'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
},
Downgraded,
}
@@ -268,7 +309,7 @@ enum PageWriteGuardState<'i> {
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut self.state {
PageWriteGuardState::Invalid { inner } => &mut inner.buf.0,
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -279,7 +320,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
fn deref(&self) -> &Self::Target {
match &self.state {
PageWriteGuardState::Invalid { inner } => &inner.buf.0,
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -291,9 +332,10 @@ impl<'a> PageWriteGuard<'a> {
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner } => {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit: Arc::new(_permit),
slot_guard: inner.downgrade(),
}
}
@@ -310,7 +352,7 @@ impl Drop for PageWriteGuard<'_> {
///
fn drop(&mut self) {
match &mut self.state {
PageWriteGuardState::Invalid { inner } => {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
@@ -328,6 +370,166 @@ pub enum ReadBufResult<'a> {
}
impl PageCache {
//
// Section 1.1: Public interface functions for looking up and memorizing materialized page
// versions in the page cache
//
/// Look up a materialized page version.
///
/// The 'lsn' is an upper bound, this will return the latest version of
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
/// returned page.
pub async fn lookup_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
return None;
};
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_accesses_materialized_page
.inc();
let mut cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key: *key,
},
lsn,
};
if let Some(guard) = self
.try_lock_for_read(&mut cache_key, &mut Some(permit))
.await
{
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
} = cache_key
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_older_lsn
.inc();
}
Some((available_lsn, guard))
} else {
panic!("unexpected key type in slot");
}
} else {
None
}
}
///
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: Key,
lsn: Lsn,
img: &[u8],
) -> anyhow::Result<()> {
let cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key,
},
lsn,
};
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(&cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
debug_assert_eq!(inner.buf.len(), img.len());
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(inner.buf == img);
return Ok(());
}
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
// Create a write guard for the slot so we go through the expected motions.
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
let mut write_guard = PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
};
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
return Ok(());
}
}
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
file_id: FileId,
@@ -347,6 +549,27 @@ impl PageCache {
// "mappings" after this section. But the routines in this section should
// not require changes.
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
Duration::from_secs(10),
Arc::clone(&self.pinned_slots).acquire_owned(),
)
.await
{
Ok(res) => Ok(PinnedSlotsPermit(
res.expect("this semaphore is never closed"),
)),
Err(_timeout) => {
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
);
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
}
}
}
/// Look up a page in the cache.
///
/// If the search criteria is not exact, *cache_key is updated with the key
@@ -356,7 +579,11 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
async fn try_lock_for_read(
&self,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
@@ -366,7 +593,10 @@ impl PageCache {
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard { slot_guard: inner });
return Some(PageReadGuard {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
} else {
// search_mapping might have modified the search key; restore it.
*cache_key = cache_key_orig;
@@ -409,7 +639,12 @@ impl PageCache {
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE
.for_ctx(ctx)
@@ -422,17 +657,19 @@ impl PageCache {
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
debug_assert!(permit.is_some());
is_first_iteration = false;
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim()
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
@@ -456,8 +693,19 @@ impl PageCache {
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
state: PageWriteGuardState::Invalid { inner },
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}
@@ -478,6 +726,42 @@ impl PageCache {
///
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
match cache_key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
Ok(version_idx) => version_idx,
Err(0) => return None,
Err(version_idx) => version_idx - 1,
};
let version = &versions[version_idx];
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
/// Search for a page in the cache using the given search key.
///
/// Like 'search_mapping, but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
Some(versions[version_idx].slot_idx)
} else {
None
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -490,6 +774,27 @@ impl PageCache {
///
fn remove_mapping(&self, old_key: &CacheKey) {
match old_key {
CacheKey::MaterializedPage {
hash_key: old_hash_key,
lsn: old_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
let versions = old_entry.get_mut();
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
versions.remove(version_idx);
self.size_metrics
.current_bytes_materialized_page
.sub_page_sz(1);
if versions.is_empty() {
old_entry.remove_entry();
}
}
} else {
panic!("could not find old key in mapping")
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
@@ -506,6 +811,30 @@ impl PageCache {
/// of the existing mapping and leaves it untouched.
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
match new_key {
CacheKey::MaterializedPage {
hash_key: new_key,
lsn: new_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
let versions = map.entry(new_key.clone()).or_default();
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
Ok(version_idx) => Some(versions[version_idx].slot_idx),
Err(version_idx) => {
versions.insert(
version_idx,
Version {
lsn: *new_lsn,
slot_idx,
},
);
self.size_metrics
.current_bytes_materialized_page
.add_page_sz(1);
None
}
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
@@ -529,6 +858,7 @@ impl PageCache {
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
let mut iters = 0;
@@ -610,29 +940,39 @@ impl PageCache {
fn new(num_pages: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
let slot_contents = Box::leak(vec![SlotContents::empty(); num_pages].into_boxed_slice());
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
let slots = slot_contents
.into_iter()
.map(|slot_contents| Slot {
inner: tokio::sync::RwLock::new(SlotInner {
key: None,
buf: slot_contents,
}),
usage_count: AtomicU8::new(0),
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
.map(|chunk| {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: tokio::sync::RwLock::new(SlotInner {
key: None,
buf,
permit: std::sync::Mutex::new(Weak::new()),
}),
usage_count: AtomicU8::new(0),
}
})
.collect();
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
}
}
}

View File

@@ -104,29 +104,29 @@ use crate::shutdown_pageserver;
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//
// pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
// tokio::runtime::Builder::new_multi_thread()
// .thread_name("compute request worker")
// .enable_all()
// .build()
// .expect("Failed to create compute request runtime")
// });
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker")
.enable_all()
.build()
.expect("Failed to create compute request runtime")
});
// pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
// tokio::runtime::Builder::new_multi_thread()
// .thread_name("mgmt request worker")
// .enable_all()
// .build()
// .expect("Failed to create mgmt request runtime")
// });
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker")
.enable_all()
.build()
.expect("Failed to create mgmt request runtime")
});
// pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
// tokio::runtime::Builder::new_multi_thread()
// .thread_name("walreceiver worker")
// .enable_all()
// .build()
// .expect("Failed to create walreceiver runtime")
// });
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker")
.enable_all()
.build()
.expect("Failed to create walreceiver runtime")
});
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
@@ -150,10 +150,6 @@ pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(||
.unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
});
pub static COMPUTE_REQUEST_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
pub static MGMT_REQUEST_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
pub static WALRECEIVER_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

View File

@@ -25,7 +25,6 @@ impl<'a> BlockCursor<'a> {
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
// TODO: used pooled allocation instead, used by ImageLayer::get_value_reconstruct_data
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)

View File

@@ -5,11 +5,10 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::Deref;
use std::sync::Arc;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -37,7 +36,6 @@ where
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
BufferPool(Arc<crate::buffer_pool::Buffer>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
@@ -64,7 +62,6 @@ impl<'a> Deref for BlockLease<'a> {
fn deref(&self) -> &Self::Target {
match self {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::BufferPool(buf) => buf.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
@@ -177,6 +174,17 @@ impl FileBlockReader {
FileBlockReader { file_id, file }
}
/// Read a page from the underlying file into given buffer.
async fn fill_buffer(
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.
///
/// Returns a "lease" object that can be used to
@@ -187,69 +195,21 @@ impl FileBlockReader {
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
match ctx.page_content_kind() {
crate::context::PageContentKind::InMemoryLayer => {
unreachable!("this happens in inmemory_layer.rs")
}
crate::context::PageContentKind::Unknown
| crate::context::PageContentKind::DeltaLayerBtreeNode
| crate::context::PageContentKind::ImageLayerBtreeNode => {
let cache = page_cache::get();
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
let write_guard = async move {
assert!(write_guard.len() == PAGE_SZ);
self.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.await
}
.await?;
Ok(write_guard.mark_valid().into())
}
}
}
crate::context::PageContentKind::ImageLayerValue
| crate::context::PageContentKind::DeltaLayerValue => {
let cache_key = page_cache::CacheKey::ImmutableFilePage {
file_id: self.file_id,
blkno: blknum,
};
if let Some(cache) = &ctx.buf_cache {
let mut cache = cache.lock().unwrap();
if let Some(cached) = cache.get(&cache_key) {
return Ok(BlockLease::BufferPool(Arc::clone(cached)));
};
}
let buf = crate::buffer_pool::get();
let cache = page_cache::get();
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
let buf = async move {
assert_eq!(buf.len(), PAGE_SZ);
std::io::Result::Ok(
self.file
.read_exact_at(
crate::buffer_pool::PageWriteGuardBuf::new(buf),
blknum as u64 * PAGE_SZ as u64,
)
.await?
.assume_init(),
)
}
.await?;
let buf = Arc::new(buf);
if let Some(cache) = &ctx.buf_cache {
cache.lock().unwrap().put(cache_key, Arc::clone(&buf));
}
Ok(BlockLease::BufferPool(buf))
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
}

View File

@@ -65,7 +65,6 @@ where
pub struct ValueReconstructState {
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
pub(crate) scratch: Vec<u8>,
}
/// Return value from [`Layer::get_value_reconstruct_data`]

View File

@@ -688,14 +688,7 @@ impl DeltaLayerInner {
summary: Option<Summary>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new()
.read(true)
.custom_flags(nix::libc::O_DIRECT),
)
.await
{
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
@@ -778,17 +771,15 @@ impl DeltaLayerInner {
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor
.read_blob_into_buf(pos, &mut reconstruct_state.scratch, ctx)
.read_blob_into_buf(pos, &mut buf, ctx)
.await
.with_context(|| {
format!("Failed to read blob from virtual file {}", file.file.path)
})?;
// TODO: this one is super costly, it's allocating a Vec<> for the inner Bytes every time.
// That's on avg 200 allocations.
// Can we re-use the Vec from a buffer pool?
let val = Value::des(&reconstruct_state.scratch).with_context(|| {
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path

View File

@@ -367,14 +367,7 @@ impl ImageLayerInner {
summary: Option<Summary>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new()
.read(true)
.custom_flags(nix::libc::O_DIRECT),
)
.await
{
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};

View File

@@ -199,7 +199,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
tenant.walredo_mgr.maybe_quiesce(period * 10); // TODO: broken with compaction_period 0
tenant.walredo_mgr.maybe_quiesce(period * 10);
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())

View File

@@ -7,8 +7,6 @@ pub mod span;
pub mod uninit;
mod walreceiver;
pub(crate) static REQ_LRU_SIZE: AtomicUsize = AtomicUsize::new(0);
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
@@ -35,6 +33,8 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::sync::gate::Gate;
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -43,14 +43,6 @@ use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use std::{
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
sync::atomic::AtomicUsize,
};
use std::{
num::NonZeroUsize,
ops::{Deref, Range},
};
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
@@ -78,7 +70,9 @@ use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKin
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::TimelineMetrics;
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -598,10 +592,30 @@ impl Timeline {
ctx.task_kind()
);
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => {
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
return Ok(cached_img); // exact LSN match, return the image
}
Ordering::Greater => {
unreachable!("the returned lsn should never be after the requested lsn")
}
}
Some((cached_lsn, cached_img))
}
None => None,
};
let mut reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: None,
scratch: Vec::with_capacity(2 * 8192), // for good measure
img: cached_page_img,
};
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
@@ -2299,16 +2313,6 @@ impl Timeline {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
let mut ctx = RequestContextBuilder::extend(ctx).build();
ctx.buf_cache = match REQ_LRU_SIZE.load(std::sync::atomic::Ordering::Relaxed) {
0 => None,
x => Some(Arc::new(Mutex::new(lru::LruCache::new(
// SAFETY: we just checked for 0 above
unsafe { NonZeroUsize::new_unchecked(x) },
)))),
};
let ctx = &ctx;
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2347,6 +2351,7 @@ impl Timeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(traversal_path);
}
if prev_lsn <= cont_lsn {
@@ -2550,6 +2555,26 @@ impl Timeline {
}
}
/// # Cancel-safety
///
/// This method is cancellation-safe.
async fn lookup_cached_page(
&self,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(
@@ -4373,6 +4398,8 @@ impl Timeline {
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let last_rec_lsn = data.records.last().unwrap().0;
let img = match self
.walredo_mgr
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
@@ -4383,6 +4410,23 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::WalRedo(e)),
};
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();
if let Err(e) = cache
.memorize_materialized_page(
self.tenant_shard_id,
self.timeline_id,
key,
last_rec_lsn,
&img,
)
.await
.context("Materialized page memoization failed")
{
return Err(PageReconstructError::from(e));
}
}
Ok(img)
}
}

View File

@@ -28,7 +28,7 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
pub(crate) mod io_engine;
mod io_engine;
mod open_options;
pub use io_engine::IoEngineKind;
pub(crate) use open_options::*;
@@ -403,7 +403,12 @@ impl VirtualFile {
Ok(vfile)
}
/// Async & [`VirtualFile`]-enabled version of [`::utils::crashsafe::overwrite`].
/// Writes a file to the specified `final_path` in a crash safe fasion
///
/// The file is first written to the specified tmp_path, and in a second
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,

View File

@@ -26,31 +26,23 @@ pub enum IoEngineKind {
TokioEpollUring,
}
static IO_ENGINE: std::sync::RwLock<Option<IoEngineKind>> = std::sync::RwLock::new(None);
pub(crate) fn set(engine: IoEngineKind) {
let mut guard = IO_ENGINE.write().unwrap();
*guard = Some(engine);
let metric = &crate::metrics::virtual_file_io_engine::KIND;
metric.reset();
metric.with_label_values(&[&format!("{engine}")]).set(1);
drop(guard);
}
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
#[cfg(not(test))]
pub(super) fn init(engine: IoEngineKind) {
set(engine);
if IO_ENGINE.set(engine).is_err() {
panic!("called twice");
}
crate::metrics::virtual_file_io_engine::KIND
.with_label_values(&[&format!("{engine}")])
.set(1);
}
pub(super) fn get() -> IoEngineKind {
pub(super) fn get() -> &'static IoEngineKind {
#[cfg(test)]
{
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
let guard = IO_ENGINE.read().unwrap();
if let Some(v) = guard.is_some() {
return v;
}
*guard = Some(match std::env::var(env_var_name) {
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind,
Err(e) => {
@@ -65,13 +57,10 @@ pub(super) fn get() -> IoEngineKind {
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");
}
});
})
}
#[cfg(not(test))]
IO_ENGINE
.read()
.unwrap()
.expect("should have called set() or init() before")
IO_ENGINE.get().unwrap()
}
use std::os::unix::prelude::FileExt;

View File

@@ -1,12 +1,7 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use nix::libc;
use super::IoEngineKind;
use std::{
os::{fd::OwnedFd, unix::fs::OpenOptionsExt},
path::Path,
};
use std::{os::fd::OwnedFd, path::Path};
#[derive(Debug, Clone)]
pub enum OpenOptions {
@@ -97,18 +92,6 @@ impl OpenOptions {
self
}
pub fn custom_flags(&mut self, custom_flags: libc::c_int) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.custom_flags(custom_flags);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.custom_flags(custom_flags);
}
}
self
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),

View File

@@ -1033,7 +1033,23 @@ impl WalIngest {
// Copy content
debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
for blknum in 0..nblocks {
debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
// Sharding:
// - src and dst are always on the same shard, because they differ only by dbNode, and
// dbNode is not included in the hash inputs for sharding.
// - This WAL command is replayed on all shards, but each shard only copies the blocks
// that belong to it.
let src_key = rel_block_to_key(src_rel, blknum);
if !self.shard.is_key_local(&src_key) {
debug!(
"Skipping non-local key {} during XLOG_DBASE_CREATE",
src_key
);
continue;
}
debug!(
"copying block {} from {} ({}) to {}",
blknum, src_rel, src_key, dst_rel
);
let content = modification
.tline

View File

@@ -21,6 +21,7 @@
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use std::collections::VecDeque;
@@ -30,11 +31,10 @@ use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, Command};
use std::sync::{Arc, RwLock};
use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
@@ -73,12 +73,12 @@ pub(crate) struct BufferTag {
}
struct ProcessInput {
stdin: tokio::process::ChildStdin,
stdin: ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: tokio::process::ChildStdout,
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
@@ -112,8 +112,6 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
}
}
mod writebuf_pool;
///
/// Public interface of WAL redo manager
///
@@ -159,7 +157,6 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -180,7 +177,6 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -221,7 +217,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
async fn apply_batch_postgres(
fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -274,7 +270,6 @@ impl PostgresRedoManager {
let buf_tag = BufferTag { rel, blknum };
let result = proc
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
.await
.context("apply_wal_records");
let duration = started_at.elapsed();
@@ -652,8 +647,8 @@ struct WalRedoProcess {
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: tokio::sync::Mutex<ProcessOutput>,
stdin: tokio::sync::Mutex<ProcessInput>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -759,12 +754,12 @@ impl WalRedoProcess {
conf,
tenant_shard_id,
child: Some(child),
stdin: tokio::sync::Mutex::new(ProcessInput {
stdin: tokio::process::ChildStdin::from_std(stdin).unwrap(), // TODO error handling
stdin: Mutex::new(ProcessInput {
stdin,
n_requests: 0,
}),
stdout: tokio::sync::Mutex::new(ProcessOutput {
stdout: tokio::process::ChildStdout::from_std(stdout).unwrap(), // TODO error handling
stdout: Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
@@ -784,13 +779,15 @@ impl WalRedoProcess {
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
async fn apply_wal_records(
fn apply_wal_records(
&self,
tag: BufferTag,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -800,8 +797,7 @@ impl WalRedoProcess {
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
// TODO replace with allocation pool
let mut writebuf: writebuf_pool::PooledVecU8 = writebuf_pool::get();
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, img, &mut writebuf);
@@ -820,7 +816,7 @@ impl WalRedoProcess {
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, wal_redo_timeout).await;
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
@@ -831,17 +827,38 @@ impl WalRedoProcess {
res
}
async fn apply_wal_records0(
fn apply_wal_records0(
&self,
writebuf: &[u8],
_wal_redo_timeout: Duration, // TODO respect
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().await;
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
proc.stdin.write_all(writebuf).await.unwrap(); // TODO: bring back timeout & error handling
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
@@ -858,13 +875,40 @@ impl WalRedoProcess {
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().await;
let mut output = self.stdout.lock().unwrap();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
output.stdout.read_exact(&mut resultbuf).await.unwrap();
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));

View File

@@ -1,40 +0,0 @@
use std::cell::RefCell;
use postgres_ffi::BLCKSZ;
pub struct PooledVecU8(Option<Vec<u8>>);
// Thread-local list of re-usable buffers.
thread_local! {
static POOL: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::new());
}
pub(crate) fn get() -> PooledVecU8 {
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
match maybe {
Some(buf) => PooledVecU8(Some(buf)),
None => PooledVecU8(Some(Vec::with_capacity((BLCKSZ as usize) * 3))),
}
}
impl Drop for PooledVecU8 {
fn drop(&mut self) {
let mut buf = self.0.take().unwrap();
buf.clear();
POOL.with(|rc| rc.borrow_mut().push(buf))
}
}
impl std::ops::Deref for PooledVecU8 {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}
impl std::ops::DerefMut for PooledVecU8 {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().unwrap()
}
}

View File

@@ -1,36 +0,0 @@
run on i3en.3xlarge
admin@ip-172-31-13-23:[~/neon-main]: du -hs /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
225G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
=> ~2.25x main memory
admin@ip-172-31-13-23:[~/neon-main]: NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
--------------------------------------------------------------------------------- Benchmark results ---------------------------------------------------------------------------------
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.STD_FS
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2321
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 8,785.440 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.TOKIO_EPOLL_URING
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2200
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 9,046.271 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 16,457.727 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 16,457.727 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 16,457.727 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 16,457.727 ms
=========================================================================== 2 passed in 142.33s (0:02:22) ===========================================================================

View File

@@ -20,10 +20,10 @@ fi
# do all the on-disk initialization work now instead of a background kernel thread
# so that we're ready for benchmarking right after this line
#sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
MOUNTPOINT=/instance_store
sudo rmdir "$MOUNTPOINT" || sudo mkdir "$MOUNTPOINT"
sudo mkdir "$MOUNTPOINT"
sudo mount /dev/nvme1n1 "$MOUNTPOINT"
sudo chown -R "$(id -u)":"$(id -g)" "$MOUNTPOINT"
@@ -40,7 +40,7 @@ To run your local neon.git build on the instance store volume,
run the following commands from the top of the neon.git checkout
# raise file descriptor limit of your shell and its child processes
sudo prlimit -p \$\$ --nofile=800000:800000
sudo prlimit -p $$ --nofile=800000:800000
# test suite run
export TEST_OUTPUT="$TEST_OUTPUT"

View File

@@ -117,7 +117,10 @@ class NeonCompare(PgCompare):
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
# Start pg
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
config_lines = ["max_replication_write_lag=-1", "max_replication_flush_lag=-1"]
self._pg = self.env.endpoints.create_start(
branch_name, "main", self.tenant, config_lines=config_lines
)
@property
def pg(self) -> PgProtocol:
@@ -294,7 +297,7 @@ def remote_compare(zenbenchmark: NeonBenchmarker, remote_pg: RemotePostgres) ->
return RemoteCompare(zenbenchmark, remote_pg)
@pytest.fixture(params=["vanilla_compare", "neon_compare"], ids=["vanilla", "neon"])
@pytest.fixture(params=["neon_compare"], ids=["neon"])
def neon_with_baseline(request: FixtureRequest) -> PgCompare:
"""Parameterized fixture that helps compare neon against vanilla postgres.

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import abc
import asyncio
import concurrent.futures
import filecmp
import json
import os
@@ -993,6 +994,11 @@ class NeonEnv:
self.initial_timeline = config.initial_timeline
attachment_service_port = self.port_distributor.get_port()
# Reserve the next port after attachment service for use by its postgres: this
# will assert out if the next port wasn't free.
attachment_service_pg_port = self.port_distributor.get_port()
assert attachment_service_pg_port == attachment_service_port + 1
self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service: NeonAttachmentService = NeonAttachmentService(
self, config.auth_enabled
@@ -1071,16 +1077,27 @@ class NeonEnv:
self.neon_cli.init(cfg, force=config.config_init_force)
def start(self):
# Start up broker, pageserver and all safekeepers
self.broker.try_start()
# Attachment service starts first, so that pageserver /re-attach calls don't
# bounce through retries on startup
self.attachment_service.start()
for pageserver in self.pageservers:
pageserver.start()
# Start up broker, pageserver and all safekeepers
futs = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=2 + len(self.pageservers) + len(self.safekeepers)
) as executor:
futs.append(
executor.submit(lambda: self.broker.try_start() or None)
) # The `or None` is for the linter
for safekeeper in self.safekeepers:
safekeeper.start()
for pageserver in self.pageservers:
futs.append(executor.submit(lambda ps=pageserver: ps.start()))
for safekeeper in self.safekeepers:
futs.append(executor.submit(lambda sk=safekeeper: sk.start()))
for f in futs:
f.result()
def stop(self, immediate=False, ps_assert_metric_no_errors=False):
"""
@@ -1652,8 +1669,10 @@ class NeonCli(AbstractNeonCli):
id: int,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
register: bool = True,
) -> "subprocess.CompletedProcess[str]":
start_args = ["pageserver", "start", f"--id={id}", *overrides]
register_str = "true" if register else "false"
start_args = ["pageserver", "start", f"--id={id}", *overrides, f"--register={register_str}"]
storage = self.env.pageserver_remote_storage
append_pageserver_param_overrides(
params_to_update=start_args,
@@ -2080,6 +2099,7 @@ class NeonPageserver(PgProtocol):
self,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
register: bool = True,
) -> "NeonPageserver":
"""
Start the page server.
@@ -2089,7 +2109,7 @@ class NeonPageserver(PgProtocol):
assert self.running is False
self.env.neon_cli.pageserver_start(
self.id, overrides=overrides, extra_env_vars=extra_env_vars
self.id, overrides=overrides, extra_env_vars=extra_env_vars, register=register
)
self.running = True
return self

View File

@@ -21,12 +21,21 @@ class Workload:
- reads, checking we get the right data (`validate`)
"""
def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
def __init__(
self,
env: NeonEnv,
tenant_id: TenantId,
timeline_id: TimelineId,
branch_name: Optional[str] = None,
):
self.env = env
self.tenant_id = tenant_id
self.timeline_id = timeline_id
self.table = "foo"
# By default, use the default branch name for initial tenant in NeonEnv
self.branch_name = branch_name or "main"
self.expect_rows = 0
self.churn_cursor = 0
@@ -35,7 +44,7 @@ class Workload:
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
"main",
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id="ep-workload",

View File

@@ -1,10 +1,7 @@
import enum
import json
from pathlib import Path
from typing import Any, Dict, Tuple
import toml
import fixtures.pageserver.many_tenants as many_tenants
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
@@ -20,10 +17,6 @@ from fixtures.utils import get_scale_for_db, humantime_to_ms
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
class IoEngine(str, enum.Enum):
STD_FS = "std-fs"
TOKIO_EPOLL_URING = "tokio-epoll-uring"
# For reference, the space usage of the snapshots:
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots
# 137G /instance_store/test_output/shared-snapshots
@@ -34,10 +27,9 @@ class IoEngine(str, enum.Enum):
# 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6
# 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
@pytest.mark.parametrize("ioengine", [IoEngine.STD_FS, IoEngine.TOKIO_EPOLL_URING])
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100]])
@pytest.mark.parametrize("n_tenants", [1000])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
@pytest.mark.parametrize("n_tenants", [1, 10])
@pytest.mark.timeout(
10000
) # TODO: this value is just "a really high number"; have this per instance type
@@ -48,7 +40,6 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
n_tenants: int,
pgbench_scale: int,
duration: int,
ioengine: IoEngine,
):
def record(metric, **kwargs):
zenbenchmark.record(
@@ -69,12 +60,9 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
pageserver_config_override = {
"page_cache_size": f"{page_cache_size}",
"max_file_descriptors": f"{max_file_descriptors}",
"virtual_file_io_engine": f"\"{ioengine}\"",
}
neon_env_builder.pageserver_config_override = ";".join([f"{k}={v}" for k, v in pageserver_config_override.items()])
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
@@ -82,17 +70,12 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
"pageserver_config.override.virtual_file_io_engine": (ioengine, {"unit": ""}),
}
)
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
ps_http =env.pageserver.http_client()
for tenant_info in ps_http.tenant_list():
tenant_id = tenant_info["id"]
ps_http.patch_tenant_config_client_side(tenant_id, {"compaction_period": "10s"})
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)

View File

@@ -35,7 +35,7 @@ def init_pgbench(env: PgCompare, cmdline, password: None):
t0 = timeit.default_timer()
with env.record_pageserver_writes("init.pageserver_writes"):
out = env.pg_bin.run_capture(cmdline, env=environ)
env.flush()
# env.flush()
duration = timeit.default_timer() - t0
end_timestamp = utc_now_timestamp()
@@ -94,9 +94,7 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
if workload_type == PgBenchLoadType.INIT:
# Run initialize
init_pgbench(
env, ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr], password=password
)
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", "-I", "dtG", connstr], password=password)
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
# Run simple-update workload
@@ -151,7 +149,7 @@ def get_durations_matrix(default: int = 45) -> List[int]:
return rv
def get_scales_matrix(default: int = 10) -> List[int]:
def get_scales_matrix(default: int = 100) -> List[int]:
scales = os.getenv("TEST_PG_BENCH_SCALES_MATRIX", default=str(default))
rv = []
for s in scales.split(","):
@@ -172,8 +170,8 @@ def get_scales_matrix(default: int = 10) -> List[int]:
@pytest.mark.parametrize("duration", get_durations_matrix())
def test_pgbench(neon_with_baseline: PgCompare, scale: int, duration: int):
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.INIT)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SIMPLE_UPDATE)
# run_test_pgbench(neon_with_baseline, scale, duration, PgBenchLoadType.SELECT_ONLY)
# The following 3 tests run on an existing database as it was set up by previous tests,

View File

@@ -138,6 +138,7 @@ def test_create_snapshot(
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
env.attachment_service.stop()
# Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it
compatibility_snapshot_dir = (
@@ -226,11 +227,17 @@ def test_forward_compatibility(
try:
neon_env_builder.num_safekeepers = 3
neon_local_binpath = neon_env_builder.neon_binpath
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",
neon_binpath=compatibility_neon_bin,
pg_distrib_dir=compatibility_postgres_distrib_dir,
)
# Use current neon_local even though we're using old binaries for
# everything else: our test code is written for latest CLI args.
env.neon_local_binpath = neon_local_binpath
neon_env_builder.start()
check_neon_works(

View File

@@ -163,6 +163,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]
vanilla_pg.stop()
def test_import_from_pageserver_small(
pg_bin: PgBin, neon_env_builder: NeonEnvBuilder, test_output_dir: Path

View File

@@ -59,3 +59,5 @@ def test_neon_two_primary_endpoints_fail(
env.neon_cli.endpoint_stop("ep1")
# ep1 is stopped so create ep2 will succeed
env.neon_cli.endpoint_start("ep2")
# cleanup
env.neon_cli.endpoint_stop("ep2")

View File

@@ -499,7 +499,8 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# and serve clients.
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
env.pageserver.start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",)
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
register=False,
)
# The pageserver should provide service to clients

View File

@@ -1,4 +1,6 @@
import random
from contextlib import closing
from typing import Optional
import pytest
from fixtures.log_helper import log
@@ -141,18 +143,24 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
# Test that repeatedly kills and restarts the page server, while the
# safekeeper and compute node keep running.
@pytest.mark.timeout(540)
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
@pytest.mark.parametrize("shard_count", [None, 4])
def test_pageserver_chaos(
neon_env_builder: NeonEnvBuilder, build_type: str, shard_count: Optional[int]
):
if build_type == "debug":
pytest.skip("times out in debug builds")
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# these can happen, if we shutdown at a good time. to be fixed as part of #5172.
message = ".*duplicated L1 layer layer=.*"
env.pageserver.allowed_errors.append(message)
for ps in env.pageservers:
ps.allowed_errors.append(message)
# Use a tiny checkpoint distance, to create a lot of layers quickly.
# That allows us to stress the compaction and layer flushing logic more.
@@ -192,13 +200,19 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
# We run "random" kills using a fixed seed, to improve reproducibility if a test
# failure is related to a particular order of operations.
seed = 0xDEADBEEF
rng = random.Random(seed)
# Update the whole table, then immediately kill and restart the pageserver
for i in range(1, 15):
endpoint.safe_psql("UPDATE foo set updates = updates + 1")
# This kills the pageserver immediately, to simulate a crash
env.pageserver.stop(immediate=True)
env.pageserver.start()
to_kill = rng.choice(env.pageservers)
to_kill.stop(immediate=True)
to_kill.start()
# Check that all the updates are visible
num_updates = endpoint.safe_psql("SELECT sum(updates) FROM foo")[0][0]

View File

@@ -2,25 +2,40 @@
# This file runs pg_regress-based tests.
#
from pathlib import Path
from typing import Optional
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
import pytest
from fixtures.neon_fixtures import (
NeonEnvBuilder,
check_restored_datadir_content,
)
from fixtures.remote_storage import s3_storage
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.parametrize("shard_count", [None, 4])
def test_pg_regress(
neon_simple_env: NeonEnv,
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
shard_count: Optional[int],
):
env = neon_simple_env
"""
:param shard_count: if None, create an unsharded tenant. Otherwise create a tenant with this
many shards.
"""
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
env.neon_cli.create_branch("test_pg_regress", "empty")
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("test_pg_regress")
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.
@@ -61,22 +76,25 @@ def test_pg_regress(
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
#
@pytest.mark.parametrize("shard_count", [None, 4])
def test_isolation(
neon_simple_env: NeonEnv,
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
shard_count: Optional[int],
):
env = neon_simple_env
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
env.neon_cli.create_branch("test_isolation", "empty")
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
endpoint = env.endpoints.create_start(
"test_isolation", config_lines=["max_prepared_transactions=100"]
)
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=100"])
endpoint.safe_psql("CREATE DATABASE isolation_regression")
# Create some local directories for pg_isolation_regress to run in.
@@ -114,19 +132,24 @@ def test_isolation(
# Run extra Neon-specific pg_regress-based tests. The tests and their
# schedule file are in the sql_regress/ directory.
@pytest.mark.parametrize("shard_count", [None, 4])
def test_sql_regress(
neon_simple_env: NeonEnv,
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
shard_count: Optional[int],
):
env = neon_simple_env
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
env.neon_cli.create_branch("test_sql_regress", "empty")
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("test_sql_regress")
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.

View File

@@ -0,0 +1,85 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.remote_storage import s3_storage
from fixtures.types import TimelineId
from fixtures.workload import Workload
def test_sharding_smoke(
neon_env_builder: NeonEnvBuilder,
):
"""
Test the basic lifecycle of a sharded tenant:
- ingested data gets split up
- page service reads
- timeline creation and deletion
- splits
"""
shard_count = 4
neon_env_builder.num_pageservers = shard_count
# 1MiB stripes: enable getting some meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given
# in number of 8KiB pages.
stripe_size = 128
# Use S3-compatible remote storage so that we can scrub: this test validates
# that the scrubber doesn't barf when it sees a sharded tenant.
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start(
initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
)
tenant_id = env.initial_tenant
pageservers = dict((int(p.id), p) for p in env.pageservers)
shards = env.attachment_service.locate(tenant_id)
def get_sizes():
sizes = {}
for shard in shards:
node_id = int(shard["node_id"])
pageserver = pageservers[node_id]
sizes[node_id] = pageserver.http_client().tenant_status(shard["shard_id"])[
"current_physical_size"
]
log.info(f"sizes = {sizes}")
return sizes
# Test that timeline creation works on a sharded tenant
timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id)
# Test that we can write data to a sharded tenant
workload = Workload(env, tenant_id, timeline_b, branch_name="branch_b")
workload.init()
sizes_before = get_sizes()
workload.write_rows(256)
# Test that we can read data back from a sharded tenant
workload.validate()
# Validate that the data is spread across pageservers
sizes_after = get_sizes()
# Our sizes increased when we wrote data
assert sum(sizes_after.values()) > sum(sizes_before.values())
# That increase is present on all shards
assert all(sizes_after[ps.id] > sizes_before[ps.id] for ps in env.pageservers)
# Validate that timeline list API works properly on all shards
for shard in shards:
node_id = int(shard["node_id"])
pageserver = pageservers[node_id]
timelines = set(
TimelineId(tl["timeline_id"])
for tl in pageserver.http_client().timeline_list(shard["shard_id"])
)
assert timelines == {env.initial_timeline, timeline_b}
# TODO: test timeline deletion and tenant deletion (depends on change in attachment_service)

View File

@@ -29,6 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
diesel = { version = "2", features = ["postgres", "serde_json"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["sink"] }
@@ -108,8 +109,10 @@ regex-automata = { version = "0.4", default-features = false, features = ["dfa-o
regex-syntax = { version = "0.8" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }