Merge branch 'main' into problame/2024-02-walredo-work/prespawn/switch-to-heavier-once-cell-with-rwlock

This commit is contained in:
Christian Schwarz
2024-02-08 11:19:48 +00:00
50 changed files with 822 additions and 318 deletions

View File

@@ -179,6 +179,12 @@ runs:
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
fi
- name: Cache poetry deps
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Store Allure test stat in the DB (new)
if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
shell: bash -euxo pipefail {0}

View File

@@ -44,6 +44,10 @@ inputs:
description: 'Postgres version to use for tests'
required: false
default: 'v14'
benchmark_durations:
description: 'benchmark durations JSON'
required: false
default: '{}'
runs:
using: "composite"
@@ -82,11 +86,10 @@ runs:
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}
@@ -160,7 +163,7 @@ runs:
# We use pytest-split plugin to run benchmarks in parallel on different CI runners
if [ "${TEST_SELECTION}" = "test_runner/performance" ] && [ "${{ inputs.build_type }}" != "remote" ]; then
mkdir -p $TEST_OUTPUT
poetry run ./scripts/benchmark_durations.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/benchmark_durations.json"
echo '${{ inputs.benchmark_durations || '{}' }}' > $TEST_OUTPUT/benchmark_durations.json
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
fi

View File

@@ -93,6 +93,7 @@ jobs:
--body-file "body.md" \
--head "${BRANCH}" \
--base "main" \
--label "run-e2e-tests-in-draft" \
--draft
fi

View File

@@ -22,7 +22,7 @@ env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
jobs:
check-permissions:
@@ -112,11 +112,10 @@ jobs:
fetch-depth: 1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
@@ -478,8 +477,40 @@ jobs:
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
uses: ./.github/actions/save-coverage-data
get-benchmarks-durations:
outputs:
json: ${{ steps.get-benchmark-durations.outputs.json }}
needs: [ check-permissions, build-buildtools-image ]
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
options: --init
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Cache poetry deps
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
run: ./scripts/pysync
- name: get benchmark durations
id: get-benchmark-durations
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
run: |
poetry run ./scripts/benchmark_durations.py "${TEST_RESULT_CONNSTR}" \
--days 10 \
--output /tmp/benchmark_durations.json
echo "json=$(jq --compact-output '.' /tmp/benchmark_durations.json)" >> $GITHUB_OUTPUT
benchmarks:
needs: [ check-permissions, build-neon, build-buildtools-image ]
needs: [ check-permissions, build-neon, build-buildtools-image, get-benchmarks-durations ]
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
@@ -490,7 +521,7 @@ jobs:
fail-fast: false
matrix:
# the amount of groups (N) should be reflected in `extra_params: --splits N ...`
pytest_split_group: [ 1, 2, 3, 4 ]
pytest_split_group: [ 1, 2, 3, 4, 5 ]
build_type: [ release ]
steps:
- name: Checkout
@@ -503,7 +534,8 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ github.ref_name == 'main' }}
extra_params: --splits 4 --group ${{ matrix.pytest_split_group }}
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -660,50 +692,10 @@ jobs:
})
trigger-e2e-tests:
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' }}
needs: [ check-permissions, promote-images, tag ]
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
# For pull requests, GH Actions set "github.sha" variable to point at a fake merge commit
# but we need to use a real sha of a latest commit in the PR's branch for the e2e job,
# to place a job run status update later.
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
# For non-PR kinds of runs, the above will produce an empty variable, pick the original sha value for those
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\",
\"storage_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
}
}"
uses: ./.github/workflows/trigger-e2e-tests.yml
secrets: inherit
neon-image:
needs: [ check-permissions, build-buildtools-image, tag ]

View File

@@ -38,11 +38,10 @@ jobs:
uses: snok/install-poetry@v1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
key: v2-${{ runner.os }}-python-deps-ubunutu-latest-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -euxo pipefail {0}

118
.github/workflows/trigger-e2e-tests.yml vendored Normal file
View File

@@ -0,0 +1,118 @@
name: Trigger E2E Tests
on:
pull_request:
types:
- ready_for_review
workflow_call:
defaults:
run:
shell: bash -euxo pipefail {0}
env:
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
jobs:
cancel-previous-e2e-tests:
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
steps:
- name: Cancel previous e2e-tests runs for this PR
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
gh workflow --repo neondatabase/cloud \
run cancel-previous-in-concurrency-group.yml \
--field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
tag:
runs-on: [ ubuntu-latest ]
outputs:
build-tag: ${{ steps.build-tag.outputs.tag }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Get build tag
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
CURRENT_BRANCH: ${{ github.head_ref || github.ref_name }}
CURRENT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
fi
id: build-tag
trigger-e2e-tests:
needs: [ tag ]
runs-on: [ self-hosted, gen3, small ]
env:
TAG: ${{ needs.tag.outputs.build-tag }}
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
steps:
- name: check if ecr image are present
run: |
for REPO in neon compute-tools compute-node-v14 vm-compute-node-v14 compute-node-v15 vm-compute-node-v15 compute-node-v16 vm-compute-node-v16; do
OUTPUT=$(aws ecr describe-images --repository-name ${REPO} --region eu-central-1 --query "imageDetails[?imageTags[?contains(@, '${TAG}')]]" --output text)
if [ "$OUTPUT" == "" ]; then
echo "$REPO with image tag $TAG not found" >> $GITHUB_OUTPUT
exit 1
fi
done
- name: Set PR's status to pending and request a remote CI test
run: |
# For pull requests, GH Actions set "github.sha" variable to point at a fake merge commit
# but we need to use a real sha of a latest commit in the PR's branch for the e2e job,
# to place a job run status update later.
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
# For non-PR kinds of runs, the above will produce an empty variable, pick the original sha value for those
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\",
\"storage_image_tag\": \"${TAG}\",
\"compute_image_tag\": \"${TAG}\",
\"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
}
}"

View File

@@ -54,6 +54,9 @@ _An instruction for maintainers_
- If and only if it looks **safe** (i.e. it doesn't contain any malicious code which could expose secrets or harm the CI), then:
- Press the "Approve and run" button in GitHub UI
- Add the `approved-for-ci-run` label to the PR
- Currently draft PR will skip e2e test (only for internal contributors). After turning the PR 'Ready to Review' CI will trigger e2e test
- Add `run-e2e-tests-in-draft` label to run e2e test in draft PR (override above behaviour)
- The `approved-for-ci-run` workflow will add `run-e2e-tests-in-draft` automatically to run e2e test for external contributors
Repeat all steps after any change to the PR.
- When the changes are ready to get merged — merge the original PR (not the internal one)

30
Cargo.lock generated
View File

@@ -289,6 +289,7 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"r2d2",
"reqwest",
"serde",
"serde_json",
@@ -1651,6 +1652,7 @@ dependencies = [
"diesel_derives",
"itoa",
"pq-sys",
"r2d2",
"serde_json",
]
@@ -2867,6 +2869,7 @@ dependencies = [
"chrono",
"libc",
"once_cell",
"procfs",
"prometheus",
"rand 0.8.5",
"rand_distr",
@@ -3984,6 +3987,8 @@ checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69"
dependencies = [
"bitflags 1.3.2",
"byteorder",
"chrono",
"flate2",
"hex",
"lazy_static",
"rustix 0.36.16",
@@ -4166,6 +4171,17 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.7.3"
@@ -4879,6 +4895,15 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
@@ -5714,7 +5739,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc"
dependencies = [
"futures",
"nix 0.26.4",
@@ -6239,7 +6264,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#d6a1c93442fb6b3a5bec490204961134e54925dc"
dependencies = [
"io-uring",
"libc",
@@ -6807,6 +6832,7 @@ dependencies = [
"clap_builder",
"crossbeam-utils",
"diesel",
"diesel_derives",
"either",
"fail",
"futures-channel",

View File

@@ -113,6 +113,7 @@ parquet = { version = "49.0.0", default-features = false, features = ["zstd"] }
parquet_derive = "49.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.14"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"

View File

@@ -111,7 +111,7 @@ USER nonroot:nonroot
WORKDIR /home/nonroot
# Python
ENV PYTHON_VERSION=3.9.2 \
ENV PYTHON_VERSION=3.9.18 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \

2
NOTICE
View File

@@ -1,5 +1,5 @@
Neon
Copyright 2022 Neon Inc.
Copyright 2022 - 2024 Neon Inc.
The PostgreSQL submodules in vendor/ are licensed under the PostgreSQL license.
See vendor/postgres-vX/COPYRIGHT for details.

View File

@@ -207,6 +207,7 @@ fn maybe_cgexec(cmd: &str) -> Command {
/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
/// that we give to customers
#[instrument(skip_all)]
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let roles = spec
.cluster
@@ -772,12 +773,11 @@ impl ComputeNode {
// 'Close' connection
drop(client);
if self.has_feature(ComputeFeature::Migrations) {
thread::spawn(move || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
handle_migrations(&mut client)
});
}
// Run migrations separately to not hold up cold starts
thread::spawn(move || {
let mut client = Client::connect(connstr.as_str(), NoTls)?;
handle_migrations(&mut client)
});
Ok(())
}

View File

@@ -24,8 +24,9 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
utils = { path = "../../libs/utils/" }
metrics = { path = "../../libs/metrics/" }

View File

@@ -170,7 +170,7 @@ impl ComputeHook {
reconfigure_request: &ComputeHookNotifyRequest,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let req = client.request(Method::POST, url);
let req = client.request(Method::PUT, url);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
@@ -240,7 +240,7 @@ impl ComputeHook {
let client = reqwest::Client::new();
backoff::retry(
|| self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
|e| matches!(e, NotifyError::Fatal(_)),
|e| matches!(e, NotifyError::Fatal(_) | NotifyError::Unexpected(_)),
3,
10,
"Send compute notification",

View File

@@ -34,9 +34,9 @@ struct Cli {
#[arg(short, long)]
listen: std::net::SocketAddr,
/// Path to public key for JWT authentication of clients
/// Public key for JWT authentication of clients
#[arg(long)]
public_key: Option<camino::Utf8PathBuf>,
public_key: Option<String>,
/// Token for authenticating this service with the pageservers it controls
#[arg(long)]
@@ -159,7 +159,7 @@ impl Secrets {
fn load_cli(database_url: &str, args: &Cli) -> anyhow::Result<Self> {
let public_key = match &args.public_key {
None => None,
Some(key_path) => Some(JwtAuth::from_key_path(key_path)?),
Some(key) => Some(JwtAuth::from_key(key.clone()).context("Loading public key")?),
};
Ok(Self {
database_url: database_url.to_owned(),
@@ -170,6 +170,7 @@ impl Secrets {
}
}
/// Execute the diesel migrations that are built into this binary
async fn migration_run(database_url: &str) -> anyhow::Result<()> {
use diesel::PgConnection;
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
@@ -183,8 +184,18 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
tokio::runtime::Builder::new_current_thread()
// We use spawn_blocking for database operations, so require approximately
// as many blocking threads as we will open database connections.
.max_blocking_threads(Persistence::MAX_CONNECTIONS as usize)
.enable_all()
.build()
.unwrap()
.block_on(async_main())
}
async fn async_main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
logging::init(

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use camino::Utf8Path;
use camino::Utf8PathBuf;
@@ -44,7 +45,7 @@ use crate::PlacementPolicy;
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
database_url: String,
connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
// In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
// test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
@@ -64,6 +65,8 @@ pub(crate) enum DatabaseError {
Query(#[from] diesel::result::Error),
#[error(transparent)]
Connection(#[from] diesel::result::ConnectionError),
#[error(transparent)]
ConnectionPool(#[from] r2d2::Error),
#[error("Logical error: {0}")]
Logical(String),
}
@@ -71,9 +74,31 @@ pub(crate) enum DatabaseError {
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
pub const MAX_CONNECTIONS: u32 = 99;
// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
// to execute queries (database queries are not generally on latency-sensitive paths).
let connection_pool = diesel::r2d2::Pool::builder()
.max_size(Self::MAX_CONNECTIONS)
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
// Always keep at least one connection ready to go
.min_idle(Some(1))
.test_on_check_out(true)
.build(manager)
.expect("Could not build connection pool");
Self {
database_url,
connection_pool,
json_path,
}
}
@@ -84,14 +109,10 @@ impl Persistence {
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let database_url = self.database_url.clone();
tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
// TODO: connection pooling, such as via diesel::r2d2
let mut conn = PgConnection::establish(&database_url)?;
func(&mut conn)
})
.await
.expect("Task panic")
let mut conn = self.connection_pool.get()?;
tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
.await
.expect("Task panic")
}
/// When a node is first registered, persist it before using it for anything

View File

@@ -103,7 +103,9 @@ impl From<DatabaseError> for ApiError {
match err {
DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
// FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
DatabaseError::Connection(_e) => ApiError::ShuttingDown,
DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
ApiError::ShuttingDown
}
DatabaseError::Logical(reason) => {
ApiError::InternalServerError(anyhow::anyhow!(reason))
}
@@ -987,7 +989,15 @@ impl Service {
.collect();
} else {
// This was an update, wait for reconciliation
self.await_waiters(waiters).await?;
if let Err(e) = self.await_waiters(waiters).await {
// Do not treat a reconcile error as fatal: we have already applied any requested
// Intent changes, and the reconcile can fail for external reasons like unavailable
// compute notification API. In these cases, it is important that we do not
// cause the cloud control plane to retry forever on this API.
tracing::warn!(
"Failed to reconcile after /location_config: {e}, returning success anyway"
);
}
}
Ok(result)

View File

@@ -28,7 +28,7 @@ pub struct AttachmentService {
listen: String,
path: Utf8PathBuf,
jwt_token: Option<String>,
public_key_path: Option<Utf8PathBuf>,
public_key: Option<String>,
postgres_port: u16,
client: reqwest::Client,
}
@@ -207,7 +207,7 @@ impl AttachmentService {
.pageservers
.first()
.expect("Config is validated to contain at least one pageserver");
let (jwt_token, public_key_path) = match ps_conf.http_auth_type {
let (jwt_token, public_key) = match ps_conf.http_auth_type {
AuthType::Trust => (None, None),
AuthType::NeonJWT => {
let jwt_token = env
@@ -219,7 +219,26 @@ impl AttachmentService {
let public_key_path =
camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
.unwrap();
(Some(jwt_token), Some(public_key_path))
// This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
let public_key = if std::fs::metadata(&public_key_path)
.expect("Can't stat public key")
.is_dir()
{
// Our config may specify a directory: this is for the pageserver's ability to handle multiple
// keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
let mut dir =
std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
let dent = dir
.next()
.expect("Empty key dir")
.expect("Error reading key dir");
std::fs::read_to_string(dent.path()).expect("Can't read public key")
} else {
std::fs::read_to_string(&public_key_path).expect("Can't read public key")
};
(Some(jwt_token), Some(public_key))
}
};
@@ -228,7 +247,7 @@ impl AttachmentService {
path,
listen,
jwt_token,
public_key_path,
public_key,
postgres_port,
client: reqwest::ClientBuilder::new()
.build()
@@ -453,8 +472,8 @@ impl AttachmentService {
args.push(format!("--jwt-token={jwt_token}"));
}
if let Some(public_key_path) = &self.public_key_path {
args.push(format!("--public-key={public_key_path}"));
if let Some(public_key) = &self.public_key {
args.push(format!("--public-key=\"{public_key}\""));
}
if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {

View File

@@ -90,9 +90,6 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Enable running migrations
Migrations,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.

View File

@@ -13,6 +13,9 @@ twox-hash.workspace = true
workspace_hack.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
procfs.workspace = true
[dev-dependencies]
rand = "0.8"
rand_distr = "0.4.3"

View File

@@ -31,6 +31,8 @@ pub use wrappers::{CountedReader, CountedWriter};
mod hll;
pub mod metric_vec_duration;
pub use hll::{HyperLogLog, HyperLogLogVec};
#[cfg(target_os = "linux")]
pub mod more_process_metrics;
pub type UIntGauge = GenericGauge<AtomicU64>;
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

View File

@@ -0,0 +1,54 @@
//! process metrics that the [`::prometheus`] crate doesn't provide.
// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
use crate::UIntGauge;
pub struct Collector {
descs: Vec<prometheus::core::Desc>,
vmlck: crate::UIntGauge,
}
const NMETRICS: usize = 1;
impl prometheus::core::Collector for Collector {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
let Ok(myself) = procfs::process::Process::myself() else {
return vec![];
};
let mut mfs = Vec::with_capacity(NMETRICS);
if let Ok(status) = myself.status() {
if let Some(vmlck) = status.vmlck {
self.vmlck.set(vmlck);
mfs.extend(self.vmlck.collect())
}
}
mfs
}
}
impl Collector {
pub fn new() -> Self {
let mut descs = Vec::new();
let vmlck =
UIntGauge::new("libmetrics_process_status_vmlck", "/proc/self/status vmlck").unwrap();
descs.extend(
prometheus::core::Collector::desc(&vmlck)
.into_iter()
.cloned(),
);
Self { descs, vmlck }
}
}
impl Default for Collector {
fn default() -> Self {
Self::new()
}
}

View File

@@ -649,6 +649,27 @@ pub struct WalRedoManagerStatus {
pub pid: Option<u32>,
}
pub mod virtual_file {
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
StdFs,
#[cfg(target_os = "linux")]
TokioEpollUring,
}
}
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {

View File

@@ -453,9 +453,12 @@ mod tests {
event_mask: 0,
}),
expected_messages: vec![
// Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
// TODO: When updating Postgres versions, this test will cause
// problems. Postgres version in message needs updating.
//
// Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160002, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
vec![
103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,

View File

@@ -339,4 +339,16 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn put_io_engine(
&self,
engine: &pageserver_api::models::virtual_file::IoEngineKind,
) -> Result<()> {
let uri = format!("{}/v1/io_engine", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, engine)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?;
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -51,6 +51,10 @@ pub(crate) struct Args {
/// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
#[clap(long)]
keyspace_cache: Option<Utf8PathBuf>,
/// Before starting the benchmark, live-reconfigure the pageserver to use the given
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -109,6 +113,10 @@ async fn main_impl(
args.pageserver_jwt.as_deref(),
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.put_io_engine(engine_str).await?;
}
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,

View File

@@ -272,6 +272,12 @@ fn start_pageserver(
);
set_build_info_metric(GIT_VERSION, BUILD_TAG);
set_launch_timestamp_metric(launch_ts);
#[cfg(target_os = "linux")]
metrics::register_internal(Box::new(metrics::more_process_metrics::Collector::new())).unwrap();
metrics::register_internal(Box::new(
pageserver::metrics::tokio_epoll_uring::Collector::new(),
))
.unwrap();
pageserver::preinitialize_metrics();
// If any failpoints were set from FAILPOINTS environment variable,

View File

@@ -1908,6 +1908,15 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ())
}
async fn put_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
crate::virtual_file::io_engine::set(kind);
json_response(StatusCode::OK, ())
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -2165,5 +2174,6 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.any(handler_404))
}

View File

@@ -2400,6 +2400,72 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}
}
pub mod tokio_epoll_uring {
use metrics::UIntGauge;
pub struct Collector {
descs: Vec<metrics::core::Desc>,
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
}
const NMETRICS: usize = 2;
impl metrics::core::Collector for Collector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
self.descs.iter().collect()
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(NMETRICS);
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
self.systems_created.set(systems_created);
mfs.extend(self.systems_created.collect());
self.systems_destroyed.set(systems_destroyed);
mfs.extend(self.systems_destroyed.collect());
mfs
}
}
impl Collector {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let mut descs = Vec::new();
let systems_created = UIntGauge::new(
"pageserver_tokio_epoll_uring_systems_created",
"counter of tokio-epoll-uring systems that were created",
)
.unwrap();
descs.extend(
metrics::core::Collector::desc(&systems_created)
.into_iter()
.cloned(),
);
let systems_destroyed = UIntGauge::new(
"pageserver_tokio_epoll_uring_systems_destroyed",
"counter of tokio-epoll-uring systems that were destroyed",
)
.unwrap();
descs.extend(
metrics::core::Collector::desc(&systems_destroyed)
.into_iter()
.cloned(),
);
Self {
descs,
systems_created,
systems_destroyed,
}
}
}
}
pub fn preinitialize_metrics() {
// Python tests need these and on some we do alerting.
//

View File

@@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection(
modification.commit(&ctx).await?;
uncommitted_records = 0;
filtered_records = 0;
//
// We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
// layer size can become much larger than `checkpoint_distance`.
// It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large
// amount of data to key-value storage. So performing this check only after processing
// all WAL records in the chunk, can cause huge L0 layer files.
//
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
}
}

View File

@@ -28,9 +28,10 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
mod io_engine;
pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
mod open_options;
pub use io_engine::IoEngineKind;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use open_options::*;
///

View File

@@ -7,67 +7,100 @@
//!
//! Then use [`get`] and [`super::OpenOptions`].
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
pub(crate) use super::api::IoEngineKind;
#[derive(Clone, Copy)]
#[repr(u8)]
pub(crate) enum IoEngine {
NotSet,
StdFs,
#[cfg(target_os = "linux")]
TokioEpollUring,
}
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
#[cfg(not(test))]
pub(super) fn init(engine: IoEngineKind) {
if IO_ENGINE.set(engine).is_err() {
panic!("called twice");
impl From<IoEngineKind> for IoEngine {
fn from(value: IoEngineKind) -> Self {
match value {
IoEngineKind::StdFs => IoEngine::StdFs,
#[cfg(target_os = "linux")]
IoEngineKind::TokioEpollUring => IoEngine::TokioEpollUring,
}
}
crate::metrics::virtual_file_io_engine::KIND
.with_label_values(&[&format!("{engine}")])
.set(1);
}
pub(super) fn get() -> &'static IoEngineKind {
#[cfg(test)]
{
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind,
Err(e) => {
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
}
},
Err(std::env::VarError::NotPresent) => {
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
.parse()
.unwrap()
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");
}
impl TryFrom<u8> for IoEngine {
type Error = u8;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (IoEngine::NotSet as u8) => IoEngine::NotSet,
v if v == (IoEngine::StdFs as u8) => IoEngine::StdFs,
#[cfg(target_os = "linux")]
v if v == (IoEngine::TokioEpollUring as u8) => IoEngine::TokioEpollUring,
x => return Err(x),
})
}
#[cfg(not(test))]
IO_ENGINE.get().unwrap()
}
use std::os::unix::prelude::FileExt;
static IO_ENGINE: AtomicU8 = AtomicU8::new(IoEngine::NotSet as u8);
pub(crate) fn set(engine_kind: IoEngineKind) {
let engine: IoEngine = engine_kind.into();
IO_ENGINE.store(engine as u8, std::sync::atomic::Ordering::Relaxed);
#[cfg(not(test))]
{
let metric = &crate::metrics::virtual_file_io_engine::KIND;
metric.reset();
metric
.with_label_values(&[&format!("{engine_kind}")])
.set(1);
}
}
#[cfg(not(test))]
pub(super) fn init(engine_kind: IoEngineKind) {
set(engine_kind);
}
pub(super) fn get() -> IoEngine {
let cur = IoEngine::try_from(IO_ENGINE.load(Ordering::Relaxed)).unwrap();
if cfg!(test) {
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
match cur {
IoEngine::NotSet => {
let kind = match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind,
Err(e) => {
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
}
},
Err(std::env::VarError::NotPresent) => {
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
.parse()
.unwrap()
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");
}
};
self::set(kind);
self::get()
}
x => x,
}
} else {
cur
}
}
use std::{
os::unix::prelude::FileExt,
sync::atomic::{AtomicU8, Ordering},
};
use super::FileGuard;
impl IoEngineKind {
impl IoEngine {
pub(super) async fn read_at<B>(
&self,
file_guard: FileGuard,
@@ -78,7 +111,8 @@ impl IoEngineKind {
B: tokio_epoll_uring::BoundedBufMut + Send,
{
match self {
IoEngineKind::StdFs => {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
let dst = unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
@@ -96,7 +130,7 @@ impl IoEngineKind {
((file_guard, buf), res)
}
#[cfg(target_os = "linux")]
IoEngineKind::TokioEpollUring => {
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(

View File

@@ -1,6 +1,6 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use super::IoEngineKind;
use super::io_engine::IoEngine;
use std::{os::fd::OwnedFd, path::Path};
#[derive(Debug, Clone)]
@@ -13,9 +13,10 @@ pub enum OpenOptions {
impl Default for OpenOptions {
fn default() -> Self {
match super::io_engine::get() {
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
IoEngine::NotSet => panic!("io engine not set"),
IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
#[cfg(target_os = "linux")]
IoEngineKind::TokioEpollUring => {
IoEngine::TokioEpollUring => {
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
}
}

View File

@@ -20,7 +20,7 @@ BENCHMARKS_DURATION_QUERY = """
FROM results
WHERE
started_at > CURRENT_DATE - INTERVAL '%s' day
AND parent_suite = 'test_runner.performance'
AND starts_with(parent_suite, 'test_runner.performance')
AND status = 'passed'
GROUP BY
parent_suite, suite, name
@@ -31,68 +31,75 @@ BENCHMARKS_DURATION_QUERY = """
# the total duration varies from 8 to 40 minutes.
# We use some pre-collected durations as a fallback to have a better distribution.
FALLBACK_DURATION = {
"test_runner/performance/test_branch_creation.py::test_branch_creation_heavy_write[20]": 62.144,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many[1024]": 90.941,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many_relations": 26.053,
"test_runner/performance/test_branching.py::test_compare_child_and_root_pgbench_perf": 25.67,
"test_runner/performance/test_branching.py::test_compare_child_and_root_read_perf": 14.497,
"test_runner/performance/test_branching.py::test_compare_child_and_root_write_perf": 18.852,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]": 26.572,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]": 6.259,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[10]": 21.206,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[1]": 3.474,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[5]": 11.262,
"test_runner/performance/test_bulk_update.py::test_bulk_update[100]": 94.225,
"test_runner/performance/test_bulk_update.py::test_bulk_update[10]": 68.159,
"test_runner/performance/test_bulk_update.py::test_bulk_update[50]": 76.719,
"test_runner/performance/test_compaction.py::test_compaction": 110.222,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[neon-5-10-100]": 10.743,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[vanilla-5-10-100]": 16.541,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[neon-5-10-100]": 11.109,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[vanilla-5-10-100]": 18.121,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[neon-5-10-100]": 11.3,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[vanilla-5-10-100]": 16.086,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-10]": 12.024,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-1]": 11.14,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-10]": 10.375,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-1]": 10.075,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[neon-5-10-100]": 11.147,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[vanilla-5-10-100]": 16.321,
"test_runner/performance/test_copy.py::test_copy[neon]": 16.579,
"test_runner/performance/test_copy.py::test_copy[vanilla]": 10.094,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 590.157,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.102,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 8.677,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 31.079,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[vanilla-1]": 38.119,
"test_runner/performance/test_layer_map.py::test_layer_map": 24.784,
"test_runner/performance/test_logical_replication.py::test_logical_replication": 117.707,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[neon]": 21.194,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[vanilla]": 59.068,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[neon]": 73.235,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[vanilla]": 82.586,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[neon-45-10]": 106.536,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[vanilla-45-10]": 98.753,
"test_runner/performance/test_random_writes.py::test_random_writes[neon]": 6.975,
"test_runner/performance/test_random_writes.py::test_random_writes[vanilla]": 3.69,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-100000-100-0]": 3.529,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-0]": 64.522,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-4]": 40.964,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-100000-100-0]": 0.55,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-0]": 12.189,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-4]": 13.899,
"test_runner/performance/test_startup.py::test_startup_simple": 2.51,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_off-10-5-5]": 527.245,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_on-10-5-5]": 583.46,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[vanilla-10-5-5]": 113.653,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_off-1000]": 233.728,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_on-1000]": 419.093,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[vanilla-1000]": 982.461,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_off-45-100]": 116.522,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_on-45-100]": 115.583,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[vanilla-45-100]": 155.282,
"test_runner/performance/test_write_amplification.py::test_write_amplification[neon]": 26.704,
"test_runner/performance/test_write_amplification.py::test_write_amplification[vanilla]": 16.088,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-13-30]": 400.15,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[1-6-30]": 372.521,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-13-30]": 420.017,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[10-6-30]": 373.769,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-13-30]": 678.742,
"test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py::test_pageserver_max_throughput_getpage_at_latest_lsn[100-6-30]": 512.135,
"test_runner/performance/test_branch_creation.py::test_branch_creation_heavy_write[20]": 58.036,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many_relations": 22.104,
"test_runner/performance/test_branch_creation.py::test_branch_creation_many[1024]": 126.073,
"test_runner/performance/test_branching.py::test_compare_child_and_root_pgbench_perf": 25.759,
"test_runner/performance/test_branching.py::test_compare_child_and_root_read_perf": 6.885,
"test_runner/performance/test_branching.py::test_compare_child_and_root_write_perf": 8.758,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]": 18.275,
"test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]": 9.533,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[1]": 12.09,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[10]": 35.145,
"test_runner/performance/test_bulk_tenant_create.py::test_bulk_tenant_create[5]": 22.28,
"test_runner/performance/test_bulk_update.py::test_bulk_update[10]": 66.353,
"test_runner/performance/test_bulk_update.py::test_bulk_update[100]": 75.487,
"test_runner/performance/test_bulk_update.py::test_bulk_update[50]": 54.142,
"test_runner/performance/test_compaction.py::test_compaction": 110.715,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[neon-5-10-100]": 11.68,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_ro_with_pgbench_select_only[vanilla-5-10-100]": 16.384,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[neon-5-10-100]": 11.315,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_rw_with_pgbench_default[vanilla-5-10-100]": 18.783,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[neon-5-10-100]": 11.647,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wal_with_pgbench_default[vanilla-5-10-100]": 17.04,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-1]": 11.01,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[neon-10-10]": 11.902,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-1]": 10.077,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_heavy_write[vanilla-10-10]": 10.4,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[neon-5-10-100]": 11.33,
"test_runner/performance/test_compare_pg_stats.py::test_compare_pg_stats_wo_with_pgbench_simple_update[vanilla-5-10-100]": 16.434,
"test_runner/performance/test_copy.py::test_copy[neon]": 13.817,
"test_runner/performance/test_copy.py::test_copy[vanilla]": 11.736,
"test_runner/performance/test_gc_feedback.py::test_gc_feedback": 575.735,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[neon]": 14.868,
"test_runner/performance/test_gist_build.py::test_gist_buffering_build[vanilla]": 14.393,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[neon-1]": 20.588,
"test_runner/performance/test_latency.py::test_measure_read_latency_heavy_write_workload[vanilla-1]": 30.849,
"test_runner/performance/test_layer_map.py::test_layer_map": 39.378,
"test_runner/performance/test_lazy_startup.py::test_lazy_startup": 2848.938,
"test_runner/performance/test_logical_replication.py::test_logical_replication": 120.952,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[neon]": 35.552,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_different_tables[vanilla]": 66.762,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[neon]": 85.177,
"test_runner/performance/test_parallel_copy_to.py::test_parallel_copy_same_table[vanilla]": 92.12,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[neon-45-10]": 107.009,
"test_runner/performance/test_perf_pgbench.py::test_pgbench[vanilla-45-10]": 99.582,
"test_runner/performance/test_random_writes.py::test_random_writes[neon]": 4.737,
"test_runner/performance/test_random_writes.py::test_random_writes[vanilla]": 2.686,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-100000-100-0]": 3.271,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-0]": 50.719,
"test_runner/performance/test_seqscans.py::test_seqscans[neon-10000000-1-4]": 15.992,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-100000-100-0]": 0.566,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-0]": 13.542,
"test_runner/performance/test_seqscans.py::test_seqscans[vanilla-10000000-1-4]": 13.35,
"test_runner/performance/test_startup.py::test_startup_simple": 13.043,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_off-10-5-5]": 194.841,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[neon_on-10-5-5]": 286.667,
"test_runner/performance/test_wal_backpressure.py::test_heavy_write_workload[vanilla-10-5-5]": 85.577,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_off-1000]": 297.626,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[neon_on-1000]": 646.187,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_intensive_init_workload[vanilla-1000]": 989.776,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_off-45-100]": 125.638,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[neon_on-45-100]": 123.554,
"test_runner/performance/test_wal_backpressure.py::test_pgbench_simple_update_workload[vanilla-45-100]": 190.083,
"test_runner/performance/test_write_amplification.py::test_write_amplification[neon]": 21.016,
"test_runner/performance/test_write_amplification.py::test_write_amplification[vanilla]": 23.028,
}

View File

@@ -3131,10 +3131,7 @@ class Endpoint(PgProtocol):
log.info(json.dumps(dict(data_dict, **kwargs)))
json.dump(dict(data_dict, **kwargs), file, indent=4)
# Please note: if you didn't respec this endpoint to have the `migrations`
# feature, this function will probably fail because neon_migration.migration_id
# won't exist. This is temporary - soon we'll get rid of the feature flag and
# migrations will be enabled for everyone.
# Please note: Migrations only run if pg_skip_catalog_updates is false
def wait_for_migrations(self):
with self.cursor() as cur:

View File

@@ -563,13 +563,13 @@ class PageserverHttpClient(requests.Session):
self,
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
timestamp,
timestamp: datetime,
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
)
self.verbose_error(res)
res_json = res.json()

View File

@@ -26,86 +26,81 @@ from fixtures.neon_fixtures import NeonEnvBuilder
# apply during config step, like more users, databases, or extensions. By default
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
# test we only load neon.
@pytest.mark.timeout(1000)
def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
@pytest.mark.timeout(1800)
@pytest.mark.parametrize("slru", ["lazy", "eager"])
def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
lazy_tenant, _ = env.neon_cli.create_tenant(
lazy_slru_download = "true" if slru == "lazy" else "false"
tenant, _ = env.neon_cli.create_tenant(
conf={
"lazy_slru_download": "true",
"lazy_slru_download": lazy_slru_download,
}
)
eager_tenant, _ = env.neon_cli.create_tenant(
conf={
"lazy_slru_download": "false",
}
)
tenants = [lazy_tenant, eager_tenant]
slru = "lazy"
for tenant in tenants:
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)")
endpoint.safe_psql("INSERT INTO t VALUES (1, 0)")
endpoint.safe_psql(
"""
CREATE PROCEDURE updating() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..10000000 LOOP
UPDATE t SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
"""
)
endpoint.safe_psql("SET statement_timeout=0")
endpoint.safe_psql("call updating()")
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
cur.execute("ALTER TABLE t SET (autovacuum_enabled = false)")
cur.execute("INSERT INTO t VALUES (1, 0)")
cur.execute(
"""
CREATE PROCEDURE updating() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..1000000 LOOP
UPDATE t SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
"""
)
cur.execute("SET statement_timeout=0")
cur.execute("call updating()")
endpoint.stop()
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
endpoint.start()
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
assert sum == 1000000
# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
durations = {
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
"basebackup_ms": f"{slru}_{i}_basebackup",
"start_postgres_ms": f"{slru}_{i}_start_postgres",
"config_ms": f"{slru}_{i}_config",
"total_startup_ms": f"{slru}_{i}_total_startup",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
basebackup_bytes = metrics["basebackup_bytes"]
zenbenchmark.record(
f"{slru}_{i}_basebackup_bytes",
basebackup_bytes,
"bytes",
report=MetricReport.LOWER_IS_BETTER,
)
# Stop so we can restart
endpoint.stop()
# We do two iterations so we can see if the second startup is faster. It should
# be because the compute node should already be configured with roles, databases,
# extensions, etc from the first run.
for i in range(2):
# Start
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
endpoint.start()
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
assert sum == 10000000
# Get metrics
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
durations = {
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
"basebackup_ms": f"{slru}_{i}_basebackup",
"start_postgres_ms": f"{slru}_{i}_start_postgres",
"config_ms": f"{slru}_{i}_config",
"total_startup_ms": f"{slru}_{i}_total_startup",
}
for key, name in durations.items():
value = metrics[key]
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
basebackup_bytes = metrics["basebackup_bytes"]
zenbenchmark.record(
f"{slru}_{i}_basebackup_bytes",
basebackup_bytes,
"bytes",
report=MetricReport.LOWER_IS_BETTER,
)
# Stop so we can restart
endpoint.stop()
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)
slru = "eager"
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)

View File

@@ -0,0 +1,66 @@
import os
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
logical_replication_sync,
)
from fixtures.pg_version import PgVersion
def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
if env.pg_version != PgVersion.V16:
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
endpoint = env.endpoints.create_start(
"test_logical_replication", config_lines=["log_statement=all"]
)
log.info("postgres is running on 'test_logical_replication' branch")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# create table...
cur.execute("create table t(pk integer primary key)")
cur.execute("create publication pub1 for table t")
# Create slot to hold WAL
cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
# now start subscriber
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key)")
connstr = endpoint.connstr().replace("'", "''")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
cur.execute(
"""create or replace function create_snapshots(n integer) returns void as $$
declare
i integer;
begin
for i in 1..n loop
perform pg_log_standby_snapshot();
end loop;
end; $$ language plpgsql"""
)
cur.execute("set statement_timeout=0")
cur.execute("select create_snapshots(10000)")
# Wait logical replication to sync
logical_replication_sync(vanilla_pg, endpoint)
time.sleep(10)
# Check layer file sizes
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, env.initial_tenant, timeline
)
log.info(f"Check {timeline_path}")
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
assert os.path.getsize(timeline_path + filename) < 512_000_000

View File

@@ -64,18 +64,14 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Check edge cases
# Timestamp is in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
assert result["kind"] == "future"
# make sure that we return a well advanced lsn here
assert Lsn(result["lsn"]) > start_lsn
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) < start_lsn
@@ -83,9 +79,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
assert result["kind"] not in ["past", "nodata"]
lsn = result["lsn"]
# Call get_lsn_by_timestamp to get the LSN
@@ -108,9 +102,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z"
)
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) >= last_flush_lsn

