Compare commits

...

15 Commits

Author SHA1 Message Date
Anastasia Lubennikova
02f8650111 Add feature flag RemoteExtensionsUseLatest to compute_ctl.
This will allow us to test new versions of extensions, without waiting for main branch commit
2024-02-08 01:40:05 +00:00
Christian Schwarz
c561ad4e2e feat: expose locked memory in pageserver /metrics (#6669)
context: https://github.com/neondatabase/neon/issues/6667
2024-02-07 19:39:52 +00:00
John Spray
3bd2a4fd56 control_plane: avoid feedback loop with /location_config if compute hook fails. (#6668)
## Problem

The existing behavior isn't exactly incorrect, but is operationally
risky: if the control plane compute hook breaks, then all the control
plane operations trying to call /location_config will end up retrying
forever, which could put more load on the system.

## Summary of changes

- Treat 404s as fatal errors to do fewer retries: a 404 either indicates
we have the wrong URL, or some control plane bug is failing to recognize
our tenant ID as existing.
- Do not return an error on reconcilation errors in a non-creating
/location_config response: this allows the control plane to finish its
Operation (and we will eventually retry the compute notification later)
2024-02-07 19:14:18 +00:00
Tristan Partin
128fae7054 Update Postgres 16 to 16.2 2024-02-07 11:10:48 -08:00
Tristan Partin
5541244dc4 Update Postgres 15 to 15.6 2024-02-07 11:10:48 -08:00
Tristan Partin
2e9b1f7aaf Update Postgres 14 to 14.11 2024-02-07 11:10:48 -08:00
Christian Schwarz
51f9385b1b live-reconfigurable virtual_file::IoEngine (#6552)
This PR adds an API to live-reconfigure the VirtualFile io engine.

It also adds a flag to `pagebench get-page-latest-lsn`, which is where I
found this functionality to be useful: it helps compare the io engines
in a benchmark without re-compiling a release build, which took ~50s on
the i3en.3xlarge where I was doing the benchmark.

Switching the IO engine is completely safe at runtime.
2024-02-07 17:47:55 +00:00
Sasha Krassovsky
7b49e5e5c3 Remove compute migrations feature flag (#6653) 2024-02-07 07:55:55 -09:00
Abhijeet Patil
75f1a01d4a Optimise e2e run (#6513)
## Problem
We have finite amount of runners and intermediate results are often
wanted before a PR is ready for merging. Currently all PRs get e2e tests
run and this creates a lot of throwaway e2e results which may or may not
get to start or complete before a new push.

## Summary of changes

1. Skip e2e test when PR is in draft mode
2. Run e2e when PR status changes from draft to ready for review (change
this to having its trigger in below PR and update results of build and
test)
3. Abstract e2e test in a Separate workflow and call it from the main
workflow for the e2e test
5. Add a label, if that label is present run e2e test in draft
(run-e2e-test-in-draft)
6. Auto add a label(approve to ci) so that all the external contributors
PR , e2e run in draft
7. Document the new label changes and the above behaviour

Draft PR  : https://github.com/neondatabase/neon/actions/runs/7729128470
Ready To Review :
https://github.com/neondatabase/neon/actions/runs/7733779916
Draft PR with label :
https://github.com/neondatabase/neon/actions/runs/7725691012/job/21062432342
and https://github.com/neondatabase/neon/actions/runs/7733854028

## Checklist before requesting a review

- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-02-07 16:14:10 +00:00
John Spray
090a789408 storage controller: use PUT instead of POST (#6659)
This was a typo, the server expects PUT.
2024-02-07 13:24:10 +00:00
John Spray
3d4fe205ba control_plane/attachment_service: database connection pool (#6622)
## Problem

This is mainly to limit our concurrency, rather than to speed up
requests (I was doing some sanity checks on performance of the service
with thousands of shards)

## Summary of changes

- Enable the `diesel:r2d2` feature, which provides an async connection
pool
- Acquire a connection before entering spawn_blocking for a database
transaction (recall that diesel's interface is sync)
- Set a connection pool size of 99 to fit within default postgres limit
(100)
- Also set the tokio blocking thread count to accomodate the same number
of blocking tasks (the only thing we use spawn_blocking for is database
calls).
2024-02-07 13:08:09 +00:00
Arpad Müller
f7516df6c1 Pass timestamp as a datetime (#6656)
This saves some repetition. I did this in #6533 for
`tenant_time_travel_remote_storage` already.
2024-02-07 12:56:53 +01:00
Konstantin Knizhnik
f3d7d23805 Some small WAL records can write a lot of data to KV storage, so perform checkpoint check more frequently (#6639)
## Problem

See
https://neondb.slack.com/archives/C04DGM6SMTM/p1707149618314539?thread_ts=1707081520.140049&cid=C04DGM6SMTM

## Summary of changes


Perform checkpoint check after processing `ingest_batch_size` (default
100) WAL records.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-02-07 08:47:19 +02:00
Alexander Bayandin
9f75da7c0a test_lazy_startup: fix statement_timeout setting (#6654)
## Problem
Test `test_lazy_startup` is flaky[0], sometimes (pretty frequently) it
fails with `canceling statement due to statement timeout`.

- [0]
https://neon-github-public-dev.s3.amazonaws.com/reports/main/7803316870/index.html#suites/355b1a7a5b1e740b23ea53728913b4fa/7263782d30986c50/history

## Summary of changes
- Fix setting `statement_timeout` setting by reusing a connection for
all queries.
- Also fix label (`lazy`, `eager`) assignment  
- Split `test_lazy_startup` into two, by `slru` laziness and make tests smaller
2024-02-07 00:31:26 +00:00
Alexander Bayandin
f4cc7cae14 CI(build-tools): Update Python from 3.9.2 to 3.9.18 (#6615)
## Problem

We use an outdated version of Python (3.9.2)

## Summary of changes
- Update Python to the latest patch version (3.9.18)
- Unify the usage of python caches where possible
2024-02-06 20:30:43 +00:00
47 changed files with 638 additions and 245 deletions

View File

@@ -179,6 +179,12 @@ runs:
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}" aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
fi fi
- name: Cache poetry deps
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Store Allure test stat in the DB (new) - name: Store Allure test stat in the DB (new)
if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }} if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
shell: bash -euxo pipefail {0} shell: bash -euxo pipefail {0}

View File

@@ -86,11 +86,10 @@ runs:
fetch-depth: 1 fetch-depth: 1
- name: Cache poetry deps - name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3 uses: actions/cache@v3
with: with:
path: ~/.cache/pypoetry/virtualenvs path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }} key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps - name: Install Python deps
shell: bash -euxo pipefail {0} shell: bash -euxo pipefail {0}

View File

@@ -93,6 +93,7 @@ jobs:
--body-file "body.md" \ --body-file "body.md" \
--head "${BRANCH}" \ --head "${BRANCH}" \
--base "main" \ --base "main" \
--label "run-e2e-tests-in-draft" \
--draft --draft
fi fi

View File

@@ -22,7 +22,7 @@ env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix # A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }} E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
jobs: jobs:
check-permissions: check-permissions:
@@ -112,11 +112,10 @@ jobs:
fetch-depth: 1 fetch-depth: 1
- name: Cache poetry deps - name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3 uses: actions/cache@v3
with: with:
path: ~/.cache/pypoetry/virtualenvs path: ~/.cache/pypoetry/virtualenvs
key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }} key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps - name: Install Python deps
run: ./scripts/pysync run: ./scripts/pysync
@@ -693,50 +692,10 @@ jobs:
}) })
trigger-e2e-tests: trigger-e2e-tests:
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' }}
needs: [ check-permissions, promote-images, tag ] needs: [ check-permissions, promote-images, tag ]
runs-on: [ self-hosted, gen3, small ] uses: ./.github/workflows/trigger-e2e-tests.yml
container: secrets: inherit
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
# For pull requests, GH Actions set "github.sha" variable to point at a fake merge commit
# but we need to use a real sha of a latest commit in the PR's branch for the e2e job,
# to place a job run status update later.
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
# For non-PR kinds of runs, the above will produce an empty variable, pick the original sha value for those
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\",
\"storage_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
}
}"
neon-image: neon-image:
needs: [ check-permissions, build-buildtools-image, tag ] needs: [ check-permissions, build-buildtools-image, tag ]

View File

@@ -38,11 +38,10 @@ jobs:
uses: snok/install-poetry@v1 uses: snok/install-poetry@v1
- name: Cache poetry deps - name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3 uses: actions/cache@v3
with: with:
path: ~/.cache/pypoetry/virtualenvs path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }} key: v2-${{ runner.os }}-python-deps-ubunutu-latest-${{ hashFiles('poetry.lock') }}
- name: Install Python deps - name: Install Python deps
shell: bash -euxo pipefail {0} shell: bash -euxo pipefail {0}

118
.github/workflows/trigger-e2e-tests.yml vendored Normal file
View File

@@ -0,0 +1,118 @@
name: Trigger E2E Tests
on:
pull_request:
types:
- ready_for_review
workflow_call:
defaults:
run:
shell: bash -euxo pipefail {0}
env:
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
jobs:
cancel-previous-e2e-tests:
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
steps:
- name: Cancel previous e2e-tests runs for this PR
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
gh workflow --repo neondatabase/cloud \
run cancel-previous-in-concurrency-group.yml \
--field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
tag:
runs-on: [ ubuntu-latest ]
outputs:
build-tag: ${{ steps.build-tag.outputs.tag }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Get build tag
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
CURRENT_BRANCH: ${{ github.head_ref || github.ref_name }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
fi
id: build-tag
trigger-e2e-tests:
needs: [ tag ]
runs-on: [ self-hosted, gen3, small ]
env:
TAG: ${{ needs.tag.outputs.build-tag }}
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
steps:
- name: check if ecr image are present
run: |
for REPO in neon compute-tools compute-node-v14 vm-compute-node-v14 compute-node-v15 vm-compute-node-v15 compute-node-v16 vm-compute-node-v16; do
OUTPUT=$(aws ecr describe-images --repository-name ${REPO} --region eu-central-1 --query "imageDetails[?imageTags[?contains(@, '${TAG}')]]" --output text)
if [ "$OUTPUT" == "" ]; then
echo "$REPO with image tag $TAG not found" >> $GITHUB_OUTPUT
exit 1
fi
done
- name: Set PR's status to pending and request a remote CI test
run: |
# For pull requests, GH Actions set "github.sha" variable to point at a fake merge commit
# but we need to use a real sha of a latest commit in the PR's branch for the e2e job,
# to place a job run status update later.
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
# For non-PR kinds of runs, the above will produce an empty variable, pick the original sha value for those
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\",
\"storage_image_tag\": \"${TAG}\",
\"compute_image_tag\": \"${TAG}\",
\"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
}
}"

View File

@@ -54,6 +54,9 @@ _An instruction for maintainers_
- If and only if it looks **safe** (i.e. it doesn't contain any malicious code which could expose secrets or harm the CI), then: - If and only if it looks **safe** (i.e. it doesn't contain any malicious code which could expose secrets or harm the CI), then:
- Press the "Approve and run" button in GitHub UI - Press the "Approve and run" button in GitHub UI
- Add the `approved-for-ci-run` label to the PR - Add the `approved-for-ci-run` label to the PR
- Currently draft PR will skip e2e test (only for internal contributors). After turning the PR 'Ready to Review' CI will trigger e2e test
- Add `run-e2e-tests-in-draft` label to run e2e test in draft PR (override above behaviour)
- The `approved-for-ci-run` workflow will add `run-e2e-tests-in-draft` automatically to run e2e test for external contributors
Repeat all steps after any change to the PR. Repeat all steps after any change to the PR.
- When the changes are ready to get merged — merge the original PR (not the internal one) - When the changes are ready to get merged — merge the original PR (not the internal one)

26
Cargo.lock generated
View File

@@ -289,6 +289,7 @@ dependencies = [
"pageserver_api", "pageserver_api",
"pageserver_client", "pageserver_client",
"postgres_connection", "postgres_connection",
"r2d2",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@@ -1651,6 +1652,7 @@ dependencies = [
"diesel_derives", "diesel_derives",
"itoa", "itoa",
"pq-sys", "pq-sys",
"r2d2",
"serde_json", "serde_json",
] ]
@@ -2867,6 +2869,7 @@ dependencies = [
"chrono", "chrono",
"libc", "libc",
"once_cell", "once_cell",
"procfs",
"prometheus", "prometheus",
"rand 0.8.5", "rand 0.8.5",
"rand_distr", "rand_distr",
@@ -3984,6 +3987,8 @@ checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"byteorder", "byteorder",
"chrono",
"flate2",
"hex", "hex",
"lazy_static", "lazy_static",
"rustix 0.36.16", "rustix 0.36.16",
@@ -4166,6 +4171,17 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.7.3" version = "0.7.3"
@@ -4879,6 +4895,15 @@ dependencies = [
"windows-sys 0.42.0", "windows-sys 0.42.0",
] ]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
@@ -6807,6 +6832,7 @@ dependencies = [
"clap_builder", "clap_builder",
"crossbeam-utils", "crossbeam-utils",
"diesel", "diesel",
"diesel_derives",
"either", "either",
"fail", "fail",
"futures-channel", "futures-channel",

View File

@@ -113,6 +113,7 @@ parquet = { version = "49.0.0", default-features = false, features = ["zstd"] }
parquet_derive = "49.0.0" parquet_derive = "49.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] } pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2" pin-project-lite = "0.2"
procfs = "0.14"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11" prost = "0.11"
rand = "0.8" rand = "0.8"

View File

@@ -111,7 +111,7 @@ USER nonroot:nonroot
WORKDIR /home/nonroot WORKDIR /home/nonroot
# Python # Python
ENV PYTHON_VERSION=3.9.2 \ ENV PYTHON_VERSION=3.9.18 \
PYENV_ROOT=/home/nonroot/.pyenv \ PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \ RUN set -e \

View File

@@ -773,12 +773,11 @@ impl ComputeNode {
// 'Close' connection // 'Close' connection
drop(client); drop(client);
if self.has_feature(ComputeFeature::Migrations) { // Run migrations separately to not hold up cold starts
thread::spawn(move || { thread::spawn(move || {
let mut client = Client::connect(connstr.as_str(), NoTls)?; let mut client = Client::connect(connstr.as_str(), NoTls)?;
handle_migrations(&mut client) handle_migrations(&mut client)
}); });
}
Ok(()) Ok(())
} }
@@ -1236,10 +1235,19 @@ LIMIT 100",
info!("Downloading to shared preload libraries: {:?}", &libs_vec); info!("Downloading to shared preload libraries: {:?}", &libs_vec);
let build_tag_str = if spec
.features
.contains(&ComputeFeature::RemoteExtensionsUseLatest)
{
"latest"
} else {
&self.build_tag
};
let mut download_tasks = Vec::new(); let mut download_tasks = Vec::new();
for library in &libs_vec { for library in &libs_vec {
let (ext_name, ext_path) = let (ext_name, ext_path) =
remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?; remote_extensions.get_ext(library, true, build_tag_str, &self.pgversion)?;
download_tasks.push(self.download_extension(ext_name, ext_path)); download_tasks.push(self.download_extension(ext_name, ext_path));
} }
let results = join_all(download_tasks).await; let results = join_all(download_tasks).await;

View File

@@ -8,6 +8,7 @@ use std::thread;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest; use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError}; use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use compute_api::spec::ComputeFeature;
use anyhow::Result; use anyhow::Result;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
@@ -171,12 +172,16 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
} }
}; };
remote_extensions.get_ext( let build_tag_str = if spec
&filename, .features
is_library, .contains(&ComputeFeature::RemoteExtensionsUseLatest)
&compute.build_tag, {
&compute.pgversion, "latest"
) } else {
&compute.build_tag
};
remote_extensions.get_ext(&filename, is_library, build_tag_str, &compute.pgversion)
}; };
match ext { match ext {

View File

@@ -24,8 +24,9 @@ tokio.workspace = true
tokio-util.workspace = true tokio-util.workspace = true
tracing.workspace = true tracing.workspace = true
diesel = { version = "2.1.4", features = ["serde_json", "postgres"] } diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
diesel_migrations = { version = "2.1.0" } diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
utils = { path = "../../libs/utils/" } utils = { path = "../../libs/utils/" }
metrics = { path = "../../libs/metrics/" } metrics = { path = "../../libs/metrics/" }

View File

@@ -170,7 +170,7 @@ impl ComputeHook {
reconfigure_request: &ComputeHookNotifyRequest, reconfigure_request: &ComputeHookNotifyRequest,
cancel: &CancellationToken, cancel: &CancellationToken,
) -> Result<(), NotifyError> { ) -> Result<(), NotifyError> {
let req = client.request(Method::POST, url); let req = client.request(Method::PUT, url);
let req = if let Some(value) = &self.authorization_header { let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value) req.header(reqwest::header::AUTHORIZATION, value)
} else { } else {
@@ -240,7 +240,7 @@ impl ComputeHook {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
backoff::retry( backoff::retry(
|| self.do_notify_iteration(&client, url, &reconfigure_request, cancel), || self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
|e| matches!(e, NotifyError::Fatal(_)), |e| matches!(e, NotifyError::Fatal(_) | NotifyError::Unexpected(_)),
3, 3,
10, 10,
"Send compute notification", "Send compute notification",

View File

@@ -170,6 +170,7 @@ impl Secrets {
} }
} }
/// Execute the diesel migrations that are built into this binary
async fn migration_run(database_url: &str) -> anyhow::Result<()> { async fn migration_run(database_url: &str) -> anyhow::Result<()> {
use diesel::PgConnection; use diesel::PgConnection;
use diesel_migrations::{HarnessWithOutput, MigrationHarness}; use diesel_migrations::{HarnessWithOutput, MigrationHarness};
@@ -183,8 +184,18 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
#[tokio::main] fn main() -> anyhow::Result<()> {
async fn main() -> anyhow::Result<()> { tokio::runtime::Builder::new_current_thread()
// We use spawn_blocking for database operations, so require approximately
// as many blocking threads as we will open database connections.
.max_blocking_threads(Persistence::MAX_CONNECTIONS as usize)
.enable_all()
.build()
.unwrap()
.block_on(async_main())
}
async fn async_main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate())); let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
logging::init( logging::init(

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration;
use camino::Utf8Path; use camino::Utf8Path;
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
@@ -44,7 +45,7 @@ use crate::PlacementPolicy;
/// updated, and reads of nodes are always from memory, not the database. We only require that /// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline. /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence { pub struct Persistence {
database_url: String, connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
// In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
// test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
@@ -64,6 +65,8 @@ pub(crate) enum DatabaseError {
Query(#[from] diesel::result::Error), Query(#[from] diesel::result::Error),
#[error(transparent)] #[error(transparent)]
Connection(#[from] diesel::result::ConnectionError), Connection(#[from] diesel::result::ConnectionError),
#[error(transparent)]
ConnectionPool(#[from] r2d2::Error),
#[error("Logical error: {0}")] #[error("Logical error: {0}")]
Logical(String), Logical(String),
} }
@@ -71,9 +74,31 @@ pub(crate) enum DatabaseError {
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>; pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence { impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
pub const MAX_CONNECTIONS: u32 = 99;
// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self { pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
// to execute queries (database queries are not generally on latency-sensitive paths).
let connection_pool = diesel::r2d2::Pool::builder()
.max_size(Self::MAX_CONNECTIONS)
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
// Always keep at least one connection ready to go
.min_idle(Some(1))
.test_on_check_out(true)
.build(manager)
.expect("Could not build connection pool");
Self { Self {
database_url, connection_pool,
json_path, json_path,
} }
} }
@@ -84,12 +109,8 @@ impl Persistence {
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static, F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static, R: Send + 'static,
{ {
let database_url = self.database_url.clone(); let mut conn = self.connection_pool.get()?;
tokio::task::spawn_blocking(move || -> DatabaseResult<R> { tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
// TODO: connection pooling, such as via diesel::r2d2
let mut conn = PgConnection::establish(&database_url)?;
func(&mut conn)
})
.await .await
.expect("Task panic") .expect("Task panic")
} }

View File

@@ -103,7 +103,9 @@ impl From<DatabaseError> for ApiError {
match err { match err {
DatabaseError::Query(e) => ApiError::InternalServerError(e.into()), DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
// FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503. // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
DatabaseError::Connection(_e) => ApiError::ShuttingDown, DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
ApiError::ShuttingDown
}
DatabaseError::Logical(reason) => { DatabaseError::Logical(reason) => {
ApiError::InternalServerError(anyhow::anyhow!(reason)) ApiError::InternalServerError(anyhow::anyhow!(reason))
} }
@@ -987,7 +989,15 @@ impl Service {
.collect(); .collect();
} else { } else {
// This was an update, wait for reconciliation // This was an update, wait for reconciliation
self.await_waiters(waiters).await?; if let Err(e) = self.await_waiters(waiters).await {
// Do not treat a reconcile error as fatal: we have already applied any requested
// Intent changes, and the reconcile can fail for external reasons like unavailable
// compute notification API. In these cases, it is important that we do not
// cause the cloud control plane to retry forever on this API.
tracing::warn!(
"Failed to reconcile after /location_config: {e}, returning success anyway"
);
}
} }
Ok(result) Ok(result)

View File

@@ -90,8 +90,10 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity. /// track short-lived connections as user activity.
ActivityMonitorExperimental, ActivityMonitorExperimental,
/// Enable running migrations // Use latest version of remote extensions
Migrations, // This is needed to allow us to test new versions of extensions before
// they are merged into the main branch.
RemoteExtensionsUseLatest,
/// This is a special feature flag that is used to represent unknown feature flags. /// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test /// Basically all unknown to enum flags are represented as this one. See unit test
@@ -155,8 +157,12 @@ impl RemoteExtSpec {
// //
// Keep it in sync with path generation in // Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main // https://github.com/neondatabase/build-custom-extensions/tree/main
//
// if ComputeFeature::RemoteExtensionsUseLatest is enabled
// use "latest" as the build_tag
let archive_path_str = let archive_path_str =
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst"); format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
Ok(( Ok((
real_ext_name.to_string(), real_ext_name.to_string(),
RemotePath::from_string(&archive_path_str)?, RemotePath::from_string(&archive_path_str)?,

View File

@@ -13,6 +13,9 @@ twox-hash.workspace = true
workspace_hack.workspace = true workspace_hack.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
procfs.workspace = true
[dev-dependencies] [dev-dependencies]
rand = "0.8" rand = "0.8"
rand_distr = "0.4.3" rand_distr = "0.4.3"

View File

@@ -31,6 +31,8 @@ pub use wrappers::{CountedReader, CountedWriter};
mod hll; mod hll;
pub mod metric_vec_duration; pub mod metric_vec_duration;
pub use hll::{HyperLogLog, HyperLogLogVec}; pub use hll::{HyperLogLog, HyperLogLogVec};
#[cfg(target_os = "linux")]
pub mod more_process_metrics;
pub type UIntGauge = GenericGauge<AtomicU64>; pub type UIntGauge = GenericGauge<AtomicU64>;
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>; pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

View File

@@ -0,0 +1,54 @@
//! process metrics that the [`::prometheus`] crate doesn't provide.
// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
use crate::UIntGauge;
pub struct Collector {
descs: Vec<prometheus::core::Desc>,
vmlck: crate::UIntGauge,
}
const NMETRICS: usize = 1;
impl prometheus::core::Collector for Collector {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
let Ok(myself) = procfs::process::Process::myself() else {
return vec![];
};
let mut mfs = Vec::with_capacity(NMETRICS);
if let Ok(status) = myself.status() {
if let Some(vmlck) = status.vmlck {
self.vmlck.set(vmlck);
mfs.extend(self.vmlck.collect())
}
}
mfs
}
}
impl Collector {
pub fn new() -> Self {
let mut descs = Vec::new();
let vmlck =
UIntGauge::new("libmetrics_process_status_vmlck", "/proc/self/status vmlck").unwrap();
descs.extend(
prometheus::core::Collector::desc(&vmlck)
.into_iter()
.cloned(),
);
Self { descs, vmlck }
}
}
impl Default for Collector {
fn default() -> Self {
Self::new()
}
}

View File

@@ -649,6 +649,27 @@ pub struct WalRedoManagerStatus {
pub pid: Option<u32>, pub pid: Option<u32>,
} }
pub mod virtual_file {
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
StdFs,
#[cfg(target_os = "linux")]
TokioEpollUring,
}
}
// Wrapped in libpq CopyData // Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage { pub enum PagestreamFeMessage {

View File

@@ -453,9 +453,12 @@ mod tests {
event_mask: 0, event_mask: 0,
}), }),
expected_messages: vec![ expected_messages: vec![
// Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 }) // TODO: When updating Postgres versions, this test will cause
// problems. Postgres version in message needs updating.
//
// Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160002, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
vec![ vec![
103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147, 147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1, 188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,

View File

@@ -339,4 +339,16 @@ impl Client {
.await .await
.map_err(Error::ReceiveBody) .map_err(Error::ReceiveBody)
} }
pub async fn put_io_engine(
&self,
engine: &pageserver_api::models::virtual_file::IoEngineKind,
) -> Result<()> {
let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, engine)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
} }

View File

@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100); pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize; let mut total_delta_layers = 0usize;

View File

@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> { async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::IoEngineKind::StdFs); virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100); page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path).await?); let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?; let summary_blk = file.read_blk(0, ctx).await?;
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id, new_tenant_id,
new_timeline_id, new_timeline_id,
} => { } => {
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100); pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup // Basic initialization of things that don't change after startup
virtual_file::init(10, virtual_file::IoEngineKind::StdFs); virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100); page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -51,6 +51,10 @@ pub(crate) struct Args {
/// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction. /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
#[clap(long)] #[clap(long)]
keyspace_cache: Option<Utf8PathBuf>, keyspace_cache: Option<Utf8PathBuf>,
/// Before starting the benchmark, live-reconfigure the pageserver to use the given
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
targets: Option<Vec<TenantTimelineId>>, targets: Option<Vec<TenantTimelineId>>,
} }
@@ -109,6 +113,10 @@ async fn main_impl(
args.pageserver_jwt.as_deref(), args.pageserver_jwt.as_deref(),
)); ));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;
}
// discover targets // discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover( let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client, &mgmt_api_client,

View File

@@ -272,6 +272,8 @@ fn start_pageserver(
); );
set_build_info_metric(GIT_VERSION, BUILD_TAG); set_build_info_metric(GIT_VERSION, BUILD_TAG);
set_launch_timestamp_metric(launch_ts); set_launch_timestamp_metric(launch_ts);
#[cfg(target_os = "linux")]
metrics::register_internal(Box::new(metrics::more_process_metrics::Collector::new())).unwrap();
pageserver::preinitialize_metrics(); pageserver::preinitialize_metrics();
// If any failpoints were set from FAILPOINTS environment variable, // If any failpoints were set from FAILPOINTS environment variable,

View File

@@ -1908,6 +1908,15 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ()) json_response(StatusCode::OK, ())
} }
async fn put_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
crate::virtual_file::io_engine::set(kind);
json_response(StatusCode::OK, ())
}
/// Common functionality of all the HTTP API handlers. /// Common functionality of all the HTTP API handlers.
/// ///
/// - Adds a tracing span to each request (by `request_span`) /// - Adds a tracing span to each request (by `request_span`)
@@ -2165,5 +2174,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace", "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace), |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
) )
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.any(handler_404)) .any(handler_404))
} }

View File

@@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection(
modification.commit(&ctx).await?; modification.commit(&ctx).await?;
uncommitted_records = 0; uncommitted_records = 0;
filtered_records = 0; filtered_records = 0;
//
// We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
// layer size can become much larger than `checkpoint_distance`.
// It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large
// amount of data to key-value storage. So performing this check only after processing
// all WAL records in the chunk, can cause huge L0 layer files.
//
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
} }
} }

View File

@@ -28,9 +28,10 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant; use tokio::time::Instant;
use utils::fs_ext; use utils::fs_ext;
mod io_engine; pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
mod open_options; mod open_options;
pub use io_engine::IoEngineKind; pub(crate) use io_engine::IoEngineKind;
pub(crate) use open_options::*; pub(crate) use open_options::*;
/// ///

View File

@@ -7,42 +7,67 @@
//! //!
//! Then use [`get`] and [`super::OpenOptions`]. //! Then use [`get`] and [`super::OpenOptions`].
#[derive( pub(crate) use super::api::IoEngineKind;
Copy, #[derive(Clone, Copy)]
Clone, #[repr(u8)]
PartialEq, pub(crate) enum IoEngine {
Eq, NotSet,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
StdFs, StdFs,
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
TokioEpollUring, TokioEpollUring,
} }
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new(); impl From<IoEngineKind> for IoEngine {
fn from(value: IoEngineKind) -> Self {
#[cfg(not(test))] match value {
pub(super) fn init(engine: IoEngineKind) { IoEngineKind::StdFs => IoEngine::StdFs,
if IO_ENGINE.set(engine).is_err() { #[cfg(target_os = "linux")]
panic!("called twice"); IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
}
} }
crate::metrics::virtual_file_io_engine::KIND
.with_label_values(&[&format!("{engine}")])
.set(1);
} }
pub(super) fn get() -> &'static IoEngineKind { impl TryFrom<u8> for IoEngine {
#[cfg(test)] type Error = u8;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
#[cfg(target_os = "linux")]
v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
x => return Err(x),
})
}
}
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
pub(crate) fn set(engine_kind: IoEngineKind) {
let engine: IoEngine = engine_kind.into();
IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
#[cfg(not(test))]
{ {
let metric = &crate::metrics::virtual_file_io_engine::KIND;
metric.reset();
metric
.with_label_values(&[&format!("{engine_kind}")])
.set(1);
}
}
#[cfg(not(test))]
pub(super) fn init(engine_kind: IoEngineKind) {
set(engine_kind);
}
pub(super) fn get() -> IoEngine {
let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
if cfg!(test) {
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE"; let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) { match cur {
IoEngine::NotSet => {
let kind = match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() { Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind, Ok(engine_kind) => engine_kind,
Err(e) => { Err(e) => {
@@ -57,17 +82,25 @@ pub(super) fn get() -> &'static IoEngineKind {
Err(std::env::VarError::NotUnicode(_)) => { Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode"); panic!("env var {env_var_name} is not unicode");
} }
}) };
self::set(kind);
self::get()
}
x => x,
}
} else {
cur
} }
#[cfg(not(test))]
IO_ENGINE.get().unwrap()
} }
use std::os::unix::prelude::FileExt; use std::{
os::unix::prelude::FileExt,
sync::atomic::{AtomicU8, Ordering},
};
use super::FileGuard; use super::FileGuard;
impl IoEngineKind { impl IoEngine {
pub(super) async fn read_at<B>( pub(super) async fn read_at<B>(
&self, &self,
file_guard: FileGuard, file_guard: FileGuard,
@@ -78,7 +111,8 @@ impl IoEngineKind {
B: tokio_epoll_uring::BoundedBufMut + Send, B: tokio_epoll_uring::BoundedBufMut + Send,
{ {
match self { match self {
IoEngineKind::StdFs => { IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory. // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
let dst = unsafe { let dst = unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
@@ -96,7 +130,7 @@ impl IoEngineKind {
((file_guard, buf), res) ((file_guard, buf), res)
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
IoEngineKind::TokioEpollUring => { IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await; let system = tokio_epoll_uring::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await; let (resources, res) = system.read(file_guard, offset, buf).await;
( (

View File

@@ -1,6 +1,6 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`]; //! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use super::IoEngineKind; use super::io_engine::IoEngine;
use std::{os::fd::OwnedFd, path::Path}; use std::{os::fd::OwnedFd, path::Path};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -13,9 +13,10 @@ pub enum OpenOptions {
impl Default for OpenOptions { impl Default for OpenOptions {
fn default() -> Self { fn default() -> Self {
match super::io_engine::get() { match super::io_engine::get() {
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()), IoEngine::NotSet => panic!("io engine not set"),
IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
IoEngineKind::TokioEpollUring => { IoEngine::TokioEpollUring => {
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new()) Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
} }
} }

View File

@@ -3131,10 +3131,7 @@ class Endpoint(PgProtocol):
log.info(json.dumps(dict(data_dict, **kwargs))) log.info(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4) json.dump(dict(data_dict, **kwargs), file, indent=4)
# Please note: if you didn't respec this endpoint to have the `migrations` # Please note: Migrations only run if pg_skip_catalog_updates is false
# feature, this function will probably fail because neon_migration.migration_id
# won't exist. This is temporary - soon we'll get rid of the feature flag and
# migrations will be enabled for everyone.
def wait_for_migrations(self): def wait_for_migrations(self):
with self.cursor() as cur: with self.cursor() as cur:

View File

@@ -563,13 +563,13 @@ class PageserverHttpClient(requests.Session):
self, self,
tenant_id: Union[TenantId, TenantShardId], tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId, timeline_id: TimelineId,
timestamp, timestamp: datetime,
): ):
log.info( log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}" f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
) )
res = self.get( res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}", f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
) )
self.verbose_error(res) self.verbose_error(res)
res_json = res.json() res_json = res.json()

View File

@@ -26,36 +26,32 @@ from fixtures.neon_fixtures import NeonEnvBuilder
# apply during config step, like more users, databases, or extensions. By default # apply during config step, like more users, databases, or extensions. By default
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this # we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
# test we only load neon. # test we only load neon.
@pytest.mark.timeout(1000) @pytest.mark.timeout(1800)
def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker): @pytest.mark.parametrize("slru", ["lazy", "eager"])
def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
neon_env_builder.num_safekeepers = 3 neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start() env = neon_env_builder.init_start()
lazy_tenant, _ = env.neon_cli.create_tenant( lazy_slru_download = "true" if slru == "lazy" else "false"
tenant, _ = env.neon_cli.create_tenant(
conf={ conf={
"lazy_slru_download": "true", "lazy_slru_download": lazy_slru_download,
} }
) )
eager_tenant, _ = env.neon_cli.create_tenant(
conf={
"lazy_slru_download": "false",
}
)
tenants = [lazy_tenant, eager_tenant]
slru = "lazy"
for tenant in tenants:
endpoint = env.endpoints.create_start("main", tenant_id=tenant) endpoint = env.endpoints.create_start("main", tenant_id=tenant)
endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)") with endpoint.cursor() as cur:
endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)") cur.execute("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
endpoint.safe_psql("INSERT INTO t VALUES (1, 0)") cur.execute("ALTER TABLE t SET (autovacuum_enabled = false)")
endpoint.safe_psql( cur.execute("INSERT INTO t VALUES (1, 0)")
cur.execute(
""" """
CREATE PROCEDURE updating() as CREATE PROCEDURE updating() as
$$ $$
DECLARE DECLARE
i integer; i integer;
BEGIN BEGIN
FOR i IN 1..10000000 LOOP FOR i IN 1..1000000 LOOP
UPDATE t SET x = x + 1 WHERE pk=1; UPDATE t SET x = x + 1 WHERE pk=1;
COMMIT; COMMIT;
END LOOP; END LOOP;
@@ -63,8 +59,8 @@ def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchm
$$ LANGUAGE plpgsql $$ LANGUAGE plpgsql
""" """
) )
endpoint.safe_psql("SET statement_timeout=0") cur.execute("SET statement_timeout=0")
endpoint.safe_psql("call updating()") cur.execute("call updating()")
endpoint.stop() endpoint.stop()
@@ -78,7 +74,7 @@ def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchm
with zenbenchmark.record_duration(f"{slru}_{i}_select"): with zenbenchmark.record_duration(f"{slru}_{i}_select"):
sum = endpoint.safe_psql("select sum(x) from t")[0][0] sum = endpoint.safe_psql("select sum(x) from t")[0][0]
assert sum == 10000000 assert sum == 1000000
# Get metrics # Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json() metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
@@ -108,4 +104,3 @@ def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchm
# Imitate optimizations that console would do for the second start # Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True) endpoint.respec(skip_pg_catalog_updates=True)
slru = "eager"

View File

@@ -0,0 +1,66 @@
import os
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
logical_replication_sync,
)
from fixtures.pg_version import PgVersion
def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
if env.pg_version != PgVersion.V16:
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
endpoint = env.endpoints.create_start(
"test_logical_replication", config_lines=["log_statement=all"]
)
log.info("postgres is running on 'test_logical_replication' branch")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# create table...
cur.execute("create table t(pk integer primary key)")
cur.execute("create publication pub1 for table t")
# Create slot to hold WAL
cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
# now start subscriber
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key)")
connstr = endpoint.connstr().replace("'", "''")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
cur.execute(
"""create or replace function create_snapshots(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
perform pg_log_standby_snapshot();
end loop;
end; $$ language plpgsql"""
)
cur.execute("set statement_timeout=0")
cur.execute("select create_snapshots(10000)")
# Wait logical replication to sync
logical_replication_sync(vanilla_pg, endpoint)
time.sleep(10)
# Check layer file sizes
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, env.initial_tenant, timeline
)
log.info(f"Check {timeline_path}")
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
assert os.path.getsize(timeline_path + filename) < 512_000_000

View File

@@ -64,18 +64,14 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Check edge cases # Check edge cases
# Timestamp is in the future # Timestamp is in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1) probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp( result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] == "future" assert result["kind"] == "future"
# make sure that we return a well advanced lsn here # make sure that we return a well advanced lsn here
assert Lsn(result["lsn"]) > start_lsn assert Lsn(result["lsn"]) > start_lsn
# Timestamp is in the unreachable past # Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10) probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp( result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] == "past" assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range # make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) < start_lsn assert Lsn(result["lsn"]) < start_lsn
@@ -83,9 +79,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Probe a bunch of timestamps in the valid range # Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100): for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1] probe_timestamp = tbl[i][1]
result = client.timeline_get_lsn_by_timestamp( result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] not in ["past", "nodata"] assert result["kind"] not in ["past", "nodata"]
lsn = result["lsn"] lsn = result["lsn"]
# Call get_lsn_by_timestamp to get the LSN # Call get_lsn_by_timestamp to get the LSN
@@ -108,9 +102,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Timestamp is in the unreachable past # Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10) probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp( result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z"
)
assert result["kind"] == "past" assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range # make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) >= last_flush_lsn assert Lsn(result["lsn"]) >= last_flush_lsn

View File

@@ -10,7 +10,7 @@ def test_migrations(neon_simple_env: NeonEnv):
endpoint = env.endpoints.create("test_migrations") endpoint = env.endpoints.create("test_migrations")
log_path = endpoint.endpoint_path() / "compute.log" log_path = endpoint.endpoint_path() / "compute.log"
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"]) endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start() endpoint.start()
endpoint.wait_for_migrations() endpoint.wait_for_migrations()

View File

@@ -12,10 +12,10 @@ def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
env.neon_cli.create_branch("test_neon_superuser_subscriber") env.neon_cli.create_branch("test_neon_superuser_subscriber")
sub = env.endpoints.create("test_neon_superuser_subscriber") sub = env.endpoints.create("test_neon_superuser_subscriber")
pub.respec(skip_pg_catalog_updates=False, features=["migrations"]) pub.respec(skip_pg_catalog_updates=False)
pub.start() pub.start()
sub.respec(skip_pg_catalog_updates=False, features=["migrations"]) sub.respec(skip_pg_catalog_updates=False)
sub.start() sub.start()
pub.wait_for_migrations() pub.wait_for_migrations()

View File

@@ -310,7 +310,7 @@ def test_sharding_service_compute_hook(
notifications.append(request.json) notifications.append(request.json)
return Response(status=200) return Response(status=200)
httpserver.expect_request("/notify", method="POST").respond_with_handler(handler) httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running # Start running
env = neon_env_builder.init_start() env = neon_env_builder.init_start()

View File

@@ -1,5 +1,5 @@
{ {
"postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a", "postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
"postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535", "postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
"postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88" "postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
} }

View File

@@ -29,7 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] } clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] } clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" } crossbeam-utils = { version = "0.8" }
diesel = { version = "2", features = ["postgres", "serde_json"] } diesel = { version = "2", features = ["postgres", "r2d2", "serde_json"] }
either = { version = "1" } either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["sink"] } futures-channel = { version = "0.3", features = ["sink"] }
@@ -90,6 +90,7 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] } bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] } cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
diesel_derives = { version = "2", features = ["32-column-tables", "postgres", "r2d2", "with-deprecated"] }
either = { version = "1" } either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }