Compare commits

...

41 Commits

Author SHA1 Message Date
Andrey Taranik
b66d4e0bdc Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-20 15:45:42 +03:00
John Spray
02a28c01ca Revert "safekeeper: check for non-consecutive writes in safekeeper.rs" (#8771)
Reverts neondatabase/neon#8640

This broke `test_last_log_term_switch` via a merge race of some kind.
2024-08-20 11:34:53 +00:00
Alexander Bayandin
c96593b473 Make Postgres 16 default version (#8745)
## Problem

The default Postgres version is set to 15 in code, while we use 16 in
most of the other places (and Postgres 17 is coming)

## Summary of changes
- Run `benchmarks` job with Postgres 16 (instead of Postgres 14)
- Set `DEFAULT_PG_VERSION` to 16 in all places
- Remove deprecated `--pg-version` pytest argument
- Update `test_metadata_bincode_serde_ensure_roundtrip` for Postgres 16
2024-08-20 10:46:58 +01:00
Christian Schwarz
ef57e73fbf task_mgr::spawn: require a TenantId (#8462)
… to dis-incentivize global tasks via task_mgr in the future

(As of https://github.com/neondatabase/neon/pull/8339 all remaining
task_mgr usage is tenant or timeline scoped.)
2024-08-20 08:26:44 +00:00
Arseny Sher
4c5a0fdc75 safekeeper: check for non-consecutive writes in safekeeper.rs
wal_storage.rs already checks this, but since this is a quite legit scenario
check it at safekeeper.rs (consensus level) as well.

ref https://github.com/neondatabase/neon/issues/8212
2024-08-20 07:12:56 +03:00
Arpad Müller
4b26783c94 scrubber: remove _generic postfix and two unused functions (#8761)
Removes the `_generic` postfix from the `GenericRemoteStorage` using
APIs, as `remote_storage` is the "default" now, and add a `_s3` postfix
to the remaining APIs using the S3 SDK (only in tenant snapshot). Also,
remove two unused functions: `list_objects_with_retries` and
`stream_tenants functions`.

Part of https://github.com/neondatabase/neon/issues/7547
2024-08-19 23:58:47 +02:00
Andrey Taranik
6d9fd2bbb5 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-20 00:27:01 +03:00
Arpad Müller
6949b45e17 Update aws -> infra for repo rename (#8755)
See slack thread:
https://neondb.slack.com/archives/C039YKBRZB4/p1722501766006179
2024-08-19 17:44:10 +02:00
Arpad Müller
3b8ca477ab Migrate physical GC and scan_metadata to remote_storage (#8673)
Migrates most of the remaining parts of the scrubber to remote_storage:

* `pageserver_physical_gc`
* `scan_metadata` for pageservers (safekeepers were done in #8595)
* `download()` in `tenant_snapshot`. The main `tenant_snapshot` is not
migrated as it uses version history to be able to work in the face of
ongoing changes.
 
Part of #7547
2024-08-19 16:39:44 +02:00
Christian Schwarz
eb7241c798 l0_flush: remove support for mode page-cached (#8739)
It's been rolled out everywhere, no configs are referencing it.

All code that's made dead by the removal of the config option is removed
as part of this PR.

The `page_caching::PreWarmingWriter` in `::No` mode is equivalent to a
`size_tracking_writer`, so, use that.

part of https://github.com/neondatabase/neon/issues/7418
2024-08-19 16:35:34 +02:00
Andrey Taranik
e5d9c003f5 try new qemu based runners 2024-08-19 17:10:17 +03:00
Andrey Taranik
290ce3ed46 try aws arm64 8 core 2024-08-18 02:45:55 +03:00
Andrey Taranik
1138e286b9 try aws arm64 16core runners 2024-08-17 23:04:50 +03:00
Andrey Taranik
9173847f81 try 80 core metal again 2024-08-17 20:20:07 +03:00
Andrey Taranik
6ae62574d0 try 16 core hcloud again 2024-08-17 19:34:31 +03:00
Andrey Taranik
5e074d8536 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-17 17:49:47 +03:00
Andrey Taranik
3a22daf33e runner lables fix 2024-08-17 16:37:17 +03:00
Andrey Taranik
f40f627730 tey aws arm64 runners 2024-08-17 16:30:24 +03:00
Andrey Taranik
56b94b7d1b return to large-arm64 2024-08-16 14:18:01 +03:00
Andrey Taranik
fe445b2945 more parallelism 2024-08-16 13:21:19 +03:00
Andrey Taranik
7f49f45a53 tune parallelism 2024-08-16 13:08:36 +03:00
Andrey Taranik
980b212789 try more parallelism 2024-08-16 11:56:11 +03:00
Andrey Taranik
2b17a03911 arm64 80 cores debian 12 2024-08-16 10:20:23 +03:00
Andrey Taranik
b50a9d84d4 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-16 09:06:09 +03:00
Andrey Taranik
6fd3c9daa5 trigger build 2024-08-15 20:27:24 +03:00
Andrey Taranik
409171ab08 try 16 cores 2024-08-15 16:46:57 +03:00
Andrey Taranik
f26240c9dc Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 16:46:00 +03:00
Andrey Taranik
1c771267ab Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 16:15:17 +03:00
Andrey Taranik
9f1b7b72ed Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 14:56:37 +03:00
Andrey Taranik
2476f7ef74 try arm64 with 80 cores 2024-08-15 14:56:14 +03:00
Andrey Taranik
f555cb3970 try cloud arm64 servers 2024-08-15 03:36:00 +03:00
Andrey Taranik
10a726503a Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-15 03:34:07 +03:00
Andrey Taranik
7ba86e15fa debug arm64 builds 2024-08-14 18:57:28 +03:00
Andrey Taranik
7ba42bfdb4 Merge branch 'main' into cicd/debug-regress-tests-on-arm 2024-08-14 18:33:19 +03:00
Andrey Taranik
4f8a39d6c6 try metal arm64 runners 2024-08-14 17:46:13 +03:00
Alexander Bayandin
54c5da3981 CI(build-and-test): set RUSTFLAGS for ARM 2024-08-14 13:57:20 +01:00
Alexander Bayandin
c1378dc43b CI: don't collect code coverage on arm64 runners 2024-08-14 13:53:16 +01:00
Alexander Bayandin
50b9fb430a test_runner: add __arch parameter to Allure report 2024-08-14 13:53:16 +01:00
Alexander Bayandin
486eaba028 CI(build-and-test): run regression tests on arm 2024-08-14 13:53:16 +01:00
Alexander Bayandin
d4d70cc314 CI(build-and-test): make pg-versions configurable 2024-08-14 13:53:16 +01:00
Alexander Bayandin
176eefa47a CI(regress-tests): run debug builds only with the latest Postgres version 2024-08-14 13:53:16 +01:00
46 changed files with 499 additions and 750 deletions

View File

@@ -6,6 +6,10 @@ self-hosted-runner:
- small
- small-arm64
- us-east-2
- aws-arm64-8core
- aws-arm64-16core
- aws-arm64-32core
- qemu-arm64
config-variables:
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB

View File

@@ -43,7 +43,7 @@ inputs:
pg_version:
description: 'Postgres version to use for tests'
required: false
default: 'v14'
default: 'v16'
benchmark_durations:
description: 'benchmark durations JSON'
required: false
@@ -131,7 +131,7 @@ runs:
fi
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n sets the number of parallel processes that pytest-xdist will run
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
EXTRA_PARAMS="-n auto $EXTRA_PARAMS"
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
# to the same worker to make @pytest.mark.order work with xdist
@@ -169,10 +169,8 @@ runs:
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
cov_prefix=()
else
cov_prefix=()
fi

View File

@@ -36,7 +36,7 @@ env:
jobs:
build-neon:
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:
@@ -94,11 +94,16 @@ jobs:
# We run tests with addtional features, that are turned off by default (e.g. in release builds), see
# corresponding Cargo.toml files for their descriptions.
- name: Set env variables
env:
ARCH: ${{ inputs.arch }}
run: |
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" ]]; then
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked --release"
@@ -110,6 +115,11 @@ jobs:
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
} >> $GITHUB_ENV
# See https://github.com/aws/aws-graviton-getting-started/blob/57dc813626d0266f1cc12ef83474745bb1f31fb4/rust.md
- name: Set RUSTFLAGS for ARM
if: inputs.arch == 'arm64'
run: echo "RUSTFLAGS=-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1" >> $GITHUB_ENV
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@v4
@@ -153,11 +163,13 @@ jobs:
run: |
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
export PQ_LIB_DIR
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc)
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
- name: Install rust binaries
env:
ARCH: ${{ inputs.arch }}
run: |
# Install target binaries
mkdir -p /tmp/neon/bin/
@@ -172,7 +184,7 @@ jobs:
done
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
@@ -243,10 +255,10 @@ jobs:
uses: ./.github/actions/save-coverage-data
regress-tests:
# Run test on x64 only
if: inputs.arch == 'x64'
# Don't run regression tests on debug arm64 builds
if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
needs: [ build-neon ]
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:

View File

@@ -198,7 +198,7 @@ jobs:
strategy:
fail-fast: false
matrix:
arch: [ x64 ]
arch: [ x64, arm64 ]
# Do not build or run tests in debug for release branches
build-type: ${{ fromJson((startsWith(github.ref_name, 'release') && github.event_name == 'push') && '["release"]' || '["debug", "release"]') }}
include:
@@ -280,6 +280,7 @@ jobs:
save_perf_report: ${{ github.ref_name == 'main' }}
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
pg_version: v16
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
@@ -985,10 +986,10 @@ jobs:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
gh workflow --repo neondatabase/azure run deploy.yml -f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
-f deployPgSniRouter=false \
-f deployProxy=false \
-f deployStorage=true \
@@ -998,14 +999,14 @@ jobs:
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
gh workflow --repo neondatabase/infra run deploy-prod.yml --ref main \
-f deployStorage=true \
-f deployStorageBroker=true \
-f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
-f deployStorage=false \
@@ -1015,7 +1016,7 @@ jobs:
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/aws run deploy-proxy-prod.yml --ref main \
gh workflow --repo neondatabase/infra run deploy-proxy-prod.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
-f branch=main \

View File

@@ -262,7 +262,7 @@ By default, this runs both debug and release modes, and all supported postgres v
testing locally, it is convenient to run just one set of permutations, like this:
```sh
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest
DEFAULT_PG_VERSION=16 BUILD_TYPE=release ./scripts/pytest
```
## Flamegraphs

View File

@@ -54,7 +54,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "15";
const DEFAULT_PG_VERSION: &str = "16";
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";

View File

@@ -27,7 +27,7 @@ use crate::pageserver::PageServerNode;
use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR;
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 15;
pub const DEFAULT_PG_VERSION: u32 = 16;
//
// This data structures represents neon_local CLI config

View File

@@ -14,7 +14,7 @@ picked tenant (which requested on-demand activation) for around 30 seconds
during the restart at 2024-04-03 16:37 UTC.
Note that lots of shutdowns on loaded pageservers do not finish within the
[10 second systemd enforced timeout](https://github.com/neondatabase/aws/blob/0a5280b383e43c063d43cbf87fa026543f6d6ad4/.github/ansible/systemd/pageserver.service#L16). This means we are shutting down without flushing ephemeral layers
[10 second systemd enforced timeout](https://github.com/neondatabase/infra/blob/0a5280b383e43c063d43cbf87fa026543f6d6ad4/.github/ansible/systemd/pageserver.service#L16). This means we are shutting down without flushing ephemeral layers
and have to reingest data in order to serve requests after restarting, potentially making first request latencies worse.
This problem is not yet very acutely felt in storage controller managed pageservers since

View File

@@ -383,6 +383,48 @@ impl RemoteStorage for AzureBlobStorage {
}
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let blob_client = self.client.blob_client(self.relative_path_to_name(key));
let properties_future = blob_client.get_properties().into_future();
let properties_future = tokio::time::timeout(self.timeout, properties_future);
let res = tokio::select! {
res = properties_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
if let Ok(inner) = &res {
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, inner, started_at);
}
let data = match res {
Ok(Ok(data)) => Ok(data),
Ok(Err(sdk)) => Err(to_download_error(sdk)),
Err(_timeout) => Err(DownloadError::Timeout),
}?;
let properties = data.blob.properties;
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::from(properties.last_modified),
size: properties.content_length,
})
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -150,7 +150,7 @@ pub enum ListingMode {
NoDelimiter,
}
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct ListingObject {
pub key: RemotePath,
pub last_modified: SystemTime,
@@ -215,6 +215,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
Ok(combined)
}
/// Obtain metadata information about an object.
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError>;
/// Streams the local file contents into remote into the remote storage entry.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -363,6 +370,20 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
// See [`RemoteStorage::head_object`].
pub async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
match self {
Self::LocalFs(s) => s.head_object(key, cancel).await,
Self::AwsS3(s) => s.head_object(key, cancel).await,
Self::AzureBlob(s) => s.head_object(key, cancel).await,
Self::Unreliable(s) => s.head_object(key, cancel).await,
}
}
/// See [`RemoteStorage::upload`]
pub async fn upload(
&self,
@@ -598,6 +619,7 @@ impl ConcurrencyLimiter {
RequestKind::Delete => &self.write,
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
RequestKind::Head => &self.read,
}
}

View File

@@ -445,6 +445,20 @@ impl RemoteStorage for LocalFs {
}
}
async fn head_object(
&self,
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
key: key.clone(),
last_modified: metadata.modified()?,
size: metadata.len(),
})
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,

View File

@@ -13,6 +13,7 @@ pub(crate) enum RequestKind {
List = 3,
Copy = 4,
TimeTravel = 5,
Head = 6,
}
use scopeguard::ScopeGuard;
@@ -27,6 +28,7 @@ impl RequestKind {
List => "list_objects",
Copy => "copy_object",
TimeTravel => "time_travel_recover",
Head => "head_object",
}
}
const fn as_index(&self) -> usize {
@@ -34,7 +36,8 @@ impl RequestKind {
}
}
pub(crate) struct RequestTyped<C>([C; 6]);
const REQUEST_KIND_COUNT: usize = 7;
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
impl<C> RequestTyped<C> {
pub(crate) fn get(&self, kind: RequestKind) -> &C {
@@ -43,8 +46,8 @@ impl<C> RequestTyped<C> {
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
let arr = std::array::from_fn::<C, 6, _>(|index| {
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)

View File

@@ -23,7 +23,7 @@ use aws_config::{
use aws_sdk_s3::{
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
operation::get_object::GetObjectError,
operation::{get_object::GetObjectError, head_object::HeadObjectError},
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
Client,
};
@@ -604,6 +604,78 @@ impl RemoteStorage for S3Bucket {
}
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let head_future = self
.client
.head_object()
.bucket(self.bucket_name())
.key(self.relative_path_to_s3_object(key))
.send();
let head_future = tokio::time::timeout(self.timeout, head_future);
let res = tokio::select! {
res = head_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
let res = res.map_err(|_e| DownloadError::Timeout)?;
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
let data = match res {
Ok(object_output) => object_output,
Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
// an error: we expect to sometimes fetch an object and find it missing,
// e.g. when probing for timeline indices.
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Ok,
started_at,
);
return Err(DownloadError::NotFound);
}
Err(e) => {
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);
return Err(DownloadError::Other(
anyhow::Error::new(e).context("s3 head object"),
));
}
};
let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
return Err(DownloadError::Other(anyhow!(
"head_object doesn't contain last_modified or content_length"
)))?;
};
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::try_from(last_modified).map_err(|e| {
DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
})?,
size: size as u64,
})
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -30,6 +30,7 @@ pub struct UnreliableWrapper {
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
ListPrefixes(Option<RemotePath>),
HeadObject(RemotePath),
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
@@ -137,6 +138,16 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<crate::ListingObject, DownloadError> {
self.attempt(RemoteOp::HeadObject(key.clone()))
.map_err(DownloadError::Other)?;
self.inner.head_object(key, cancel).await
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -1,15 +1,10 @@
use std::{num::NonZeroUsize, sync::Arc};
use crate::tenant::ephemeral_file;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
PageCached,
#[serde(rename_all = "snake_case")]
Direct {
max_concurrency: NonZeroUsize,
},
Direct { max_concurrency: NonZeroUsize },
}
impl Default for L0FlushConfig {
@@ -25,14 +20,12 @@ impl Default for L0FlushConfig {
pub struct L0FlushGlobalState(Arc<Inner>);
pub enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
impl L0FlushGlobalState {
pub fn new(config: L0FlushConfig) -> Self {
match config {
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
L0FlushConfig::Direct { max_concurrency } => {
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
Self(Arc::new(Inner::Direct { semaphore }))
@@ -44,13 +37,3 @@ impl L0FlushGlobalState {
&self.0
}
}
impl L0FlushConfig {
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
use L0FlushConfig::*;
match self {
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
}
}
}

View File

@@ -49,7 +49,7 @@ use tracing::{info, info_span};
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 15;
pub const DEFAULT_PG_VERSION: u32 = 16;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

View File

@@ -393,7 +393,7 @@ struct PageServerTask {
/// Tasks may optionally be launched for a particular tenant/timeline, enabling
/// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
tenant_shard_id: Option<TenantShardId>,
tenant_shard_id: TenantShardId,
timeline_id: Option<TimelineId>,
mutable: Mutex<MutableTaskState>,
@@ -405,7 +405,7 @@ struct PageServerTask {
pub fn spawn<F>(
runtime: &tokio::runtime::Handle,
kind: TaskKind,
tenant_shard_id: Option<TenantShardId>,
tenant_shard_id: TenantShardId,
timeline_id: Option<TimelineId>,
name: &str,
future: F,
@@ -550,7 +550,7 @@ pub async fn shutdown_tasks(
let tasks = TASKS.lock().unwrap();
for task in tasks.values() {
if (kind.is_none() || Some(task.kind) == kind)
&& (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id)
&& (tenant_shard_id.is_none() || Some(task.tenant_shard_id) == tenant_shard_id)
&& (timeline_id.is_none() || task.timeline_id == timeline_id)
{
task.cancel.cancel();
@@ -573,13 +573,8 @@ pub async fn shutdown_tasks(
};
if let Some(mut join_handle) = join_handle {
if log_all {
if tenant_shard_id.is_none() {
// there are quite few of these
info!(name = task.name, kind = ?task_kind, "stopping global task");
} else {
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
// warn to catch these in tests; there shouldn't be any
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
}
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
.await

View File

@@ -798,7 +798,7 @@ impl Tenant {
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Attach,
Some(tenant_shard_id),
tenant_shard_id,
None,
"attach tenant",
async move {

View File

@@ -21,7 +21,6 @@ pub struct EphemeralFile {
}
mod page_caching;
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
mod zero_padded_read_write;
impl EphemeralFile {
@@ -52,12 +51,10 @@ impl EphemeralFile {
)
.await?;
let prewarm = conf.l0_flush.prewarm_on_write();
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, prewarm, gate_guard),
rw: page_caching::RW::new(file, gate_guard),
})
}

View File

@@ -1,15 +1,15 @@
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
//!
//! Subject to removal in <https://github.com/neondatabase/neon/pull/8537>
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::BlockLease;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
use std::io::{self, ErrorKind};
use std::ops::{Deref, Range};
use std::io::{self};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;
@@ -18,33 +18,17 @@ use super::zero_padded_read_write;
/// See module-level comment.
pub struct RW {
page_cache_file_id: page_cache::FileId,
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
rw: super::zero_padded_read_write::RW<size_tracking_writer::Writer<VirtualFile>>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop).
_gate_guard: utils::sync::gate::GateGuard,
}
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
/// should we pre-warm the [`crate::page_cache`] with the contents?
#[derive(Clone, Copy)]
pub enum PrewarmOnWrite {
Yes,
No,
}
impl RW {
pub fn new(
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
_gate_guard: utils::sync::gate::GateGuard,
) -> Self {
pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
page_cache_file_id,
file,
prewarm_on_write,
)),
rw: super::zero_padded_read_write::RW::new(size_tracking_writer::Writer::new(file)),
_gate_guard,
}
}
@@ -84,10 +68,10 @@ impl RW {
let vec = Vec::with_capacity(size);
// read from disk what we've already flushed
let writer = self.rw.as_writer();
let flushed_range = writer.written_range();
let mut vec = writer
.file
let file_size_tracking_writer = self.rw.as_writer();
let flushed_range = 0..usize::try_from(file_size_tracking_writer.bytes_written()).unwrap();
let mut vec = file_size_tracking_writer
.as_inner()
.read_exact_at(
vec.slice(0..(flushed_range.end - flushed_range.start)),
u64::try_from(flushed_range.start).unwrap(),
@@ -122,7 +106,7 @@ impl RW {
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.rw.as_writer().file.path,
self.rw.as_writer().as_inner().path,
e,
),
)
@@ -132,7 +116,7 @@ impl RW {
}
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = writer
.file
.as_inner()
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
.await?;
let read_guard = write_guard.mark_valid();
@@ -154,137 +138,16 @@ impl Drop for RW {
// unlink the file
// we are clear to do this, because we have entered a gate
let res = std::fs::remove_file(&self.rw.as_writer().file.path);
let path = &self.rw.as_writer().as_inner().path;
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!(
"could not remove ephemeral file '{}': {}",
self.rw.as_writer().file.path,
e
);
error!("could not remove ephemeral file '{path}': {e}");
}
}
}
}
struct PreWarmingWriter {
prewarm_on_write: PrewarmOnWrite,
nwritten_blocks: u32,
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
}
impl PreWarmingWriter {
fn new(
page_cache_file_id: page_cache::FileId,
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
) -> Self {
Self {
prewarm_on_write,
nwritten_blocks: 0,
page_cache_file_id,
file,
}
}
/// Return the byte range within `file` that has been written though `write_all`.
///
/// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
struct Wrapper(Range<usize>);
impl Deref for Wrapper {
type Target = Range<usize>;
fn deref(&self) -> &Range<usize> {
&self.0
}
}
Wrapper(0..nwritten_blocks * PAGE_SZ)
}
}
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
async fn write_all<Buf: tokio_epoll_uring::IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> std::io::Result<(usize, FullSlice<Buf>)> {
let buflen = buf.len();
assert_eq!(
buflen % PAGE_SZ,
0,
"{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
);
// Do the IO.
let buf = match self.file.write_all(buf, ctx).await {
(buf, Ok(nwritten)) => {
assert_eq!(nwritten, buflen);
buf
}
(_, Err(e)) => {
return Err(std::io::Error::new(
ErrorKind::Other,
// order error before path because path is long and error is short
format!(
"ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
self.nwritten_blocks, buflen, e, self.file.path,
),
));
}
};
let nblocks = buflen / PAGE_SZ;
let nblocks32 = u32::try_from(nblocks).unwrap();
if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
// Pre-warm page cache with the contents.
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
// benefits the code that writes InMemoryLayer=>L0 layers.
let cache = page_cache::get();
static CTX: Lazy<RequestContext> = Lazy::new(|| {
RequestContext::new(
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
crate::context::DownloadBehavior::Error,
)
});
for blknum_in_buffer in 0..nblocks {
let blk_in_buffer =
&buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
let blknum = self
.nwritten_blocks
.checked_add(blknum_in_buffer as u32)
.unwrap();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
.await
{
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
Ok(v) => match v {
page_cache::ReadBufResult::Found(_guard) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
and this function takes &mut self, so, no concurrent read_blk is possible");
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(blk_in_buffer);
let _ = write_guard.mark_valid();
}
},
}
}
}
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
Ok((buflen, buf))
}
}

View File

@@ -565,7 +565,7 @@ mod tests {
);
let expected_bytes = vec![
/* TimelineMetadataHeader */
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
74, 104, 158, 105, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
/* TimelineMetadataBodyV2 */
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
@@ -574,7 +574,7 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, // ancestor_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
0, 0, 0, 15, // pg_version (4 bytes)
0, 0, 0, 16, // pg_version (4 bytes)
/* padding bytes */
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

View File

@@ -1728,7 +1728,7 @@ impl RemoteTimelineClient {
task_mgr::spawn(
&self.runtime,
TaskKind::RemoteUploadTask,
Some(self.tenant_shard_id),
self.tenant_shard_id,
Some(self.timeline_id),
"remote upload",
async move {

View File

@@ -13,7 +13,7 @@ use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache, walrecord};
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
@@ -249,9 +249,7 @@ impl InMemoryLayer {
/// debugging function to print out the contents of the layer
///
/// this is likely completly unused
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().await;
pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
let end_str = self.end_lsn_or_max();
println!(
@@ -259,39 +257,6 @@ impl InMemoryLayer {
self.timeline_id, self.start_lsn, end_str,
);
if !verbose {
return Ok(());
}
let cursor = inner.file.block_cursor();
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
let mut desc = String::new();
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
write!(
&mut desc,
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)?;
}
Err(err) => {
write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
}
}
println!(" key {} at {}: {}", key, lsn, desc);
}
}
Ok(())
}
@@ -536,7 +501,6 @@ impl InMemoryLayer {
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
@@ -568,34 +532,6 @@ impl InMemoryLayer {
.await?;
match l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let mut buf = Vec::new();
let cursor = inner.file.block_cursor();
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
let will_init = Value::des(&buf)?.will_init();
let (tmp, res) = delta_layer_writer
.put_value_bytes(
Key::from_compact(*key),
*lsn,
buf.slice_len(),
will_init,
&ctx,
)
.await;
res?;
buf = tmp.into_raw_slice().into_inner();
}
}
}
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
assert_eq!(

View File

@@ -98,7 +98,7 @@ pub fn start_background_loops(
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
Some(tenant_shard_id),
tenant_shard_id,
None,
&format!("compactor for tenant {tenant_shard_id}"),
{
@@ -121,7 +121,7 @@ pub fn start_background_loops(
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,
Some(tenant_shard_id),
tenant_shard_id,
None,
&format!("garbage collector for tenant {tenant_shard_id}"),
{
@@ -144,7 +144,7 @@ pub fn start_background_loops(
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::IngestHousekeeping,
Some(tenant_shard_id),
tenant_shard_id,
None,
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
{

View File

@@ -2281,7 +2281,7 @@ impl Timeline {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_shard_id),
self.tenant_shard_id,
Some(self.timeline_id),
"layer flush task",
async move {
@@ -2635,7 +2635,7 @@ impl Timeline {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_shard_id),
self.tenant_shard_id,
Some(self.timeline_id),
"initial size calculation",
// NB: don't log errors here, task_mgr will do that.
@@ -2803,7 +2803,7 @@ impl Timeline {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
Some(self.tenant_shard_id),
self.tenant_shard_id,
Some(self.timeline_id),
"ondemand logical size calculation",
async move {
@@ -5162,7 +5162,7 @@ impl Timeline {
let task_id = task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::DownloadAllRemoteLayers,
Some(self.tenant_shard_id),
self.tenant_shard_id,
Some(self.timeline_id),
"download all remote layers task",
async move {

View File

@@ -395,7 +395,7 @@ impl DeleteTimelineFlow {
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(tenant_shard_id),
tenant_shard_id,
Some(timeline_id),
"timeline_delete",
async move {

View File

@@ -60,7 +60,7 @@ impl Timeline {
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Eviction,
Some(self.tenant_shard_id),
self.tenant_shard_id,
Some(self.timeline_id),
&format!(
"layer eviction for {}/{}",

View File

@@ -18,6 +18,7 @@ import psycopg2
from psycopg2.extras import execute_values
CREATE_TABLE = """
CREATE TYPE arch AS ENUM ('ARM64', 'X64', 'UNKNOWN');
CREATE TABLE IF NOT EXISTS results (
id BIGSERIAL PRIMARY KEY,
parent_suite TEXT NOT NULL,
@@ -28,6 +29,7 @@ CREATE TABLE IF NOT EXISTS results (
stopped_at TIMESTAMPTZ NOT NULL,
duration INT NOT NULL,
flaky BOOLEAN NOT NULL,
arch arch DEFAULT 'X64',
build_type TEXT NOT NULL,
pg_version INT NOT NULL,
run_id BIGINT NOT NULL,
@@ -35,7 +37,7 @@ CREATE TABLE IF NOT EXISTS results (
reference TEXT NOT NULL,
revision CHAR(40) NOT NULL,
raw JSONB COMPRESSION lz4 NOT NULL,
UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
UNIQUE (parent_suite, suite, name, arch, build_type, pg_version, started_at, stopped_at, run_id)
);
"""
@@ -50,6 +52,7 @@ class Row:
stopped_at: datetime
duration: int
flaky: bool
arch: str
build_type: str
pg_version: int
run_id: int
@@ -121,6 +124,14 @@ def ingest_test_result(
raw.pop("labels")
raw.pop("extra")
# All allure parameters are prefixed with "__", see test_runner/fixtures/parametrize.py
parameters = {
p["name"].removeprefix("__"): p["value"]
for p in test["parameters"]
if p["name"].startswith("__")
}
arch = parameters.get("arch", "UNKNOWN").strip("'")
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
labels = {label["name"]: label["value"] for label in test["labels"]}
row = Row(
@@ -132,6 +143,7 @@ def ingest_test_result(
stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
duration=test["time"]["duration"],
flaky=test["flaky"] or test["retriesStatusChange"],
arch=arch,
build_type=build_type,
pg_version=pg_version,
run_id=run_id,

View File

@@ -44,7 +44,7 @@ run the following commands from the top of the neon.git checkout
# test suite run
export TEST_OUTPUT="$TEST_OUTPUT"
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py
DEFAULT_PG_VERSION=16 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py
# for interactive use
export NEON_REPO_DIR="$NEON_REPO_DIR"

View File

@@ -1,10 +1,10 @@
use std::collections::{HashMap, HashSet};
use anyhow::Context;
use aws_sdk_s3::Client;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;
@@ -16,7 +16,7 @@ use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
pub(crate) struct TimelineAnalysis {
/// Anomalies detected
@@ -48,13 +48,12 @@ impl TimelineAnalysis {
}
pub(crate) async fn branch_cleanup_and_check_errors(
s3_client: &Client,
target: &RootTarget,
remote_client: &GenericRemoteStorage,
id: &TenantShardTimelineId,
tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
s3_data: Option<S3TimelineBlobData>,
s3_data: Option<RemoteTimelineBlobData>,
) -> TimelineAnalysis {
let mut result = TimelineAnalysis::new();
@@ -78,7 +77,9 @@ pub(crate) async fn branch_cleanup_and_check_errors(
match s3_data {
Some(s3_data) => {
result.garbage_keys.extend(s3_data.unknown_keys);
result
.garbage_keys
.extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
match s3_data.blob_data {
BlobDataParseResult::Parsed {
@@ -143,11 +144,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
// HEAD request used here to address a race condition when an index was uploaded concurrently
// with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
let response = s3_client
.head_object()
.bucket(target.bucket_name())
.key(path.get_path().as_str())
.send()
let response = remote_client
.head_object(&path, &CancellationToken::new())
.await;
if response.is_err() {
@@ -284,14 +282,14 @@ impl TenantObjectListing {
}
#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
pub(crate) struct RemoteTimelineBlobData {
pub(crate) blob_data: BlobDataParseResult,
// Index objects that were not used when loading `blob_data`, e.g. those from old generations
pub(crate) unused_index_keys: Vec<String>,
pub(crate) unused_index_keys: Vec<ListingObject>,
// Objects whose keys were not recognized at all, i.e. not layer files, not indices
pub(crate) unknown_keys: Vec<String>,
pub(crate) unknown_keys: Vec<ListingObject>,
}
#[derive(Debug)]
@@ -323,31 +321,37 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
}
pub(crate) async fn list_timeline_blobs(
s3_client: &Client,
remote_client: &GenericRemoteStorage,
id: TenantShardTimelineId,
s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
root_target: &RootTarget,
) -> anyhow::Result<RemoteTimelineBlobData> {
let mut s3_layers = HashSet::new();
let mut errors = Vec::new();
let mut unknown_keys = Vec::new();
let mut timeline_dir_target = s3_root.timeline_root(&id);
let mut timeline_dir_target = root_target.timeline_root(&id);
timeline_dir_target.delimiter = String::new();
let mut index_part_keys: Vec<String> = Vec::new();
let mut index_part_keys: Vec<ListingObject> = Vec::new();
let mut initdb_archive: bool = false;
let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let obj = obj?;
let key = obj.key();
let prefix_str = &timeline_dir_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let (key, Some(obj)) = obj? else {
panic!("ListingObject not specified");
};
let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
match blob_name {
Some(name) if name.starts_with("index_part.json") => {
tracing::debug!("Index key {key}");
index_part_keys.push(key.to_owned())
index_part_keys.push(obj)
}
Some("initdb.tar.zst") => {
tracing::debug!("initdb archive {key}");
@@ -358,7 +362,7 @@ pub(crate) async fn list_timeline_blobs(
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);
tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
s3_layers.insert((new_layer, gen));
}
Err(e) => {
@@ -366,13 +370,13 @@ pub(crate) async fn list_timeline_blobs(
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
unknown_keys.push(key.to_string());
unknown_keys.push(obj);
}
},
None => {
tracing::warn!("Unknown key {}", key);
tracing::warn!("Unknown key {key}");
errors.push(format!("S3 list response got an object with odd key {key}"));
unknown_keys.push(key.to_string());
unknown_keys.push(obj);
}
}
}
@@ -381,7 +385,7 @@ pub(crate) async fn list_timeline_blobs(
tracing::debug!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(S3TimelineBlobData {
return Ok(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Relic,
unused_index_keys: index_part_keys,
unknown_keys: Vec::new(),
@@ -395,13 +399,13 @@ pub(crate) async fn list_timeline_blobs(
// Stripping the index key to the last part, because RemotePath doesn't
// like absolute paths, and depending on prefix_in_bucket it's possible
// for the keys we read back to start with a slash.
let basename = key.rsplit_once('/').unwrap().1;
let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
})
.max_by_key(|i| i.1)
.map(|(k, g)| (k.clone(), g))
{
Some((key, gen)) => (Some(key), gen),
Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
None => {
// Legacy/missing case: one or zero index parts, which did not have a generation
(index_part_keys.pop(), Generation::none())
@@ -416,17 +420,14 @@ pub(crate) async fn list_timeline_blobs(
}
if let Some(index_part_object_key) = index_part_object.as_ref() {
let index_part_bytes = download_object_with_retries(
s3_client,
&timeline_dir_target.bucket_name,
index_part_object_key,
)
.await
.context("index_part.json download")?;
let index_part_bytes =
download_object_with_retries(remote_client, &index_part_object_key.key)
.await
.context("index_part.json download")?;
match serde_json::from_slice(&index_part_bytes) {
Ok(index_part) => {
return Ok(S3TimelineBlobData {
return Ok(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Parsed {
index_part: Box::new(index_part),
index_part_generation,
@@ -448,7 +449,7 @@ pub(crate) async fn list_timeline_blobs(
);
}
Ok(S3TimelineBlobData {
Ok(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,

View File

@@ -6,7 +6,7 @@ use remote_storage::ListingMode;
use serde::{Deserialize, Serialize};
use crate::{
checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
checks::parse_layer_object_name, init_remote, metadata_stream::stream_tenants,
stream_objects_with_retries, BucketConfig, NodeKind,
};
@@ -50,9 +50,8 @@ pub async fn find_large_objects(
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (remote_client, target) =
init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = pin!(stream_tenants_generic(&remote_client, &target));
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = pin!(stream_tenants(&remote_client, &target));
let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);

View File

@@ -19,8 +19,8 @@ use utils::id::TenantId;
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote_generic, list_objects_with_retries_generic,
metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
init_remote, list_objects_with_retries,
metadata_stream::{stream_tenant_timelines, stream_tenants},
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
};
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
// Enumerate Tenants in S3, and check if each one exists in Console
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
let tenants = stream_tenants_generic(&remote_client, &target);
let tenants = stream_tenants(&remote_client, &target);
let tenants_checked = tenants.map_ok(|t| {
let api_client = cloud_admin_api_client.clone();
let console_cache = console_cache.clone();
@@ -237,14 +237,13 @@ async fn find_garbage_inner(
// Special case: If it's missing in console, check for known bugs that would enable us to conclusively
// identify it as purge-able anyway
if console_result.is_none() {
let timelines =
stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
let timelines = stream_tenant_timelines(&remote_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
if timelines.is_empty() {
// No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
let tenant_objects = list_objects_with_retries_generic(
let tenant_objects = list_objects_with_retries(
&remote_client,
ListingMode::WithDelimiter,
&target.tenant_root(&tenant_shard_id),
@@ -265,7 +264,7 @@ async fn find_garbage_inner(
for timeline_r in timelines {
let timeline = timeline_r?;
let timeline_objects = list_objects_with_retries_generic(
let timeline_objects = list_objects_with_retries(
&remote_client,
ListingMode::WithDelimiter,
&target.timeline_root(&timeline),
@@ -331,8 +330,7 @@ async fn find_garbage_inner(
// Construct a stream of all timelines within active tenants
let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
let timelines =
active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, *t));
let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
let timelines = timelines.try_flatten();
@@ -507,7 +505,7 @@ pub async fn purge_garbage(
);
let (remote_client, _target) =
init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
assert_eq!(
&garbage_list.bucket_config.bucket,

View File

@@ -15,7 +15,7 @@ use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context};
use anyhow::Context;
use aws_config::retry::{RetryConfigBuilder, RetryMode};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
@@ -352,7 +352,7 @@ fn make_root_target(
}
}
async fn init_remote(
async fn init_remote_s3(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
@@ -369,7 +369,7 @@ async fn init_remote(
Ok((s3_client, s3_root))
}
async fn init_remote_generic(
async fn init_remote(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
@@ -394,45 +394,10 @@ async fn init_remote_generic(
// We already pass the prefix to the remote client above
let prefix_in_root_target = String::new();
let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
let client = GenericRemoteStorage::from_config(&storage_config).await?;
Ok((client, s3_root))
}
async fn list_objects_with_retries(
s3_client: &Client,
s3_target: &S3Target,
continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
for trial in 0..MAX_RETRIES {
match s3_client
.list_objects_v2()
.bucket(&s3_target.bucket_name)
.prefix(&s3_target.prefix_in_bucket)
.delimiter(&s3_target.delimiter)
.set_continuation_token(continuation_token.clone())
.send()
.await
{
Ok(response) => return Ok(response),
Err(e) => {
if trial == MAX_RETRIES - 1 {
return Err(e)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
}
error!(
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
s3_target.bucket_name,
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
Ok((client, root_target))
}
/// Listing possibly large amounts of keys in a streaming fashion.
@@ -452,23 +417,26 @@ fn stream_objects_with_retries<'a>(
let mut list_stream =
storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
while let Some(res) = list_stream.next().await {
if let Err(err) = res {
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
};
if yield_err {
yield Err(err)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
break;
match res {
Err(err) => {
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
};
if yield_err {
yield Err(err)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
break;
}
}
Ok(res) => {
trial = 0;
yield Ok(res);
}
} else {
trial = 0;
yield res.map_err(anyhow::Error::from);
}
}
}
@@ -476,7 +444,7 @@ fn stream_objects_with_retries<'a>(
/// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
/// use [`stream_objects_with_retries`] instead.
async fn list_objects_with_retries_generic(
async fn list_objects_with_retries(
remote_client: &GenericRemoteStorage,
listing_mode: ListingMode,
s3_target: &S3Target,
@@ -514,40 +482,34 @@ async fn list_objects_with_retries_generic(
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,
key: &str,
remote_client: &GenericRemoteStorage,
key: &RemotePath,
) -> anyhow::Result<Vec<u8>> {
for _ in 0..MAX_RETRIES {
let mut body_buf = Vec::new();
let response_stream = match s3_client
.get_object()
.bucket(bucket_name)
.key(key)
.send()
.await
{
let cancel = CancellationToken::new();
for trial in 0..MAX_RETRIES {
let mut buf = Vec::new();
let download = match remote_client.download(key, &cancel).await {
Ok(response) => response,
Err(e) => {
error!("Failed to download object for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
continue;
}
};
match response_stream
.body
.into_async_read()
.read_to_end(&mut body_buf)
match tokio_util::io::StreamReader::new(download.download_stream)
.read_to_end(&mut buf)
.await
{
Ok(bytes_read) => {
tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
return Ok(body_buf);
return Ok(buf);
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
}
}
}
@@ -555,7 +517,7 @@ async fn download_object_with_retries(
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
async fn download_object_to_file(
async fn download_object_to_file_s3(
s3_client: &Client,
bucket_name: &str,
key: &str,

View File

@@ -2,7 +2,6 @@ use std::str::FromStr;
use anyhow::{anyhow, Context};
use async_stream::{stream, try_stream};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use futures::StreamExt;
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
use tokio_stream::Stream;
@@ -15,7 +14,7 @@ use pageserver_api::shard::TenantShardId;
use utils::id::{TenantId, TimelineId};
/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
pub fn stream_tenants_generic<'a>(
pub fn stream_tenants<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
@@ -36,92 +35,36 @@ pub fn stream_tenants_generic<'a>(
}
}
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
pub fn stream_tenants<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
try_stream! {
let mut continuation_token = None;
let tenants_target = target.tenants_root();
loop {
let fetch_response =
list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&tenants_target.prefix_in_bucket)?
.strip_suffix('/')
}).map(|entry_id_str| {
entry_id_str
.parse()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
});
for i in new_entry_ids {
yield i?;
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
pub async fn stream_tenant_shards<'a>(
s3_client: &'a Client,
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
tenant_id: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let shards_target = target.tenant_shards_prefix(&tenant_id);
loop {
tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
let fetch_response =
list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
let fetch_response = match fetch_response {
Err(e) => {
tenant_shard_ids.push(Err(e));
break;
}
Ok(r) => r,
};
let strip_prefix = target.tenants_root().prefix_in_bucket;
let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&target.tenants_root().prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
let listing =
list_objects_with_retries(remote_client, ListingMode::WithDelimiter, &shards_target)
.await?;
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
});
let tenant_shard_ids = listing
.prefixes
.iter()
.map(|prefix| prefix.get_path().as_str())
.filter_map(|prefix| -> Option<&str> { prefix.strip_prefix(prefix_str) })
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
for i in new_entry_ids {
tenant_shard_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
})
.collect::<Vec<_>>();
tracing::debug!("Yielding {} shards for {tenant_id}", tenant_shard_ids.len());
Ok(stream! {
for i in tenant_shard_ids {
let id = i?;
@@ -130,69 +73,10 @@ pub async fn stream_tenant_shards<'a>(
})
}
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let timelines_target = target.timelines_root(&tenant);
loop {
tracing::debug!("Listing in {}", tenant);
let fetch_response =
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
.await;
let fetch_response = match fetch_response {
Err(e) => {
timeline_ids.push(Err(e));
break;
}
Ok(r) => r,
};
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
.parse::<TimelineId>()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
});
for i in new_entry_ids {
timeline_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
tracing::debug!("Yielding for {}", tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
yield Ok(TenantShardTimelineId::new(tenant, id));
}
})
}
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
/// using a listing. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines_generic<'a>(
pub async fn stream_tenant_timelines<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
tenant: TenantShardId,
@@ -200,6 +84,11 @@ pub async fn stream_tenant_timelines_generic<'a>(
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let timelines_target = target.timelines_root(&tenant);
let prefix_str = &timelines_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timelines_target.prefix_in_bucket);
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
remote_client,
ListingMode::WithDelimiter,
@@ -220,11 +109,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
.prefixes
.iter()
.filter_map(|prefix| -> Option<&str> {
prefix
.get_path()
.as_str()
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
prefix.get_path().as_str().strip_prefix(prefix_str)
})
.map(|entry_id_str| {
entry_id_str
@@ -237,7 +122,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
}
}
tracing::debug!("Yielding for {}", tenant);
tracing::debug!("Yielding {} timelines for {}", timeline_ids.len(), tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
@@ -247,37 +132,6 @@ pub async fn stream_tenant_timelines_generic<'a>(
}
pub(crate) fn stream_listing<'a>(
s3_client: &'a Client,
target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
try_stream! {
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
if target.delimiter.is_empty() {
for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
{
let object_id = ObjectIdentifier::builder().key(object_key).build()?;
yield object_id;
}
} else {
for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
let object_id = ObjectIdentifier::builder().key(prefix).build()?;
yield object_id;
}
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
pub(crate) fn stream_listing_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {

View File

@@ -1,11 +1,10 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;
use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
@@ -13,10 +12,11 @@ use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::controller_api::TenantDescribeResponse;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use remote_storage::RemotePath;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
use reqwest::Method;
use serde::Serialize;
use storage_controller_client::control_api;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::generation::Generation;
use utils::id::{TenantId, TenantTimelineId};
@@ -240,38 +240,13 @@ impl TenantRefAccumulator {
}
}
async fn is_old_enough(
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
key: &str,
summary: &mut GcSummary,
) -> bool {
fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool {
// Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
// it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
let age: Duration = match s3_client
.head_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(response) => match response.last_modified {
None => {
tracing::warn!("Missing last_modified");
summary.remote_storage_errors += 1;
return false;
}
Some(last_modified) => match SystemTime::try_from(last_modified).map(|t| t.elapsed()) {
Ok(Ok(e)) => e,
Err(_) | Ok(Err(_)) => {
tracing::warn!("Bad last_modified time: {last_modified:?}");
return false;
}
},
},
Err(e) => {
tracing::warn!("Failed to HEAD {key}: {e}");
let age = match key.last_modified.elapsed() {
Ok(e) => e,
Err(_) => {
tracing::warn!("Bad last_modified time: {:?}", key.last_modified);
summary.remote_storage_errors += 1;
return false;
}
@@ -289,17 +264,30 @@ async fn is_old_enough(
old_enough
}
/// Same as [`is_old_enough`], but doesn't require a [`ListingObject`] passed to it.
async fn check_is_old_enough(
remote_client: &GenericRemoteStorage,
key: &RemotePath,
min_age: &Duration,
summary: &mut GcSummary,
) -> Option<bool> {
let listing_object = remote_client
.head_object(key, &CancellationToken::new())
.await
.ok()?;
Some(is_old_enough(min_age, &listing_object, summary))
}
async fn maybe_delete_index(
s3_client: &Client,
bucket_config: &BucketConfig,
remote_client: &GenericRemoteStorage,
min_age: &Duration,
latest_gen: Generation,
key: &str,
obj: &ListingObject,
mode: GcMode,
summary: &mut GcSummary,
) {
// Validation: we will only delete things that parse cleanly
let basename = key.rsplit_once('/').unwrap().1;
let basename = obj.key.get_path().file_name().unwrap();
let candidate_generation =
match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
Some(g) => g,
@@ -328,7 +316,7 @@ async fn maybe_delete_index(
return;
}
if !is_old_enough(s3_client, bucket_config, min_age, key, summary).await {
if !is_old_enough(min_age, obj, summary) {
return;
}
@@ -338,11 +326,8 @@ async fn maybe_delete_index(
}
// All validations passed: erase the object
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
match remote_client
.delete(&obj.key, &CancellationToken::new())
.await
{
Ok(_) => {
@@ -358,8 +343,7 @@ async fn maybe_delete_index(
#[allow(clippy::too_many_arguments)]
async fn gc_ancestor(
s3_client: &Client,
bucket_config: &BucketConfig,
remote_client: &GenericRemoteStorage,
root_target: &RootTarget,
min_age: &Duration,
ancestor: TenantShardId,
@@ -368,7 +352,7 @@ async fn gc_ancestor(
summary: &mut GcSummary,
) -> anyhow::Result<()> {
// Scan timelines in the ancestor
let timelines = stream_tenant_timelines(s3_client, root_target, ancestor).await?;
let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
let mut timelines = std::pin::pin!(timelines);
// Build a list of keys to retain
@@ -376,7 +360,7 @@ async fn gc_ancestor(
while let Some(ttid) = timelines.next().await {
let ttid = ttid?;
let data = list_timeline_blobs(s3_client, ttid, root_target).await?;
let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
let s3_layers = match data.blob_data {
BlobDataParseResult::Parsed {
@@ -427,7 +411,8 @@ async fn gc_ancestor(
// We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
// to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
if !is_old_enough(s3_client, bucket_config, min_age, &key, summary).await {
let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap();
if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) {
continue;
}
@@ -437,13 +422,7 @@ async fn gc_ancestor(
}
// All validations passed: erase the object
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(&key)
.send()
.await
{
match remote_client.delete(&path, &CancellationToken::new()).await {
Ok(_) => {
tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
summary.ancestor_layers_deleted += 1;
@@ -477,10 +456,10 @@ pub async fn pageserver_physical_gc(
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = if tenant_shard_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
futures::future::Either::Left(stream_tenants(&remote_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
};
@@ -493,14 +472,13 @@ pub async fn pageserver_physical_gc(
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn gc_timeline(
s3_client: &Client,
bucket_config: &BucketConfig,
remote_client: &GenericRemoteStorage,
min_age: &Duration,
target: &RootTarget,
mode: GcMode,
@@ -508,7 +486,7 @@ pub async fn pageserver_physical_gc(
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
let data = list_timeline_blobs(s3_client, ttid, target).await?;
let data = list_timeline_blobs(remote_client, ttid, target).await?;
let (index_part, latest_gen, candidates) = match &data.blob_data {
BlobDataParseResult::Parsed {
@@ -533,17 +511,9 @@ pub async fn pageserver_physical_gc(
accumulator.lock().unwrap().update(ttid, index_part);
for key in candidates {
maybe_delete_index(
s3_client,
bucket_config,
min_age,
latest_gen,
&key,
mode,
&mut summary,
)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
.await;
maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
.await;
}
Ok(summary)
@@ -554,15 +524,7 @@ pub async fn pageserver_physical_gc(
// Drain futures for per-shard GC, populating accumulator as a side effect
{
let timelines = timelines.map_ok(|ttid| {
gc_timeline(
&s3_client,
bucket_config,
&min_age,
&target,
mode,
ttid,
&accumulator,
)
gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator)
});
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
@@ -586,8 +548,7 @@ pub async fn pageserver_physical_gc(
for ancestor_shard in ancestor_shards {
gc_ancestor(
&s3_client,
bucket_config,
&remote_client,
&target,
&min_age,
ancestor_shard,

View File

@@ -1,16 +1,16 @@
use std::collections::{HashMap, HashSet};
use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TenantObjectListing, TimelineAnalysis,
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use utils::id::TenantId;
use utils::shard::ShardCount;
@@ -36,7 +36,7 @@ impl MetadataSummary {
Self::default()
}
fn update_data(&mut self, data: &S3TimelineBlobData) {
fn update_data(&mut self, data: &RemoteTimelineBlobData) {
self.timeline_shard_count += 1;
if let BlobDataParseResult::Parsed {
index_part,
@@ -120,10 +120,10 @@ pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
futures::future::Either::Left(stream_tenants(&remote_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
@@ -133,20 +133,20 @@ pub async fn scan_pageserver_metadata(
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn report_on_timeline(
s3_client: &Client,
remote_client: &GenericRemoteStorage,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
@@ -157,12 +157,11 @@ pub async fn scan_pageserver_metadata(
let mut tenant_timeline_results = Vec::new();
async fn analyze_tenant(
s3_client: &Client,
target: &RootTarget,
remote_client: &GenericRemoteStorage,
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
highest_shard_count: ShardCount,
) {
summary.tenant_count += 1;
@@ -191,8 +190,7 @@ pub async fn scan_pageserver_metadata(
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis = branch_cleanup_and_check_errors(
s3_client,
target,
remote_client,
&ttid,
&mut tenant_objects,
None,
@@ -273,8 +271,7 @@ pub async fn scan_pageserver_metadata(
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(
&s3_client,
&target,
&remote_client,
prev_tenant_id,
&mut summary,
tenant_objects,
@@ -311,8 +308,7 @@ pub async fn scan_pageserver_metadata(
if !tenant_timeline_results.is_empty() {
analyze_tenant(
&s3_client,
&target,
&remote_client,
tenant_id.expect("Must be set if results are present"),
&mut summary,
tenant_objects,

View File

@@ -14,9 +14,8 @@ use utils::{
};
use crate::{
cloud_admin_api::CloudAdminApiClient, init_remote_generic,
metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
TenantShardTimelineId,
cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
};
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -107,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());
let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
let console_config = ConsoleConfig::from_env()?;
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
@@ -188,14 +187,19 @@ async fn check_timeline(
// we need files, so unset it.
timeline_dir_target.delimiter = String::new();
let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
let prefix_str = &timeline_dir_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let (key, _obj) = obj?;
let seg_name = key
.get_path()
.as_str()
.strip_prefix(&timeline_dir_target.prefix_in_bucket)
.strip_prefix(prefix_str)
.expect("failed to extract segment name");
expected_segfiles.remove(seg_name);
}

View File

@@ -1,10 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
use crate::checks::{list_timeline_blobs, BlobDataParseResult, RemoteTimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId,
download_object_to_file_s3, init_remote, init_remote_s3, BucketConfig, NodeKind, RootTarget,
TenantShardTimelineId,
};
use anyhow::Context;
use async_stream::stream;
@@ -15,6 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use utils::generation::Generation;
use utils::id::TenantId;
@@ -34,7 +36,8 @@ impl SnapshotDownloader {
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, s3_root) =
init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
Ok(Self {
s3_client,
s3_root,
@@ -91,7 +94,7 @@ impl SnapshotDownloader {
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
};
download_object_to_file(
download_object_to_file_s3(
&self.s3_client,
&self.bucket_config.bucket,
&remote_layer_path,
@@ -215,11 +218,11 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (s3_client, target) =
let (remote_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
let shards: Vec<TenantShardId> = shards.try_collect().await?;
// Only read from shards that have the highest count: avoids redundantly downloading
@@ -237,18 +240,19 @@ impl SnapshotDownloader {
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
// Generate a stream of TenantTimelineId
let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
s3_client: &Client,
remote_client: &GenericRemoteStorage,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
let timelines =
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {
@@ -278,7 +282,7 @@ impl SnapshotDownloader {
for (ttid, layers) in ancestor_layers.into_iter() {
tracing::info!(
"Downloading {} layers from ancvestor timeline {ttid}...",
"Downloading {} layers from ancestor timeline {ttid}...",
layers.len()
);

View File

@@ -71,8 +71,7 @@ a subdirectory for each version with naming convention `v{PG_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
`DEFAULT_PG_VERSION`: The version of Postgres to use,
This is used to construct full path to the postgres binaries.
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION="14"`. Alternatively,
you can use `--pg-version` argument.
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION=16`
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.

View File

@@ -4643,6 +4643,7 @@ class StorageScrubber:
]
args = base_args + args
log.info(f"Invoking scrubber command {args} with env: {env}")
(output_path, stdout, status_code) = subprocess_capture(
self.log_dir,
args,

View File

@@ -1,6 +1,7 @@
import os
from typing import Any, Dict, Optional
import allure
import pytest
import toml
from _pytest.python import Metafunc
@@ -91,3 +92,23 @@ def pytest_generate_tests(metafunc: Metafunc):
and (platform := os.getenv("PLATFORM")) is not None
):
metafunc.parametrize("platform", [platform.lower()])
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(*args, **kwargs):
# Add test parameters to Allue report to distinguish the same tests with different parameters.
# Names has `__` prefix to avoid conflicts with `pytest.mark.parametrize` parameters
# A mapping between `uname -m` and `RUNNER_ARCH` values.
# `RUNNER_ARCH` environment variable is set on GitHub Runners,
# possible values are X86, X64, ARM, or ARM64.
# See https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
uname_m = {
"aarch64": "ARM64",
"arm64": "ARM64",
"x86_64": "X64",
}.get(os.uname().machine, "UNKNOWN")
arch = os.getenv("RUNNER_ARCH", uname_m)
allure.dynamic.parameter("__arch", arch)
yield

View File

@@ -3,8 +3,6 @@ import os
from typing import Optional
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
"""
This fixture is used to determine which version of Postgres to use for tests.
@@ -52,7 +50,7 @@ class PgVersion(str, enum.Enum):
return None
DEFAULT_VERSION: PgVersion = PgVersion.V15
DEFAULT_VERSION: PgVersion = PgVersion.V16
def skip_on_postgres(version: PgVersion, reason: str):
@@ -69,22 +67,8 @@ def xfail_on_postgres(version: PgVersion, reason: str):
)
def pytest_addoption(parser: Parser):
parser.addoption(
"--pg-version",
action="store",
type=PgVersion,
help="DEPRECATED: Postgres version to use for tests",
)
def run_only_on_default_postgres(reason: str):
return pytest.mark.skipif(
PgVersion(os.environ.get("DEFAULT_PG_VERSION", DEFAULT_VERSION)) is not DEFAULT_VERSION,
reason=reason,
)
def pytest_configure(config: Config):
if config.getoption("--pg-version"):
raise Exception("--pg-version is deprecated, use DEFAULT_PG_VERSION env var instead")

View File

@@ -7,7 +7,7 @@ easier to see if you have compile errors without scrolling up.
You may also need to run `./scripts/pysync`.
Then run the tests
`DEFAULT_PG_VERSION=15 NEON_BIN=./target/release poetry run pytest test_runner/performance`
`DEFAULT_PG_VERSION=16 NEON_BIN=./target/release poetry run pytest test_runner/performance`
Some handy pytest flags for local development:
- `-x` tells pytest to stop on first error

View File

@@ -11,6 +11,6 @@ It supports mounting snapshots using overlayfs, which improves iteration time.
Here's a full command line.
```
RUST_BACKTRACE=1 NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release \
RUST_BACKTRACE=1 NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=16 BUILD_TYPE=release \
./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
````

View File

@@ -14,7 +14,7 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
"""
Usage:
DEFAULT_PG_VERSION=15 BUILD_TYPE=debug NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 INTERACTIVE=true \
DEFAULT_PG_VERSION=16 BUILD_TYPE=debug NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 INTERACTIVE=true \
./scripts/pytest --timeout 0 test_runner/performance/pageserver/interactive/test_many_small_tenants.py
"""

View File

@@ -39,7 +39,7 @@ from fixtures.workload import Workload
#
# How to run `test_backward_compatibility` locally:
#
# export DEFAULT_PG_VERSION=15
# export DEFAULT_PG_VERSION=16
# export BUILD_TYPE=release
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
@@ -61,7 +61,7 @@ from fixtures.workload import Workload
#
# How to run `test_forward_compatibility` locally:
#
# export DEFAULT_PG_VERSION=15
# export DEFAULT_PG_VERSION=16
# export BUILD_TYPE=release
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}