View File

@@ -10,7 +10,7 @@ def test_migrations(neon_simple_env: NeonEnv):
endpoint = env.endpoints.create("test_migrations")
log_path = endpoint.endpoint_path() / "compute.log"
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
endpoint.wait_for_migrations()

View File

@@ -12,10 +12,10 @@ def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
env.neon_cli.create_branch("test_neon_superuser_subscriber")
sub = env.endpoints.create("test_neon_superuser_subscriber")
pub.respec(skip_pg_catalog_updates=False, features=["migrations"])
pub.respec(skip_pg_catalog_updates=False)
pub.start()
sub.respec(skip_pg_catalog_updates=False, features=["migrations"])
sub.respec(skip_pg_catalog_updates=False)
sub.start()
pub.wait_for_migrations()

View File

@@ -310,7 +310,7 @@ def test_sharding_service_compute_hook(
notifications.append(request.json)
return Response(status=200)
httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start()

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a",
"postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535",
"postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88"
"postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
"postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
"postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
}

View File

@@ -29,7 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
diesel = { version = "2", features = ["postgres", "serde_json"] }
diesel = { version = "2", features = ["postgres", "r2d2", "serde_json"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["sink"] }
@@ -90,6 +90,7 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
diesel_derives = { version = "2", features = ["32-column-tables", "postgres", "r2d2", "with-deprecated"] }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }