mirror of https://github.com/neondatabase/neon.git, synced 2026-01-30 08:40:37 +00:00

Compare commits: statement_… ... RemoteExte… (15 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 02f8650111 | |
| | c561ad4e2e | |
| | 3bd2a4fd56 | |
| | 128fae7054 | |
| | 5541244dc4 | |
| | 2e9b1f7aaf | |
| | 51f9385b1b | |
| | 7b49e5e5c3 | |
| | 75f1a01d4a | |
| | 090a789408 | |
| | 3d4fe205ba | |
| | f7516df6c1 | |
| | f3d7d23805 | |
| | 9f75da7c0a | |
| | f4cc7cae14 | |
@@ -179,6 +179,12 @@ runs:
         aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
       fi

+  - name: Cache poetry deps
+    uses: actions/cache@v3
+    with:
+      path: ~/.cache/pypoetry/virtualenvs
+      key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
+
   - name: Store Allure test stat in the DB (new)
     if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
     shell: bash -euxo pipefail {0}
@@ -86,11 +86,10 @@ runs:
       fetch-depth: 1

   - name: Cache poetry deps
-    id: cache_poetry
     uses: actions/cache@v3
     with:
       path: ~/.cache/pypoetry/virtualenvs
-      key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
+      key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}

   - name: Install Python deps
     shell: bash -euxo pipefail {0}
.github/workflows/approved-for-ci-run.yml (vendored, 1 line changed)

@@ -93,6 +93,7 @@ jobs:
             --body-file "body.md" \
             --head "${BRANCH}" \
             --base "main" \
+            --label "run-e2e-tests-in-draft" \
             --draft
           fi

.github/workflows/build_and_test.yml (vendored, 51 lines changed)

@@ -22,7 +22,7 @@ env:
   AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
   AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
   # A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
-  E2E_CONCURRENCY_GROUP: ${{ github.repository }}-${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
+  E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}

 jobs:
   check-permissions:

@@ -112,11 +112,10 @@ jobs:
           fetch-depth: 1

       - name: Cache poetry deps
-        id: cache_poetry
         uses: actions/cache@v3
         with:
           path: ~/.cache/pypoetry/virtualenvs
-          key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
+          key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}

       - name: Install Python deps
         run: ./scripts/pysync

@@ -693,50 +692,10 @@ jobs:
           })

   trigger-e2e-tests:
+    if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' }}
     needs: [ check-permissions, promote-images, tag ]
-    runs-on: [ self-hosted, gen3, small ]
-    container:
-      image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
-      options: --init
-    steps:
-      - name: Set PR's status to pending and request a remote CI test
-        run: |
-          # For pull requests, GH Actions set "github.sha" variable to point at a fake merge commit
-          # but we need to use a real sha of a latest commit in the PR's branch for the e2e job,
-          # to place a job run status update later.
-          COMMIT_SHA=${{ github.event.pull_request.head.sha }}
-          # For non-PR kinds of runs, the above will produce an empty variable, pick the original sha value for those
-          COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
-
-          REMOTE_REPO="${{ github.repository_owner }}/cloud"
-
-          curl -f -X POST \
-          https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-          -H "Accept: application/vnd.github.v3+json" \
-          --user "${{ secrets.CI_ACCESS_TOKEN }}" \
-          --data \
-          "{
-            \"state\": \"pending\",
-            \"context\": \"neon-cloud-e2e\",
-            \"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
-          }"
-
-          curl -f -X POST \
-          https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-          -H "Accept: application/vnd.github.v3+json" \
-          --user "${{ secrets.CI_ACCESS_TOKEN }}" \
-          --data \
-          "{
-            \"ref\": \"main\",
-            \"inputs\": {
-              \"ci_job_name\": \"neon-cloud-e2e\",
-              \"commit_hash\": \"$COMMIT_SHA\",
-              \"remote_repo\": \"${{ github.repository }}\",
-              \"storage_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
-              \"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
-              \"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
-            }
-          }"
+    uses: ./.github/workflows/trigger-e2e-tests.yml
+    secrets: inherit

   neon-image:
     needs: [ check-permissions, build-buildtools-image, tag ]
.github/workflows/pg_clients.yml (vendored, 3 lines changed)

@@ -38,11 +38,10 @@ jobs:
         uses: snok/install-poetry@v1

       - name: Cache poetry deps
-        id: cache_poetry
         uses: actions/cache@v3
         with:
           path: ~/.cache/pypoetry/virtualenvs
-          key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
+          key: v2-${{ runner.os }}-python-deps-ubunutu-latest-${{ hashFiles('poetry.lock') }}

       - name: Install Python deps
         shell: bash -euxo pipefail {0}
.github/workflows/trigger-e2e-tests.yml (vendored, new file, 118 lines)

@@ -0,0 +1,118 @@
+name: Trigger E2E Tests
+
+on:
+  pull_request:
+    types:
+      - ready_for_review
+  workflow_call:
+
+defaults:
+  run:
+    shell: bash -euxo pipefail {0}
+
+env:
+  # A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
+  E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
+  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
+  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
+
+jobs:
+  cancel-previous-e2e-tests:
+    if: github.event_name == 'pull_request'
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Cancel previous e2e-tests runs for this PR
+        env:
+          GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
+        run: |
+          gh workflow --repo neondatabase/cloud \
+            run cancel-previous-in-concurrency-group.yml \
+            --field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
+
+  tag:
+    runs-on: [ ubuntu-latest ]
+    outputs:
+      build-tag: ${{ steps.build-tag.outputs.tag }}
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
+      - name: Get build tag
+        env:
+          GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
+          CURRENT_BRANCH: ${{ github.head_ref || github.ref_name }}
+          CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
+        run: |
+          if [[ "$GITHUB_REF_NAME" == "main" ]]; then
+            echo "tag=$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
+          elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
+            echo "tag=release-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
+          else
+            echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
+            BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
+            echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
+          fi
+        id: build-tag
+
+  trigger-e2e-tests:
+    needs: [ tag ]
+    runs-on: [ self-hosted, gen3, small ]
+    env:
+      TAG: ${{ needs.tag.outputs.build-tag }}
+    container:
+      image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
+      options: --init
+    steps:
+      - name: check if ecr image are present
+        run: |
+          for REPO in neon compute-tools compute-node-v14 vm-compute-node-v14 compute-node-v15 vm-compute-node-v15 compute-node-v16 vm-compute-node-v16; do
+            OUTPUT=$(aws ecr describe-images --repository-name ${REPO} --region eu-central-1 --query "imageDetails[?imageTags[?contains(@, '${TAG}')]]" --output text)
+            if [ "$OUTPUT" == "" ]; then
+              echo "$REPO with image tag $TAG not found" >> $GITHUB_OUTPUT
+              exit 1
+            fi
+          done
+
+      - name: Set PR's status to pending and request a remote CI test
+        run: |
+          # For pull requests, GH Actions set "github.sha" variable to point at a fake merge commit
+          # but we need to use a real sha of a latest commit in the PR's branch for the e2e job,
+          # to place a job run status update later.
+          COMMIT_SHA=${{ github.event.pull_request.head.sha }}
+          # For non-PR kinds of runs, the above will produce an empty variable, pick the original sha value for those
+          COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
+
+          REMOTE_REPO="${{ github.repository_owner }}/cloud"
+
+          curl -f -X POST \
+          https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
+          -H "Accept: application/vnd.github.v3+json" \
+          --user "${{ secrets.CI_ACCESS_TOKEN }}" \
+          --data \
+          "{
+            \"state\": \"pending\",
+            \"context\": \"neon-cloud-e2e\",
+            \"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
+          }"
+
+          curl -f -X POST \
+          https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
+          -H "Accept: application/vnd.github.v3+json" \
+          --user "${{ secrets.CI_ACCESS_TOKEN }}" \
+          --data \
+          "{
+            \"ref\": \"main\",
+            \"inputs\": {
+              \"ci_job_name\": \"neon-cloud-e2e\",
+              \"commit_hash\": \"$COMMIT_SHA\",
+              \"remote_repo\": \"${{ github.repository }}\",
+              \"storage_image_tag\": \"${TAG}\",
+              \"compute_image_tag\": \"${TAG}\",
+              \"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
+            }
+          }"
+
@@ -54,6 +54,9 @@ _An instruction for maintainers_
 - If and only if it looks **safe** (i.e. it doesn't contain any malicious code which could expose secrets or harm the CI), then:
   - Press the "Approve and run" button in GitHub UI
   - Add the `approved-for-ci-run` label to the PR
+- Currently draft PR will skip e2e test (only for internal contributors). After turning the PR 'Ready to Review' CI will trigger e2e test
+- Add `run-e2e-tests-in-draft` label to run e2e test in draft PR (override above behaviour)
+- The `approved-for-ci-run` workflow will add `run-e2e-tests-in-draft` automatically to run e2e test for external contributors

 Repeat all steps after any change to the PR.
 - When the changes are ready to get merged — merge the original PR (not the internal one)
Cargo.lock (generated, 26 lines changed)

@@ -289,6 +289,7 @@ dependencies = [
 "pageserver_api",
 "pageserver_client",
 "postgres_connection",
+ "r2d2",
 "reqwest",
 "serde",
 "serde_json",

@@ -1651,6 +1652,7 @@ dependencies = [
 "diesel_derives",
 "itoa",
 "pq-sys",
+ "r2d2",
 "serde_json",
 ]

@@ -2867,6 +2869,7 @@ dependencies = [
 "chrono",
 "libc",
 "once_cell",
+ "procfs",
 "prometheus",
 "rand 0.8.5",
 "rand_distr",

@@ -3984,6 +3987,8 @@ checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69"
 dependencies = [
 "bitflags 1.3.2",
 "byteorder",
+ "chrono",
+ "flate2",
 "hex",
 "lazy_static",
 "rustix 0.36.16",

@@ -4166,6 +4171,17 @@ dependencies = [
 "proc-macro2",
 ]

+[[package]]
+name = "r2d2"
+version = "0.8.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
+dependencies = [
+ "log",
+ "parking_lot 0.12.1",
+ "scheduled-thread-pool",
+]
+
 [[package]]
 name = "rand"
 version = "0.7.3"

@@ -4879,6 +4895,15 @@ dependencies = [
 "windows-sys 0.42.0",
 ]

+[[package]]
+name = "scheduled-thread-pool"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
+dependencies = [
+ "parking_lot 0.12.1",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"

@@ -6807,6 +6832,7 @@ dependencies = [
 "clap_builder",
 "crossbeam-utils",
 "diesel",
+ "diesel_derives",
 "either",
 "fail",
 "futures-channel",
@@ -113,6 +113,7 @@ parquet = { version = "49.0.0", default-features = false, features = ["zstd"] }
 parquet_derive = "49.0.0"
 pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
 pin-project-lite = "0.2"
+procfs = "0.14"
 prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
 prost = "0.11"
 rand = "0.8"
@@ -111,7 +111,7 @@ USER nonroot:nonroot
 WORKDIR /home/nonroot

 # Python
-ENV PYTHON_VERSION=3.9.2 \
+ENV PYTHON_VERSION=3.9.18 \
     PYENV_ROOT=/home/nonroot/.pyenv \
     PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
 RUN set -e \
@@ -773,12 +773,11 @@ impl ComputeNode {
         // 'Close' connection
         drop(client);

-        if self.has_feature(ComputeFeature::Migrations) {
+        // Run migrations separately to not hold up cold starts
         thread::spawn(move || {
             let mut client = Client::connect(connstr.as_str(), NoTls)?;
             handle_migrations(&mut client)
         });
-        }

         Ok(())
     }
@@ -1236,10 +1235,19 @@ LIMIT 100",

         info!("Downloading to shared preload libraries: {:?}", &libs_vec);

+        let build_tag_str = if spec
+            .features
+            .contains(&ComputeFeature::RemoteExtensionsUseLatest)
+        {
+            "latest"
+        } else {
+            &self.build_tag
+        };
+
         let mut download_tasks = Vec::new();
         for library in &libs_vec {
             let (ext_name, ext_path) =
-                remote_extensions.get_ext(library, true, &self.build_tag, &self.pgversion)?;
+                remote_extensions.get_ext(library, true, build_tag_str, &self.pgversion)?;
             download_tasks.push(self.download_extension(ext_name, ext_path));
         }
         let results = join_all(download_tasks).await;
@@ -8,6 +8,7 @@ use std::thread;
 use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
 use compute_api::requests::ConfigurationRequest;
 use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
+use compute_api::spec::ComputeFeature;

 use anyhow::Result;
 use hyper::service::{make_service_fn, service_fn};

@@ -171,12 +172,16 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
             }
         };

-        remote_extensions.get_ext(
-            &filename,
-            is_library,
-            &compute.build_tag,
-            &compute.pgversion,
-        )
+        let build_tag_str = if spec
+            .features
+            .contains(&ComputeFeature::RemoteExtensionsUseLatest)
+        {
+            "latest"
+        } else {
+            &compute.build_tag
+        };
+
+        remote_extensions.get_ext(&filename, is_library, build_tag_str, &compute.pgversion)
     };

     match ext {
@@ -24,8 +24,9 @@ tokio.workspace = true
 tokio-util.workspace = true
 tracing.workspace = true

-diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
+diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
 diesel_migrations = { version = "2.1.0" }
+r2d2 = { version = "0.8.10" }

 utils = { path = "../../libs/utils/" }
 metrics = { path = "../../libs/metrics/" }
@@ -170,7 +170,7 @@ impl ComputeHook {
         reconfigure_request: &ComputeHookNotifyRequest,
         cancel: &CancellationToken,
     ) -> Result<(), NotifyError> {
-        let req = client.request(Method::POST, url);
+        let req = client.request(Method::PUT, url);
         let req = if let Some(value) = &self.authorization_header {
             req.header(reqwest::header::AUTHORIZATION, value)
         } else {

@@ -240,7 +240,7 @@ impl ComputeHook {
         let client = reqwest::Client::new();
         backoff::retry(
             || self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
-            |e| matches!(e, NotifyError::Fatal(_)),
+            |e| matches!(e, NotifyError::Fatal(_) | NotifyError::Unexpected(_)),
             3,
             10,
             "Send compute notification",
@@ -170,6 +170,7 @@ impl Secrets {
     }
 }

+/// Execute the diesel migrations that are built into this binary
 async fn migration_run(database_url: &str) -> anyhow::Result<()> {
     use diesel::PgConnection;
     use diesel_migrations::{HarnessWithOutput, MigrationHarness};

@@ -183,8 +184,18 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
     Ok(())
 }

-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
+fn main() -> anyhow::Result<()> {
+    tokio::runtime::Builder::new_current_thread()
+        // We use spawn_blocking for database operations, so require approximately
+        // as many blocking threads as we will open database connections.
+        .max_blocking_threads(Persistence::MAX_CONNECTIONS as usize)
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(async_main())
+}
+
+async fn async_main() -> anyhow::Result<()> {
     let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));

     logging::init(
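Aside, not part of the diff: a minimal, self-contained sketch of the runtime shape introduced above. The constant is a stand-in for `Persistence::MAX_CONNECTIONS`; everything else is ordinary tokio API.

```rust
const MAX_CONNECTIONS: u32 = 99; // stand-in for Persistence::MAX_CONNECTIONS

fn main() -> anyhow::Result<()> {
    // A single-threaded async runtime whose blocking-thread budget matches the
    // database connection budget, so spawn_blocking'd queries cannot outnumber
    // the connections they would need.
    tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(MAX_CONNECTIONS as usize)
        .enable_all()
        .build()?
        .block_on(async_main())
}

async fn async_main() -> anyhow::Result<()> {
    // Blocking work is pushed onto the dedicated blocking pool, not the async thread.
    let answer = tokio::task::spawn_blocking(|| 6 * 7).await?;
    assert_eq!(answer, 42);
    Ok(())
}
```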
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::str::FromStr;
+use std::time::Duration;

 use camino::Utf8Path;
 use camino::Utf8PathBuf;

@@ -44,7 +45,7 @@ use crate::PlacementPolicy;
 /// updated, and reads of nodes are always from memory, not the database. We only require that
 /// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
 pub struct Persistence {
-    database_url: String,
+    connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,

     // In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
     // test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward

@@ -64,6 +65,8 @@ pub(crate) enum DatabaseError {
     Query(#[from] diesel::result::Error),
     #[error(transparent)]
     Connection(#[from] diesel::result::ConnectionError),
+    #[error(transparent)]
+    ConnectionPool(#[from] r2d2::Error),
     #[error("Logical error: {0}")]
     Logical(String),
 }

@@ -71,9 +74,31 @@ pub(crate) enum DatabaseError {
 pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;

 impl Persistence {
+    // The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
+    // normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
+    pub const MAX_CONNECTIONS: u32 = 99;
+
+    // We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
+    const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
+    const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
+
     pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
+        let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
+
+        // We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
+        // to execute queries (database queries are not generally on latency-sensitive paths).
+        let connection_pool = diesel::r2d2::Pool::builder()
+            .max_size(Self::MAX_CONNECTIONS)
+            .max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
+            .idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
+            // Always keep at least one connection ready to go
+            .min_idle(Some(1))
+            .test_on_check_out(true)
+            .build(manager)
+            .expect("Could not build connection pool");
+
         Self {
-            database_url,
+            connection_pool,
             json_path,
         }
     }

@@ -84,14 +109,10 @@ impl Persistence {
         F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
         R: Send + 'static,
     {
-        let database_url = self.database_url.clone();
-        tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
-            // TODO: connection pooling, such as via diesel::r2d2
-            let mut conn = PgConnection::establish(&database_url)?;
-            func(&mut conn)
-        })
-        .await
-        .expect("Task panic")
+        let mut conn = self.connection_pool.get()?;
+        tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
+            .await
+            .expect("Task panic")
     }

     /// When a node is first registered, persist it before using it for anything
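Aside, not part of the diff: the same diesel/r2d2 pooling pattern in isolation, with the values above repeated as plain literals. A reachable Postgres and a `DATABASE_URL` environment variable are assumptions of this sketch.

```rust
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, Pool};
use std::time::Duration;

fn build_pool(database_url: &str) -> Pool<ConnectionManager<PgConnection>> {
    let manager = ConnectionManager::<PgConnection>::new(database_url);
    Pool::builder()
        .max_size(99)                                // MAX_CONNECTIONS
        .max_lifetime(Some(Duration::from_secs(60))) // MAX_CONNECTION_LIFETIME
        .idle_timeout(Some(Duration::from_secs(10))) // IDLE_CONNECTION_TIMEOUT
        .min_idle(Some(1))
        .test_on_check_out(true)
        .build(manager)
        .expect("could not build connection pool")
}

fn main() {
    let url = std::env::var("DATABASE_URL").expect("DATABASE_URL not set");
    let pool = build_pool(&url);
    // Checking a connection out is synchronous; the helper in the hunk above does it
    // before handing the connection to spawn_blocking for the actual query.
    let mut conn = pool.get().expect("no connection available");
    diesel::sql_query("SELECT 1").execute(&mut *conn).expect("query failed");
}
```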
@@ -103,7 +103,9 @@ impl From<DatabaseError> for ApiError {
         match err {
             DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
             // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
-            DatabaseError::Connection(_e) => ApiError::ShuttingDown,
+            DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
+                ApiError::ShuttingDown
+            }
             DatabaseError::Logical(reason) => {
                 ApiError::InternalServerError(anyhow::anyhow!(reason))
             }

@@ -987,7 +989,15 @@ impl Service {
                 .collect();
         } else {
             // This was an update, wait for reconciliation
-            self.await_waiters(waiters).await?;
+            if let Err(e) = self.await_waiters(waiters).await {
+                // Do not treat a reconcile error as fatal: we have already applied any requested
+                // Intent changes, and the reconcile can fail for external reasons like unavailable
+                // compute notification API. In these cases, it is important that we do not
+                // cause the cloud control plane to retry forever on this API.
+                tracing::warn!(
+                    "Failed to reconcile after /location_config: {e}, returning success anyway"
+                );
+            }
         }

         Ok(result)
@@ -90,8 +90,10 @@ pub enum ComputeFeature {
     /// track short-lived connections as user activity.
     ActivityMonitorExperimental,

-    /// Enable running migrations
-    Migrations,
+    // Use latest version of remote extensions
+    // This is needed to allow us to test new versions of extensions before
+    // they are merged into the main branch.
+    RemoteExtensionsUseLatest,

     /// This is a special feature flag that is used to represent unknown feature flags.
     /// Basically all unknown to enum flags are represented as this one. See unit test

@@ -155,8 +157,12 @@ impl RemoteExtSpec {
         //
         // Keep it in sync with path generation in
         // https://github.com/neondatabase/build-custom-extensions/tree/main
+        //
+        // if ComputeFeature::RemoteExtensionsUseLatest is enabled
+        // use "latest" as the build_tag
         let archive_path_str =
             format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
+
         Ok((
             real_ext_name.to_string(),
             RemotePath::from_string(&archive_path_str)?,
@@ -13,6 +13,9 @@ twox-hash.workspace = true

 workspace_hack.workspace = true

+[target.'cfg(target_os = "linux")'.dependencies]
+procfs.workspace = true
+
 [dev-dependencies]
 rand = "0.8"
 rand_distr = "0.4.3"
@@ -31,6 +31,8 @@ pub use wrappers::{CountedReader, CountedWriter};
 mod hll;
 pub mod metric_vec_duration;
 pub use hll::{HyperLogLog, HyperLogLogVec};
+#[cfg(target_os = "linux")]
+pub mod more_process_metrics;

 pub type UIntGauge = GenericGauge<AtomicU64>;
 pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
libs/metrics/src/more_process_metrics.rs (new file, 54 lines)

@@ -0,0 +1,54 @@
+//! process metrics that the [`::prometheus`] crate doesn't provide.
+
+// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
+
+use crate::UIntGauge;
+
+pub struct Collector {
+    descs: Vec<prometheus::core::Desc>,
+    vmlck: crate::UIntGauge,
+}
+
+const NMETRICS: usize = 1;
+
+impl prometheus::core::Collector for Collector {
+    fn desc(&self) -> Vec<&prometheus::core::Desc> {
+        self.descs.iter().collect()
+    }
+
+    fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
+        let Ok(myself) = procfs::process::Process::myself() else {
+            return vec![];
+        };
+        let mut mfs = Vec::with_capacity(NMETRICS);
+        if let Ok(status) = myself.status() {
+            if let Some(vmlck) = status.vmlck {
+                self.vmlck.set(vmlck);
+                mfs.extend(self.vmlck.collect())
+            }
+        }
+        mfs
+    }
+}
+
+impl Collector {
+    pub fn new() -> Self {
+        let mut descs = Vec::new();
+
+        let vmlck =
+            UIntGauge::new("libmetrics_process_status_vmlck", "/proc/self/status vmlck").unwrap();
+        descs.extend(
+            prometheus::core::Collector::desc(&vmlck)
+                .into_iter()
+                .cloned(),
+        );
+
+        Self { descs, vmlck }
+    }
+}
+
+impl Default for Collector {
+    fn default() -> Self {
+        Self::new()
+    }
+}
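Aside, not part of the diff: registering a collector like the one above goes through the plain prometheus-crate registry API. This sketch uses a stock `IntGauge` as a stand-in and a local `Registry` rather than Neon's `metrics::register_internal` wrapper that appears later in the pageserver diff.

```rust
use prometheus::{Encoder, IntGauge, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();

    // Anything implementing prometheus::core::Collector can be registered this way,
    // including a hand-written collector such as the one in more_process_metrics.rs.
    let gauge = IntGauge::new("example_process_status_vmlck", "illustrative stand-in").unwrap();
    registry.register(Box::new(gauge.clone())).unwrap();
    gauge.set(42);

    // Render the text exposition format, as a /metrics endpoint would.
    let mut buf = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buf).unwrap();
    println!("{}", String::from_utf8(buf).unwrap());
}
```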
@@ -649,6 +649,27 @@ pub struct WalRedoManagerStatus {
     pub pid: Option<u32>,
 }

+pub mod virtual_file {
+    #[derive(
+        Copy,
+        Clone,
+        PartialEq,
+        Eq,
+        Hash,
+        strum_macros::EnumString,
+        strum_macros::Display,
+        serde_with::DeserializeFromStr,
+        serde_with::SerializeDisplay,
+        Debug,
+    )]
+    #[strum(serialize_all = "kebab-case")]
+    pub enum IoEngineKind {
+        StdFs,
+        #[cfg(target_os = "linux")]
+        TokioEpollUring,
+    }
+}
+
 // Wrapped in libpq CopyData
 #[derive(PartialEq, Eq, Debug)]
 pub enum PagestreamFeMessage {
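Aside, not part of the diff: the derive stack above makes the enum travel over the wire as a kebab-case JSON string. A small round-trip check, replicating the enum locally (the `target_os` cfg is dropped for brevity) and assuming the strum, serde_with, and serde_json crates:

```rust
use serde_with::{DeserializeFromStr, SerializeDisplay};
use strum_macros::{Display, EnumString};

#[derive(Copy, Clone, PartialEq, Eq, Debug, EnumString, Display, DeserializeFromStr, SerializeDisplay)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
    StdFs,
    TokioEpollUring,
}

fn main() {
    // Display/FromStr use the kebab-case names; serde_with forwards serde to them,
    // so the JSON representation is a bare string.
    assert_eq!(serde_json::to_string(&IoEngineKind::StdFs).unwrap(), r#""std-fs""#);
    let k: IoEngineKind = serde_json::from_str(r#""tokio-epoll-uring""#).unwrap();
    assert_eq!(k, IoEngineKind::TokioEpollUring);
}
```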
@@ -453,9 +453,12 @@ mod tests {
                 event_mask: 0,
             }),
             expected_messages: vec![
-                // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
+                // TODO: When updating Postgres versions, this test will cause
+                // problems. Postgres version in message needs updating.
+                //
+                // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160002, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
                 vec![
-                    103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                    103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                     0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
                     147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
                     188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
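Aside, not part of the diff: the only functional edit in the expected byte vector is the greeting's `pg_version` field, 160001 to 160002, which is encoded little-endian. That is exactly the `1, 113, 2, 0` to `2, 113, 2, 0` change:

```rust
fn main() {
    assert_eq!(160001u32.to_le_bytes(), [1, 113, 2, 0]);
    assert_eq!(160002u32.to_le_bytes(), [2, 113, 2, 0]);
}
```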
@@ -339,4 +339,16 @@ impl Client {
             .await
             .map_err(Error::ReceiveBody)
     }
+
+    pub async fn put_io_engine(
+        &self,
+        engine: &pageserver_api::models::virtual_file::IoEngineKind,
+    ) -> Result<()> {
+        let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
+        self.request(Method::PUT, uri, engine)
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
 }
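Aside, not part of the diff: the new endpoint can also be exercised directly over HTTP. A hedged reqwest sketch; the listen address and port are assumptions, and the body is the JSON string form shown in the round-trip example earlier.

```rust
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    client
        // Assumed pageserver management API address; adjust to your deployment.
        .put("http://127.0.0.1:9898/v1/io_engine")
        .json(&"std-fs") // or "tokio-epoll-uring" on Linux
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```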
@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

     // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
-    pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
+    pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
     pageserver::page_cache::init(100);

     let mut total_delta_layers = 0usize;
@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {

 async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
     let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
-    virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
+    virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
     page_cache::init(100);
     let file = FileBlockReader::new(VirtualFile::open(path).await?);
     let summary_blk = file.read_blk(0, ctx).await?;

@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
             new_tenant_id,
             new_timeline_id,
         } => {
-            pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
+            pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
             pageserver::page_cache::init(100);

             let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {

 async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
     // Basic initialization of things that don't change after startup
-    virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
+    virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
     page_cache::init(100);
     let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
     dump_layerfile_from_path(path, true, &ctx).await
@@ -51,6 +51,10 @@ pub(crate) struct Args {
     /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
     #[clap(long)]
     keyspace_cache: Option<Utf8PathBuf>,
+    /// Before starting the benchmark, live-reconfigure the pageserver to use the given
+    /// [`pageserver_api::models::virtual_file::IoEngineKind`].
+    #[clap(long)]
+    set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
     targets: Option<Vec<TenantTimelineId>>,
 }

@@ -109,6 +113,10 @@ async fn main_impl(
         args.pageserver_jwt.as_deref(),
     ));

+    if let Some(engine_str) = &args.set_io_engine {
+        mgmt_api_client.put_io_engine(engine_str).await?;
+    }
+
     // discover targets
     let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
         &mgmt_api_client,
@@ -272,6 +272,8 @@ fn start_pageserver(
     );
     set_build_info_metric(GIT_VERSION, BUILD_TAG);
     set_launch_timestamp_metric(launch_ts);
+    #[cfg(target_os = "linux")]
+    metrics::register_internal(Box::new(metrics::more_process_metrics::Collector::new())).unwrap();
     pageserver::preinitialize_metrics();

     // If any failpoints were set from FAILPOINTS environment variable,
@@ -1908,6 +1908,15 @@ async fn post_tracing_event_handler(
     json_response(StatusCode::OK, ())
 }

+async fn put_io_engine_handler(
+    mut r: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
+    crate::virtual_file::io_engine::set(kind);
+    json_response(StatusCode::OK, ())
+}
+
 /// Common functionality of all the HTTP API handlers.
 ///
 /// - Adds a tracing span to each request (by `request_span`)

@@ -2165,5 +2174,6 @@ pub fn make_router(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
             |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
         )
+        .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
         .any(handler_404))
 }
@@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection(
                     modification.commit(&ctx).await?;
                     uncommitted_records = 0;
                     filtered_records = 0;
+
+                    //
+                    // We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
+                    // layer size can become much larger than `checkpoint_distance`.
+                    // It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large
+                    // amount of data to key-value storage. So performing this check only after processing
+                    // all WAL records in the chunk, can cause huge L0 layer files.
+                    //
+                    timeline
+                        .check_checkpoint_distance()
+                        .await
+                        .with_context(|| {
+                            format!(
+                                "Failed to check checkpoint distance for timeline {}",
+                                timeline.timeline_id
+                            )
+                        })?;
                 }
             }

@@ -28,9 +28,10 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 use tokio::time::Instant;
 use utils::fs_ext;

-mod io_engine;
+pub use pageserver_api::models::virtual_file as api;
+pub(crate) mod io_engine;
 mod open_options;
-pub use io_engine::IoEngineKind;
+pub(crate) use io_engine::IoEngineKind;
 pub(crate) use open_options::*;

 ///
@@ -7,67 +7,100 @@
 //!
 //! Then use [`get`] and [`super::OpenOptions`].

-#[derive(
-    Copy,
-    Clone,
-    PartialEq,
-    Eq,
-    Hash,
-    strum_macros::EnumString,
-    strum_macros::Display,
-    serde_with::DeserializeFromStr,
-    serde_with::SerializeDisplay,
-    Debug,
-)]
-#[strum(serialize_all = "kebab-case")]
-pub enum IoEngineKind {
-    StdFs,
-    #[cfg(target_os = "linux")]
-    TokioEpollUring,
-}
-
-static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
-
-#[cfg(not(test))]
-pub(super) fn init(engine: IoEngineKind) {
-    if IO_ENGINE.set(engine).is_err() {
-        panic!("called twice");
-    }
-    crate::metrics::virtual_file_io_engine::KIND
-        .with_label_values(&[&format!("{engine}")])
-        .set(1);
-}
-
-pub(super) fn get() -> &'static IoEngineKind {
-    #[cfg(test)]
-    {
-        let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
-        IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
-            Ok(v) => match v.parse::<IoEngineKind>() {
-                Ok(engine_kind) => engine_kind,
-                Err(e) => {
-                    panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
-                }
-            },
-            Err(std::env::VarError::NotPresent) => {
-                crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
-                    .parse()
-                    .unwrap()
-            }
-            Err(std::env::VarError::NotUnicode(_)) => {
-                panic!("env var {env_var_name} is not unicode");
-            }
-        })
-    }
-    #[cfg(not(test))]
-    IO_ENGINE.get().unwrap()
-}
-
-use std::os::unix::prelude::FileExt;
+pub(crate) use super::api::IoEngineKind;
+#[derive(Clone, Copy)]
+#[repr(u8)]
+pub(crate) enum IoEngine {
+    NotSet,
+    StdFs,
+    #[cfg(target_os = "linux")]
+    TokioEpollUring,
+}
+
+impl From<IoEngineKind> for IoEngine {
+    fn from(value: IoEngineKind) -> Self {
+        match value {
+            IoEngineKind::StdFs => IoEngine::StdFs,
+            #[cfg(target_os = "linux")]
+            IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
+        }
+    }
+}
+
+impl TryFrom<u8> for IoEngine {
+    type Error = u8;
+
+    fn try_from(value: u8) -> Result<Self, Self::Error> {
+        Ok(match value {
+            v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
+            v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
+            #[cfg(target_os = "linux")]
+            v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
+            x => return Err(x),
+        })
+    }
+}
+
+static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
+
+pub(crate) fn set(engine_kind: IoEngineKind) {
+    let engine: IoEngine = engine_kind.into();
+    IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
+    #[cfg(not(test))]
+    {
+        let metric = &crate::metrics::virtual_file_io_engine::KIND;
+        metric.reset();
+        metric
+            .with_label_values(&[&format!("{engine_kind}")])
+            .set(1);
+    }
+}
+
+#[cfg(not(test))]
+pub(super) fn init(engine_kind: IoEngineKind) {
+    set(engine_kind);
+}
+
+pub(super) fn get() -> IoEngine {
+    let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
+    if cfg!(test) {
+        let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
+        match cur {
+            IoEngine::NotSet => {
+                let kind = match std::env::var(env_var_name) {
+                    Ok(v) => match v.parse::<IoEngineKind>() {
+                        Ok(engine_kind) => engine_kind,
+                        Err(e) => {
+                            panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
+                        }
+                    },
+                    Err(std::env::VarError::NotPresent) => {
+                        crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
+                            .parse()
+                            .unwrap()
+                    }
+                    Err(std::env::VarError::NotUnicode(_)) => {
+                        panic!("env var {env_var_name} is not unicode");
+                    }
+                };
+                self::set(kind);
+                self::get()
+            }
+            x => x,
+        }
+    } else {
+        cur
+    }
+}
+
+use std::{
+    os::unix::prelude::FileExt,
+    sync::atomic::{AtomicU8, Ordering},
+};

 use super::FileGuard;

-impl IoEngineKind {
+impl IoEngine {
     pub(super) async fn read_at<B>(
         &self,
         file_guard: FileGuard,

@@ -78,7 +111,8 @@ impl IoEngineKind {
         B: tokio_epoll_uring::BoundedBufMut + Send,
     {
         match self {
-            IoEngineKind::StdFs => {
+            IoEngine::NotSet => panic!("not initialized"),
+            IoEngine::StdFs => {
                 // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
                 let dst = unsafe {
                     std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())

@@ -96,7 +130,7 @@ impl IoEngineKind {
                 ((file_guard, buf), res)
             }
             #[cfg(target_os = "linux")]
-            IoEngineKind::TokioEpollUring => {
+            IoEngine::TokioEpollUring => {
                 let system = tokio_epoll_uring::thread_local_system().await;
                 let (resources, res) = system.read(file_guard, offset, buf).await;
                 (
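Aside, not part of the diff: the core of the new io_engine state is an enum discriminant parked in an `AtomicU8`, which is what lets the HTTP handler swap engines at runtime. A stripped-down sketch of that pattern:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum IoEngine {
    NotSet = 0,
    StdFs = 1,
}

static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);

fn set(engine: IoEngine) {
    IO_ENGINE.store(engine as u8, Ordering::Relaxed);
}

fn get() -> IoEngine {
    // Decode the stored discriminant back into the enum.
    match IO_ENGINE.load(Ordering::Relaxed) {
        x if x == IoEngine::StdFs as u8 => IoEngine::StdFs,
        _ => IoEngine::NotSet,
    }
}

fn main() {
    assert_eq!(get(), IoEngine::NotSet);
    set(IoEngine::StdFs);
    assert_eq!(get(), IoEngine::StdFs);
}
```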
@@ -1,6 +1,6 @@
 //! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];

-use super::IoEngineKind;
+use super::io_engine::IoEngine;
 use std::{os::fd::OwnedFd, path::Path};

 #[derive(Debug, Clone)]

@@ -13,9 +13,10 @@ pub enum OpenOptions {
 impl Default for OpenOptions {
     fn default() -> Self {
         match super::io_engine::get() {
-            IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
+            IoEngine::NotSet => panic!("io engine not set"),
+            IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
             #[cfg(target_os = "linux")]
-            IoEngineKind::TokioEpollUring => {
+            IoEngine::TokioEpollUring => {
                 Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
             }
         }
@@ -3131,10 +3131,7 @@ class Endpoint(PgProtocol):
         log.info(json.dumps(dict(data_dict, **kwargs)))
         json.dump(dict(data_dict, **kwargs), file, indent=4)

-    # Please note: if you didn't respec this endpoint to have the `migrations`
-    # feature, this function will probably fail because neon_migration.migration_id
-    # won't exist. This is temporary - soon we'll get rid of the feature flag and
-    # migrations will be enabled for everyone.
+    # Please note: Migrations only run if pg_skip_catalog_updates is false
     def wait_for_migrations(self):
         with self.cursor() as cur:

@@ -563,13 +563,13 @@ class PageserverHttpClient(requests.Session):
         self,
         tenant_id: Union[TenantId, TenantShardId],
         timeline_id: TimelineId,
-        timestamp,
+        timestamp: datetime,
     ):
         log.info(
             f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
         )
         res = self.get(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
         )
         self.verbose_error(res)
         res_json = res.json()
@@ -26,86 +26,81 @@ from fixtures.neon_fixtures import NeonEnvBuilder
 # apply during config step, like more users, databases, or extensions. By default
 # we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
 # test we only load neon.
-@pytest.mark.timeout(1000)
-def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
+@pytest.mark.timeout(1800)
+@pytest.mark.parametrize("slru", ["lazy", "eager"])
+def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.init_start()

-    lazy_tenant, _ = env.neon_cli.create_tenant(
+    lazy_slru_download = "true" if slru == "lazy" else "false"
+    tenant, _ = env.neon_cli.create_tenant(
         conf={
-            "lazy_slru_download": "true",
+            "lazy_slru_download": lazy_slru_download,
         }
     )
-    eager_tenant, _ = env.neon_cli.create_tenant(
-        conf={
-            "lazy_slru_download": "false",
-        }
-    )
-    tenants = [lazy_tenant, eager_tenant]
-    slru = "lazy"
-    for tenant in tenants:
-        endpoint = env.endpoints.create_start("main", tenant_id=tenant)
-        endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
-        endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)")
-        endpoint.safe_psql("INSERT INTO t VALUES (1, 0)")
-        endpoint.safe_psql(
-            """
-            CREATE PROCEDURE updating() as
-            $$
-            DECLARE
-                i integer;
-            BEGIN
-                FOR i IN 1..10000000 LOOP
-                    UPDATE t SET x = x + 1 WHERE pk=1;
-                    COMMIT;
-                END LOOP;
-            END
-            $$ LANGUAGE plpgsql
-            """
-        )
-        endpoint.safe_psql("SET statement_timeout=0")
-        endpoint.safe_psql("call updating()")

+    endpoint = env.endpoints.create_start("main", tenant_id=tenant)
+    with endpoint.cursor() as cur:
+        cur.execute("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
+        cur.execute("ALTER TABLE t SET (autovacuum_enabled = false)")
+        cur.execute("INSERT INTO t VALUES (1, 0)")
+        cur.execute(
+            """
+            CREATE PROCEDURE updating() as
+            $$
+            DECLARE
+                i integer;
+            BEGIN
+                FOR i IN 1..1000000 LOOP
+                    UPDATE t SET x = x + 1 WHERE pk=1;
+                    COMMIT;
+                END LOOP;
+            END
+            $$ LANGUAGE plpgsql
+            """
+        )
+        cur.execute("SET statement_timeout=0")
+        cur.execute("call updating()")
+
+    endpoint.stop()
+
+    # We do two iterations so we can see if the second startup is faster. It should
+    # be because the compute node should already be configured with roles, databases,
+    # extensions, etc from the first run.
+    for i in range(2):
+        # Start
+        with zenbenchmark.record_duration(f"{slru}_{i}_start"):
+            endpoint.start()
+
+        with zenbenchmark.record_duration(f"{slru}_{i}_select"):
+            sum = endpoint.safe_psql("select sum(x) from t")[0][0]
+            assert sum == 1000000
+
+        # Get metrics
+        metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
+        durations = {
+            "wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
+            "sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
+            "sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
+            "basebackup_ms": f"{slru}_{i}_basebackup",
+            "start_postgres_ms": f"{slru}_{i}_start_postgres",
+            "config_ms": f"{slru}_{i}_config",
+            "total_startup_ms": f"{slru}_{i}_total_startup",
+        }
+        for key, name in durations.items():
+            value = metrics[key]
+            zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
+
+        basebackup_bytes = metrics["basebackup_bytes"]
+        zenbenchmark.record(
+            f"{slru}_{i}_basebackup_bytes",
+            basebackup_bytes,
+            "bytes",
+            report=MetricReport.LOWER_IS_BETTER,
+        )
+
+        # Stop so we can restart
         endpoint.stop()

-        # We do two iterations so we can see if the second startup is faster. It should
-        # be because the compute node should already be configured with roles, databases,
-        # extensions, etc from the first run.
-        for i in range(2):
-            # Start
-            with zenbenchmark.record_duration(f"{slru}_{i}_start"):
-                endpoint.start()
-
-            with zenbenchmark.record_duration(f"{slru}_{i}_select"):
-                sum = endpoint.safe_psql("select sum(x) from t")[0][0]
-                assert sum == 10000000
-
-            # Get metrics
-            metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
-            durations = {
-                "wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
-                "sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
-                "sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
-                "basebackup_ms": f"{slru}_{i}_basebackup",
-                "start_postgres_ms": f"{slru}_{i}_start_postgres",
-                "config_ms": f"{slru}_{i}_config",
-                "total_startup_ms": f"{slru}_{i}_total_startup",
-            }
-            for key, name in durations.items():
-                value = metrics[key]
-                zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
-
-            basebackup_bytes = metrics["basebackup_bytes"]
-            zenbenchmark.record(
-                f"{slru}_{i}_basebackup_bytes",
-                basebackup_bytes,
-                "bytes",
-                report=MetricReport.LOWER_IS_BETTER,
-            )
-
-            # Stop so we can restart
-            endpoint.stop()
-
-            # Imitate optimizations that console would do for the second start
-            endpoint.respec(skip_pg_catalog_updates=True)
-        slru = "eager"
+    # Imitate optimizations that console would do for the second start
+    endpoint.respec(skip_pg_catalog_updates=True)
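
With the parametrization above, the benchmark reports each phase once per slru variant and per iteration. A small sketch of the metric names this produces (phase names are taken from the durations dict above; the reporting backend itself is not shown in this diff):

    # Sketch only: enumerate the benchmark metric names emitted by the parametrized test.
    phases = [
        "wait_for_spec", "sync_safekeepers", "sync_sk_check",
        "basebackup", "start_postgres", "config", "total_startup",
    ]
    for slru in ("lazy", "eager"):
        for i in range(2):  # cold start, then start after respec(skip_pg_catalog_updates=True)
            for phase in phases:
                print(f"{slru}_{i}_{phase}")  # e.g. lazy_0_total_startup, eager_1_basebackup
            # plus one size metric per iteration: f"{slru}_{i}_basebackup_bytes"
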
test_runner/regress/test_layer_bloating.py (new file, 66 lines)
@@ -0,0 +1,66 @@
+import os
+import time
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    logical_replication_sync,
+)
+from fixtures.pg_version import PgVersion
+
+
+def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
+    env = neon_simple_env
+
+    if env.pg_version != PgVersion.V16:
+        pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
+
+    timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
+    endpoint = env.endpoints.create_start(
+        "test_logical_replication", config_lines=["log_statement=all"]
+    )
+
+    log.info("postgres is running on 'test_logical_replication' branch")
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    # create table...
+    cur.execute("create table t(pk integer primary key)")
+    cur.execute("create publication pub1 for table t")
+    # Create slot to hold WAL
+    cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
+
+    # now start subscriber
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("create table t(pk integer primary key)")
+
+    connstr = endpoint.connstr().replace("'", "''")
+    log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
+    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
+
+    cur.execute(
+        """create or replace function create_snapshots(n integer) returns void as $$
+        declare
+            i integer;
+        begin
+            for i in 1..n loop
+                perform pg_log_standby_snapshot();
+            end loop;
+        end; $$ language plpgsql"""
+    )
+    cur.execute("set statement_timeout=0")
+    cur.execute("select create_snapshots(10000)")
+    # Wait logical replication to sync
+    logical_replication_sync(vanilla_pg, endpoint)
+    time.sleep(10)
+
+    # Check layer file sizes
+    timeline_path = "{}/tenants/{}/timelines/{}/".format(
+        env.pageserver.workdir, env.initial_tenant, timeline
+    )
+    log.info(f"Check {timeline_path}")
+    for filename in os.listdir(timeline_path):
+        if filename.startswith("00000"):
+            log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
+            assert os.path.getsize(timeline_path + filename) < 512_000_000
@@ -64,18 +64,14 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
     # Check edge cases
     # Timestamp is in the future
     probe_timestamp = tbl[-1][1] + timedelta(hours=1)
-    result = client.timeline_get_lsn_by_timestamp(
-        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
-    )
+    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
     assert result["kind"] == "future"
     # make sure that we return a well advanced lsn here
     assert Lsn(result["lsn"]) > start_lsn

     # Timestamp is in the unreachable past
     probe_timestamp = tbl[0][1] - timedelta(hours=10)
-    result = client.timeline_get_lsn_by_timestamp(
-        tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
-    )
+    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
     assert result["kind"] == "past"
     # make sure that we return the minimum lsn here at the start of the range
     assert Lsn(result["lsn"]) < start_lsn
@@ -83,9 +79,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
     # Probe a bunch of timestamps in the valid range
     for i in range(1, len(tbl), 100):
         probe_timestamp = tbl[i][1]
-        result = client.timeline_get_lsn_by_timestamp(
-            tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
-        )
+        result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
         assert result["kind"] not in ["past", "nodata"]
         lsn = result["lsn"]
         # Call get_lsn_by_timestamp to get the LSN
@@ -108,9 +102,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):

     # Timestamp is in the unreachable past
     probe_timestamp = tbl[0][1] - timedelta(hours=10)
-    result = client.timeline_get_lsn_by_timestamp(
-        tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z"
-    )
+    result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
     assert result["kind"] == "past"
     # make sure that we return the minimum lsn here at the start of the range
     assert Lsn(result["lsn"]) >= last_flush_lsn
@@ -10,7 +10,7 @@ def test_migrations(neon_simple_env: NeonEnv):
     endpoint = env.endpoints.create("test_migrations")
     log_path = endpoint.endpoint_path() / "compute.log"

-    endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
+    endpoint.respec(skip_pg_catalog_updates=False)
     endpoint.start()

     endpoint.wait_for_migrations()
@@ -12,10 +12,10 @@ def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
     env.neon_cli.create_branch("test_neon_superuser_subscriber")
     sub = env.endpoints.create("test_neon_superuser_subscriber")

-    pub.respec(skip_pg_catalog_updates=False, features=["migrations"])
+    pub.respec(skip_pg_catalog_updates=False)
     pub.start()

-    sub.respec(skip_pg_catalog_updates=False, features=["migrations"])
+    sub.respec(skip_pg_catalog_updates=False)
     sub.start()

     pub.wait_for_migrations()
@@ -310,7 +310,7 @@ def test_sharding_service_compute_hook(
         notifications.append(request.json)
         return Response(status=200)

-    httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
+    httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)

     # Start running
     env = neon_env_builder.init_start()
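
The compute notification hook is now exercised with PUT rather than POST. A minimal sketch of the method change using the same pytest-httpserver API as above, but standalone rather than through the fixture (the payload is a placeholder, not the storage controller's real notification body):

    # Sketch only: a PUT expectation on /notify and a matching client call.
    import requests
    from pytest_httpserver import HTTPServer
    from werkzeug.wrappers import Response

    notifications = []

    def handler(request):
        notifications.append(request.json)
        return Response(status=200)

    server = HTTPServer()
    server.start()
    server.expect_request("/notify", method="PUT").respond_with_handler(handler)

    # Only a PUT request matches the expectation now; a POST would not.
    requests.put(server.url_for("/notify"), json={"example": "payload"})
    assert notifications == [{"example": "payload"}]
    server.stop()
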
Submodule vendor/postgres-v14 (vendored) updated: be7a65fe67...018fb05201
Submodule vendor/postgres-v15 (vendored) updated: 81e16cd537...6ee78a3c29
Submodule vendor/postgres-v16 (vendored) updated: f7ea954989...550cdd26d4

vendor/revisions.json (vendored)
@@ -1,5 +1,5 @@
 {
-    "postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a",
-    "postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535",
-    "postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88"
+    "postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
+    "postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
+    "postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
 }
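
vendor/revisions.json pins the vendored PostgreSQL submodules to the commits shown above. A trivial sketch of reading the file (illustrative only, not part of this change):

    # Sketch only: print the commit pinned for each vendored PostgreSQL version.
    import json

    with open("vendor/revisions.json") as f:
        revisions = json.load(f)

    for name, sha in sorted(revisions.items()):
        print(f"{name}: {sha}")  # e.g. postgres-v16: 550cdd26d445afdd26b15aa93c8c2f3dc52f8361
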
@@ -29,7 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
 clap = { version = "4", features = ["derive", "string"] }
 clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
 crossbeam-utils = { version = "0.8" }
-diesel = { version = "2", features = ["postgres", "serde_json"] }
+diesel = { version = "2", features = ["postgres", "r2d2", "serde_json"] }
 either = { version = "1" }
 fail = { version = "0.5", default-features = false, features = ["failpoints"] }
 futures-channel = { version = "0.3", features = ["sink"] }
@@ -90,6 +90,7 @@ anyhow = { version = "1", features = ["backtrace"] }
 bytes = { version = "1", features = ["serde"] }
 cc = { version = "1", default-features = false, features = ["parallel"] }
 chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
+diesel_derives = { version = "2", features = ["32-column-tables", "postgres", "r2d2", "with-deprecated"] }
 either = { version = "1" }
 getrandom = { version = "0.2", default-features = false, features = ["std"] }
 hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }