mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 20:20:38 +00:00
Compare commits
41 Commits
sk-bump-te
...
cicd/debug
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b66d4e0bdc | ||
|
|
02a28c01ca | ||
|
|
c96593b473 | ||
|
|
ef57e73fbf | ||
|
|
4c5a0fdc75 | ||
|
|
4b26783c94 | ||
|
|
6d9fd2bbb5 | ||
|
|
6949b45e17 | ||
|
|
3b8ca477ab | ||
|
|
eb7241c798 | ||
|
|
e5d9c003f5 | ||
|
|
290ce3ed46 | ||
|
|
1138e286b9 | ||
|
|
9173847f81 | ||
|
|
6ae62574d0 | ||
|
|
5e074d8536 | ||
|
|
3a22daf33e | ||
|
|
f40f627730 | ||
|
|
56b94b7d1b | ||
|
|
fe445b2945 | ||
|
|
7f49f45a53 | ||
|
|
980b212789 | ||
|
|
2b17a03911 | ||
|
|
b50a9d84d4 | ||
|
|
6fd3c9daa5 | ||
|
|
409171ab08 | ||
|
|
f26240c9dc | ||
|
|
1c771267ab | ||
|
|
9f1b7b72ed | ||
|
|
2476f7ef74 | ||
|
|
f555cb3970 | ||
|
|
10a726503a | ||
|
|
7ba86e15fa | ||
|
|
7ba42bfdb4 | ||
|
|
4f8a39d6c6 | ||
|
|
54c5da3981 | ||
|
|
c1378dc43b | ||
|
|
50b9fb430a | ||
|
|
486eaba028 | ||
|
|
d4d70cc314 | ||
|
|
176eefa47a |
4
.github/actionlint.yml
vendored
4
.github/actionlint.yml
vendored
@@ -6,6 +6,10 @@ self-hosted-runner:
|
||||
- small
|
||||
- small-arm64
|
||||
- us-east-2
|
||||
- aws-arm64-8core
|
||||
- aws-arm64-16core
|
||||
- aws-arm64-32core
|
||||
- qemu-arm64
|
||||
config-variables:
|
||||
- BENCHMARK_PROJECT_ID_PUB
|
||||
- BENCHMARK_PROJECT_ID_SUB
|
||||
|
||||
@@ -43,7 +43,7 @@ inputs:
|
||||
pg_version:
|
||||
description: 'Postgres version to use for tests'
|
||||
required: false
|
||||
default: 'v14'
|
||||
default: 'v16'
|
||||
benchmark_durations:
|
||||
description: 'benchmark durations JSON'
|
||||
required: false
|
||||
@@ -131,7 +131,7 @@ runs:
|
||||
fi
|
||||
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
|
||||
# -n sets the number of parallel processes that pytest-xdist will run
|
||||
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
|
||||
EXTRA_PARAMS="-n auto $EXTRA_PARAMS"
|
||||
|
||||
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
|
||||
# to the same worker to make @pytest.mark.order work with xdist
|
||||
@@ -169,10 +169,8 @@ runs:
|
||||
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
|
||||
fi
|
||||
|
||||
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
|
||||
if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
|
||||
cov_prefix=()
|
||||
else
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
26
.github/workflows/_build-and-test-locally.yml
vendored
26
.github/workflows/_build-and-test-locally.yml
vendored
@@ -36,7 +36,7 @@ env:
|
||||
|
||||
jobs:
|
||||
build-neon:
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
|
||||
container:
|
||||
image: ${{ inputs.build-tools-image }}
|
||||
credentials:
|
||||
@@ -94,11 +94,16 @@ jobs:
|
||||
# We run tests with addtional features, that are turned off by default (e.g. in release builds), see
|
||||
# corresponding Cargo.toml files for their descriptions.
|
||||
- name: Set env variables
|
||||
env:
|
||||
ARCH: ${{ inputs.arch }}
|
||||
run: |
|
||||
CARGO_FEATURES="--features testing"
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
|
||||
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
|
||||
CARGO_FLAGS="--locked"
|
||||
elif [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=""
|
||||
CARGO_FLAGS="--locked"
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=""
|
||||
CARGO_FLAGS="--locked --release"
|
||||
@@ -110,6 +115,11 @@ jobs:
|
||||
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
|
||||
} >> $GITHUB_ENV
|
||||
|
||||
# See https://github.com/aws/aws-graviton-getting-started/blob/57dc813626d0266f1cc12ef83474745bb1f31fb4/rust.md
|
||||
- name: Set RUSTFLAGS for ARM
|
||||
if: inputs.arch == 'arm64'
|
||||
run: echo "RUSTFLAGS=-Ctarget-feature=+lse -Ctarget-cpu=neoverse-n1" >> $GITHUB_ENV
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg_14
|
||||
uses: actions/cache@v4
|
||||
@@ -153,11 +163,13 @@ jobs:
|
||||
run: |
|
||||
PQ_LIB_DIR=$(pwd)/pg_install/v16/lib
|
||||
export PQ_LIB_DIR
|
||||
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
|
||||
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests -j$(nproc)
|
||||
|
||||
# Do install *before* running rust tests because they might recompile the
|
||||
# binaries with different features/flags.
|
||||
- name: Install rust binaries
|
||||
env:
|
||||
ARCH: ${{ inputs.arch }}
|
||||
run: |
|
||||
# Install target binaries
|
||||
mkdir -p /tmp/neon/bin/
|
||||
@@ -172,7 +184,7 @@ jobs:
|
||||
done
|
||||
|
||||
# Install test executables and write list of all binaries (for code coverage)
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
|
||||
# Keep bloated coverage data files away from the rest of the artifact
|
||||
mkdir -p /tmp/coverage/
|
||||
|
||||
@@ -243,10 +255,10 @@ jobs:
|
||||
uses: ./.github/actions/save-coverage-data
|
||||
|
||||
regress-tests:
|
||||
# Run test on x64 only
|
||||
if: inputs.arch == 'x64'
|
||||
# Don't run regression tests on debug arm64 builds
|
||||
if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
|
||||
needs: [ build-neon ]
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'qemu-arm64' || 'large')) }}
|
||||
container:
|
||||
image: ${{ inputs.build-tools-image }}
|
||||
credentials:
|
||||
|
||||
13
.github/workflows/build_and_test.yml
vendored
13
.github/workflows/build_and_test.yml
vendored
@@ -198,7 +198,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
arch: [ x64 ]
|
||||
arch: [ x64, arm64 ]
|
||||
# Do not build or run tests in debug for release branches
|
||||
build-type: ${{ fromJson((startsWith(github.ref_name, 'release') && github.event_name == 'push') && '["release"]' || '["debug", "release"]') }}
|
||||
include:
|
||||
@@ -280,6 +280,7 @@ jobs:
|
||||
save_perf_report: ${{ github.ref_name == 'main' }}
|
||||
extra_params: --splits 5 --group ${{ matrix.pytest_split_group }}
|
||||
benchmark_durations: ${{ needs.get-benchmarks-durations.outputs.json }}
|
||||
pg_version: v16
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
@@ -985,10 +986,10 @@ jobs:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
|
||||
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
|
||||
gh workflow --repo neondatabase/azure run deploy.yml -f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
|
||||
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
|
||||
-f deployPgSniRouter=false \
|
||||
-f deployProxy=false \
|
||||
-f deployStorage=true \
|
||||
@@ -998,14 +999,14 @@ jobs:
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}} \
|
||||
-f deployPreprodRegion=true
|
||||
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
|
||||
gh workflow --repo neondatabase/infra run deploy-prod.yml --ref main \
|
||||
-f deployStorage=true \
|
||||
-f deployStorageBroker=true \
|
||||
-f deployStorageController=true \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
|
||||
gh workflow --repo neondatabase/infra run deploy-dev.yml --ref main \
|
||||
-f deployPgSniRouter=true \
|
||||
-f deployProxy=true \
|
||||
-f deployStorage=false \
|
||||
@@ -1015,7 +1016,7 @@ jobs:
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}} \
|
||||
-f deployPreprodRegion=true
|
||||
|
||||
gh workflow --repo neondatabase/aws run deploy-proxy-prod.yml --ref main \
|
||||
gh workflow --repo neondatabase/infra run deploy-proxy-prod.yml --ref main \
|
||||
-f deployPgSniRouter=true \
|
||||
-f deployProxy=true \
|
||||
-f branch=main \
|
||||
|
||||
@@ -262,7 +262,7 @@ By default, this runs both debug and release modes, and all supported postgres v
|
||||
testing locally, it is convenient to run just one set of permutations, like this:
|
||||
|
||||
```sh
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest
|
||||
DEFAULT_PG_VERSION=16 BUILD_TYPE=release ./scripts/pytest
|
||||
```
|
||||
|
||||
## Flamegraphs
|
||||
|
||||
@@ -54,7 +54,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
|
||||
const DEFAULT_BRANCH_NAME: &str = "main";
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
const DEFAULT_PG_VERSION: &str = "15";
|
||||
const DEFAULT_PG_VERSION: &str = "16";
|
||||
|
||||
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use crate::pageserver::PageServerNode;
|
||||
use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR;
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
pub const DEFAULT_PG_VERSION: u32 = 16;
|
||||
|
||||
//
|
||||
// This data structures represents neon_local CLI config
|
||||
|
||||
@@ -14,7 +14,7 @@ picked tenant (which requested on-demand activation) for around 30 seconds
|
||||
during the restart at 2024-04-03 16:37 UTC.
|
||||
|
||||
Note that lots of shutdowns on loaded pageservers do not finish within the
|
||||
[10 second systemd enforced timeout](https://github.com/neondatabase/aws/blob/0a5280b383e43c063d43cbf87fa026543f6d6ad4/.github/ansible/systemd/pageserver.service#L16). This means we are shutting down without flushing ephemeral layers
|
||||
[10 second systemd enforced timeout](https://github.com/neondatabase/infra/blob/0a5280b383e43c063d43cbf87fa026543f6d6ad4/.github/ansible/systemd/pageserver.service#L16). This means we are shutting down without flushing ephemeral layers
|
||||
and have to reingest data in order to serve requests after restarting, potentially making first request latencies worse.
|
||||
|
||||
This problem is not yet very acutely felt in storage controller managed pageservers since
|
||||
|
||||
@@ -383,6 +383,48 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
}
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<ListingObject, DownloadError> {
|
||||
let kind = RequestKind::Head;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let blob_client = self.client.blob_client(self.relative_path_to_name(key));
|
||||
let properties_future = blob_client.get_properties().into_future();
|
||||
|
||||
let properties_future = tokio::time::timeout(self.timeout, properties_future);
|
||||
|
||||
let res = tokio::select! {
|
||||
res = properties_future => res,
|
||||
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
if let Ok(inner) = &res {
|
||||
// do not incl. timeouts as errors in metrics but cancellations
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, inner, started_at);
|
||||
}
|
||||
|
||||
let data = match res {
|
||||
Ok(Ok(data)) => Ok(data),
|
||||
Ok(Err(sdk)) => Err(to_download_error(sdk)),
|
||||
Err(_timeout) => Err(DownloadError::Timeout),
|
||||
}?;
|
||||
|
||||
let properties = data.blob.properties;
|
||||
Ok(ListingObject {
|
||||
key: key.to_owned(),
|
||||
last_modified: SystemTime::from(properties.last_modified),
|
||||
size: properties.content_length,
|
||||
})
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
|
||||
@@ -150,7 +150,7 @@ pub enum ListingMode {
|
||||
NoDelimiter,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||
pub struct ListingObject {
|
||||
pub key: RemotePath,
|
||||
pub last_modified: SystemTime,
|
||||
@@ -215,6 +215,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
Ok(combined)
|
||||
}
|
||||
|
||||
/// Obtain metadata information about an object.
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<ListingObject, DownloadError>;
|
||||
|
||||
/// Streams the local file contents into remote into the remote storage entry.
|
||||
///
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
|
||||
@@ -363,6 +370,20 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
// See [`RemoteStorage::head_object`].
|
||||
pub async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<ListingObject, DownloadError> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.head_object(key, cancel).await,
|
||||
Self::AwsS3(s) => s.head_object(key, cancel).await,
|
||||
Self::AzureBlob(s) => s.head_object(key, cancel).await,
|
||||
Self::Unreliable(s) => s.head_object(key, cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::upload`]
|
||||
pub async fn upload(
|
||||
&self,
|
||||
@@ -598,6 +619,7 @@ impl ConcurrencyLimiter {
|
||||
RequestKind::Delete => &self.write,
|
||||
RequestKind::Copy => &self.write,
|
||||
RequestKind::TimeTravel => &self.write,
|
||||
RequestKind::Head => &self.read,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -445,6 +445,20 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
_cancel: &CancellationToken,
|
||||
) -> Result<ListingObject, DownloadError> {
|
||||
let target_file_path = key.with_base(&self.storage_root);
|
||||
let metadata = file_metadata(&target_file_path).await?;
|
||||
Ok(ListingObject {
|
||||
key: key.clone(),
|
||||
last_modified: metadata.modified()?,
|
||||
size: metadata.len(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
|
||||
|
||||
@@ -13,6 +13,7 @@ pub(crate) enum RequestKind {
|
||||
List = 3,
|
||||
Copy = 4,
|
||||
TimeTravel = 5,
|
||||
Head = 6,
|
||||
}
|
||||
|
||||
use scopeguard::ScopeGuard;
|
||||
@@ -27,6 +28,7 @@ impl RequestKind {
|
||||
List => "list_objects",
|
||||
Copy => "copy_object",
|
||||
TimeTravel => "time_travel_recover",
|
||||
Head => "head_object",
|
||||
}
|
||||
}
|
||||
const fn as_index(&self) -> usize {
|
||||
@@ -34,7 +36,8 @@ impl RequestKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RequestTyped<C>([C; 6]);
|
||||
const REQUEST_KIND_COUNT: usize = 7;
|
||||
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
|
||||
|
||||
impl<C> RequestTyped<C> {
|
||||
pub(crate) fn get(&self, kind: RequestKind) -> &C {
|
||||
@@ -43,8 +46,8 @@ impl<C> RequestTyped<C> {
|
||||
|
||||
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
|
||||
use RequestKind::*;
|
||||
let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
|
||||
let arr = std::array::from_fn::<C, 6, _>(|index| {
|
||||
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
|
||||
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
|
||||
let next = it.next().unwrap();
|
||||
assert_eq!(index, next.as_index());
|
||||
f(next)
|
||||
|
||||
@@ -23,7 +23,7 @@ use aws_config::{
|
||||
use aws_sdk_s3::{
|
||||
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
|
||||
error::SdkError,
|
||||
operation::get_object::GetObjectError,
|
||||
operation::{get_object::GetObjectError, head_object::HeadObjectError},
|
||||
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
|
||||
Client,
|
||||
};
|
||||
@@ -604,6 +604,78 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<ListingObject, DownloadError> {
|
||||
let kind = RequestKind::Head;
|
||||
let _permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let head_future = self
|
||||
.client
|
||||
.head_object()
|
||||
.bucket(self.bucket_name())
|
||||
.key(self.relative_path_to_s3_object(key))
|
||||
.send();
|
||||
|
||||
let head_future = tokio::time::timeout(self.timeout, head_future);
|
||||
|
||||
let res = tokio::select! {
|
||||
res = head_future => res,
|
||||
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
};
|
||||
|
||||
let res = res.map_err(|_e| DownloadError::Timeout)?;
|
||||
|
||||
// do not incl. timeouts as errors in metrics but cancellations
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
crate::metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &res, started_at);
|
||||
|
||||
let data = match res {
|
||||
Ok(object_output) => object_output,
|
||||
Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
|
||||
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
|
||||
// an error: we expect to sometimes fetch an object and find it missing,
|
||||
// e.g. when probing for timeline indices.
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Ok,
|
||||
started_at,
|
||||
);
|
||||
return Err(DownloadError::NotFound);
|
||||
}
|
||||
Err(e) => {
|
||||
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
|
||||
kind,
|
||||
AttemptOutcome::Err,
|
||||
started_at,
|
||||
);
|
||||
|
||||
return Err(DownloadError::Other(
|
||||
anyhow::Error::new(e).context("s3 head object"),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
|
||||
return Err(DownloadError::Other(anyhow!(
|
||||
"head_object doesn't contain last_modified or content_length"
|
||||
)))?;
|
||||
};
|
||||
Ok(ListingObject {
|
||||
key: key.to_owned(),
|
||||
last_modified: SystemTime::try_from(last_modified).map_err(|e| {
|
||||
DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
|
||||
})?,
|
||||
size: size as u64,
|
||||
})
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
|
||||
@@ -30,6 +30,7 @@ pub struct UnreliableWrapper {
|
||||
#[derive(Debug, Hash, Eq, PartialEq)]
|
||||
enum RemoteOp {
|
||||
ListPrefixes(Option<RemotePath>),
|
||||
HeadObject(RemotePath),
|
||||
Upload(RemotePath),
|
||||
Download(RemotePath),
|
||||
Delete(RemotePath),
|
||||
@@ -137,6 +138,16 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.inner.list(prefix, mode, max_keys, cancel).await
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<crate::ListingObject, DownloadError> {
|
||||
self.attempt(RemoteOp::HeadObject(key.clone()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner.head_object(key, cancel).await
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
|
||||
|
||||
@@ -1,15 +1,10 @@
|
||||
use std::{num::NonZeroUsize, sync::Arc};
|
||||
|
||||
use crate::tenant::ephemeral_file;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum L0FlushConfig {
|
||||
PageCached,
|
||||
#[serde(rename_all = "snake_case")]
|
||||
Direct {
|
||||
max_concurrency: NonZeroUsize,
|
||||
},
|
||||
Direct { max_concurrency: NonZeroUsize },
|
||||
}
|
||||
|
||||
impl Default for L0FlushConfig {
|
||||
@@ -25,14 +20,12 @@ impl Default for L0FlushConfig {
|
||||
pub struct L0FlushGlobalState(Arc<Inner>);
|
||||
|
||||
pub enum Inner {
|
||||
PageCached,
|
||||
Direct { semaphore: tokio::sync::Semaphore },
|
||||
}
|
||||
|
||||
impl L0FlushGlobalState {
|
||||
pub fn new(config: L0FlushConfig) -> Self {
|
||||
match config {
|
||||
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
|
||||
L0FlushConfig::Direct { max_concurrency } => {
|
||||
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
|
||||
Self(Arc::new(Inner::Direct { semaphore }))
|
||||
@@ -44,13 +37,3 @@ impl L0FlushGlobalState {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl L0FlushConfig {
|
||||
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
|
||||
use L0FlushConfig::*;
|
||||
match self {
|
||||
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
|
||||
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ use tracing::{info, info_span};
|
||||
/// backwards-compatible changes to the metadata format.
|
||||
pub const STORAGE_FORMAT_VERSION: u16 = 3;
|
||||
|
||||
pub const DEFAULT_PG_VERSION: u32 = 15;
|
||||
pub const DEFAULT_PG_VERSION: u32 = 16;
|
||||
|
||||
// Magic constants used to identify different kinds of files
|
||||
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
|
||||
|
||||
@@ -393,7 +393,7 @@ struct PageServerTask {
|
||||
|
||||
/// Tasks may optionally be launched for a particular tenant/timeline, enabling
|
||||
/// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`]
|
||||
tenant_shard_id: Option<TenantShardId>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
|
||||
mutable: Mutex<MutableTaskState>,
|
||||
@@ -405,7 +405,7 @@ struct PageServerTask {
|
||||
pub fn spawn<F>(
|
||||
runtime: &tokio::runtime::Handle,
|
||||
kind: TaskKind,
|
||||
tenant_shard_id: Option<TenantShardId>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
name: &str,
|
||||
future: F,
|
||||
@@ -550,7 +550,7 @@ pub async fn shutdown_tasks(
|
||||
let tasks = TASKS.lock().unwrap();
|
||||
for task in tasks.values() {
|
||||
if (kind.is_none() || Some(task.kind) == kind)
|
||||
&& (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id)
|
||||
&& (tenant_shard_id.is_none() || Some(task.tenant_shard_id) == tenant_shard_id)
|
||||
&& (timeline_id.is_none() || task.timeline_id == timeline_id)
|
||||
{
|
||||
task.cancel.cancel();
|
||||
@@ -573,13 +573,8 @@ pub async fn shutdown_tasks(
|
||||
};
|
||||
if let Some(mut join_handle) = join_handle {
|
||||
if log_all {
|
||||
if tenant_shard_id.is_none() {
|
||||
// there are quite few of these
|
||||
info!(name = task.name, kind = ?task_kind, "stopping global task");
|
||||
} else {
|
||||
// warn to catch these in tests; there shouldn't be any
|
||||
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
|
||||
}
|
||||
// warn to catch these in tests; there shouldn't be any
|
||||
warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over");
|
||||
}
|
||||
if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle)
|
||||
.await
|
||||
|
||||
@@ -798,7 +798,7 @@ impl Tenant {
|
||||
task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
TaskKind::Attach,
|
||||
Some(tenant_shard_id),
|
||||
tenant_shard_id,
|
||||
None,
|
||||
"attach tenant",
|
||||
async move {
|
||||
|
||||
@@ -21,7 +21,6 @@ pub struct EphemeralFile {
|
||||
}
|
||||
|
||||
mod page_caching;
|
||||
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
|
||||
mod zero_padded_read_write;
|
||||
|
||||
impl EphemeralFile {
|
||||
@@ -52,12 +51,10 @@ impl EphemeralFile {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let prewarm = conf.l0_flush.prewarm_on_write();
|
||||
|
||||
Ok(EphemeralFile {
|
||||
_tenant_shard_id: tenant_shard_id,
|
||||
_timeline_id: timeline_id,
|
||||
rw: page_caching::RW::new(file, prewarm, gate_guard),
|
||||
rw: page_caching::RW::new(file, gate_guard),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
|
||||
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
|
||||
//!
|
||||
//! Subject to removal in <https://github.com/neondatabase/neon/pull/8537>
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::BlockLease;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::io::{self};
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
use tracing::*;
|
||||
|
||||
@@ -18,33 +18,17 @@ use super::zero_padded_read_write;
|
||||
/// See module-level comment.
|
||||
pub struct RW {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
|
||||
rw: super::zero_padded_read_write::RW<size_tracking_writer::Writer<VirtualFile>>,
|
||||
/// Gate guard is held on as long as we need to do operations in the path (delete on drop).
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
|
||||
/// should we pre-warm the [`crate::page_cache`] with the contents?
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum PrewarmOnWrite {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
impl RW {
|
||||
pub fn new(
|
||||
file: VirtualFile,
|
||||
prewarm_on_write: PrewarmOnWrite,
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
) -> Self {
|
||||
pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
|
||||
let page_cache_file_id = page_cache::next_file_id();
|
||||
Self {
|
||||
page_cache_file_id,
|
||||
rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
|
||||
page_cache_file_id,
|
||||
file,
|
||||
prewarm_on_write,
|
||||
)),
|
||||
rw: super::zero_padded_read_write::RW::new(size_tracking_writer::Writer::new(file)),
|
||||
_gate_guard,
|
||||
}
|
||||
}
|
||||
@@ -84,10 +68,10 @@ impl RW {
|
||||
let vec = Vec::with_capacity(size);
|
||||
|
||||
// read from disk what we've already flushed
|
||||
let writer = self.rw.as_writer();
|
||||
let flushed_range = writer.written_range();
|
||||
let mut vec = writer
|
||||
.file
|
||||
let file_size_tracking_writer = self.rw.as_writer();
|
||||
let flushed_range = 0..usize::try_from(file_size_tracking_writer.bytes_written()).unwrap();
|
||||
let mut vec = file_size_tracking_writer
|
||||
.as_inner()
|
||||
.read_exact_at(
|
||||
vec.slice(0..(flushed_range.end - flushed_range.start)),
|
||||
u64::try_from(flushed_range.start).unwrap(),
|
||||
@@ -122,7 +106,7 @@ impl RW {
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.rw.as_writer().file.path,
|
||||
self.rw.as_writer().as_inner().path,
|
||||
e,
|
||||
),
|
||||
)
|
||||
@@ -132,7 +116,7 @@ impl RW {
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = writer
|
||||
.file
|
||||
.as_inner()
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
@@ -154,137 +138,16 @@ impl Drop for RW {
|
||||
|
||||
// unlink the file
|
||||
// we are clear to do this, because we have entered a gate
|
||||
let res = std::fs::remove_file(&self.rw.as_writer().file.path);
|
||||
let path = &self.rw.as_writer().as_inner().path;
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.rw.as_writer().file.path,
|
||||
e
|
||||
);
|
||||
error!("could not remove ephemeral file '{path}': {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PreWarmingWriter {
|
||||
prewarm_on_write: PrewarmOnWrite,
|
||||
nwritten_blocks: u32,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
file: VirtualFile,
|
||||
}
|
||||
|
||||
impl PreWarmingWriter {
|
||||
fn new(
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
file: VirtualFile,
|
||||
prewarm_on_write: PrewarmOnWrite,
|
||||
) -> Self {
|
||||
Self {
|
||||
prewarm_on_write,
|
||||
nwritten_blocks: 0,
|
||||
page_cache_file_id,
|
||||
file,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the byte range within `file` that has been written though `write_all`.
|
||||
///
|
||||
/// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
|
||||
fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
|
||||
let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
|
||||
struct Wrapper(Range<usize>);
|
||||
impl Deref for Wrapper {
|
||||
type Target = Range<usize>;
|
||||
fn deref(&self) -> &Range<usize> {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
Wrapper(0..nwritten_blocks * PAGE_SZ)
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
|
||||
async fn write_all<Buf: tokio_epoll_uring::IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
let buflen = buf.len();
|
||||
assert_eq!(
|
||||
buflen % PAGE_SZ,
|
||||
0,
|
||||
"{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
|
||||
);
|
||||
|
||||
// Do the IO.
|
||||
let buf = match self.file.write_all(buf, ctx).await {
|
||||
(buf, Ok(nwritten)) => {
|
||||
assert_eq!(nwritten, buflen);
|
||||
buf
|
||||
}
|
||||
(_, Err(e)) => {
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::Other,
|
||||
// order error before path because path is long and error is short
|
||||
format!(
|
||||
"ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
|
||||
self.nwritten_blocks, buflen, e, self.file.path,
|
||||
),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let nblocks = buflen / PAGE_SZ;
|
||||
let nblocks32 = u32::try_from(nblocks).unwrap();
|
||||
|
||||
if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
|
||||
// Pre-warm page cache with the contents.
|
||||
// At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
|
||||
// benefits the code that writes InMemoryLayer=>L0 layers.
|
||||
|
||||
let cache = page_cache::get();
|
||||
static CTX: Lazy<RequestContext> = Lazy::new(|| {
|
||||
RequestContext::new(
|
||||
crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
|
||||
crate::context::DownloadBehavior::Error,
|
||||
)
|
||||
});
|
||||
for blknum_in_buffer in 0..nblocks {
|
||||
let blk_in_buffer =
|
||||
&buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
|
||||
let blknum = self
|
||||
.nwritten_blocks
|
||||
.checked_add(blknum_in_buffer as u32)
|
||||
.unwrap();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
|
||||
.await
|
||||
{
|
||||
Err(e) => {
|
||||
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
|
||||
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
|
||||
}
|
||||
Ok(v) => match v {
|
||||
page_cache::ReadBufResult::Found(_guard) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
|
||||
and this function takes &mut self, so, no concurrent read_blk is possible");
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(blk_in_buffer);
|
||||
let _ = write_guard.mark_valid();
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
|
||||
Ok((buflen, buf))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -565,7 +565,7 @@ mod tests {
|
||||
);
|
||||
let expected_bytes = vec![
|
||||
/* TimelineMetadataHeader */
|
||||
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
|
||||
74, 104, 158, 105, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
|
||||
/* TimelineMetadataBodyV2 */
|
||||
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
|
||||
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
|
||||
@@ -574,7 +574,7 @@ mod tests {
|
||||
0, 0, 0, 0, 0, 0, 0, 0, // ancestor_lsn (8 bytes)
|
||||
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
|
||||
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
|
||||
0, 0, 0, 15, // pg_version (4 bytes)
|
||||
0, 0, 0, 16, // pg_version (4 bytes)
|
||||
/* padding bytes */
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||
|
||||
@@ -1728,7 +1728,7 @@ impl RemoteTimelineClient {
|
||||
task_mgr::spawn(
|
||||
&self.runtime,
|
||||
TaskKind::RemoteUploadTask,
|
||||
Some(self.tenant_shard_id),
|
||||
self.tenant_shard_id,
|
||||
Some(self.timeline_id),
|
||||
"remote upload",
|
||||
async move {
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::{l0_flush, page_cache, walrecord};
|
||||
use crate::{l0_flush, page_cache};
|
||||
use anyhow::{anyhow, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::CompactKey;
|
||||
@@ -249,9 +249,7 @@ impl InMemoryLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
///
|
||||
/// this is likely completly unused
|
||||
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
pub async fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
let end_str = self.end_lsn_or_max();
|
||||
|
||||
println!(
|
||||
@@ -259,39 +257,6 @@ impl InMemoryLayer {
|
||||
self.timeline_id, self.start_lsn, end_str,
|
||||
);
|
||||
|
||||
if !verbose {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let cursor = inner.file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
for (lsn, pos) in vec_map.as_slice() {
|
||||
let mut desc = String::new();
|
||||
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
|
||||
let val = Value::des(&buf);
|
||||
match val {
|
||||
Ok(Value::Image(img)) => {
|
||||
write!(&mut desc, " img {} bytes", img.len())?;
|
||||
}
|
||||
Ok(Value::WalRecord(rec)) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec).unwrap();
|
||||
write!(
|
||||
&mut desc,
|
||||
" rec {} bytes will_init: {} {}",
|
||||
buf.len(),
|
||||
rec.will_init(),
|
||||
wal_desc
|
||||
)?;
|
||||
}
|
||||
Err(err) => {
|
||||
write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
|
||||
}
|
||||
}
|
||||
println!(" key {} at {}: {}", key, lsn, desc);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -536,7 +501,6 @@ impl InMemoryLayer {
|
||||
|
||||
use l0_flush::Inner;
|
||||
let _concurrency_permit = match l0_flush_global_state {
|
||||
Inner::PageCached => None,
|
||||
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
|
||||
};
|
||||
|
||||
@@ -568,34 +532,6 @@ impl InMemoryLayer {
|
||||
.await?;
|
||||
|
||||
match l0_flush_global_state {
|
||||
l0_flush::Inner::PageCached => {
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
|
||||
let mut buf = Vec::new();
|
||||
|
||||
let cursor = inner.file.block_cursor();
|
||||
|
||||
for (key, vec_map) in inner.index.iter() {
|
||||
// Write all page versions
|
||||
for (lsn, pos) in vec_map.as_slice() {
|
||||
cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?;
|
||||
let will_init = Value::des(&buf)?.will_init();
|
||||
let (tmp, res) = delta_layer_writer
|
||||
.put_value_bytes(
|
||||
Key::from_compact(*key),
|
||||
*lsn,
|
||||
buf.slice_len(),
|
||||
will_init,
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
res?;
|
||||
buf = tmp.into_raw_slice().into_inner();
|
||||
}
|
||||
}
|
||||
}
|
||||
l0_flush::Inner::Direct { .. } => {
|
||||
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
|
||||
assert_eq!(
|
||||
|
||||
@@ -98,7 +98,7 @@ pub fn start_background_loops(
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_shard_id),
|
||||
tenant_shard_id,
|
||||
None,
|
||||
&format!("compactor for tenant {tenant_shard_id}"),
|
||||
{
|
||||
@@ -121,7 +121,7 @@ pub fn start_background_loops(
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::GarbageCollector,
|
||||
Some(tenant_shard_id),
|
||||
tenant_shard_id,
|
||||
None,
|
||||
&format!("garbage collector for tenant {tenant_shard_id}"),
|
||||
{
|
||||
@@ -144,7 +144,7 @@ pub fn start_background_loops(
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::IngestHousekeeping,
|
||||
Some(tenant_shard_id),
|
||||
tenant_shard_id,
|
||||
None,
|
||||
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
|
||||
{
|
||||
|
||||
@@ -2281,7 +2281,7 @@ impl Timeline {
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_shard_id),
|
||||
self.tenant_shard_id,
|
||||
Some(self.timeline_id),
|
||||
"layer flush task",
|
||||
async move {
|
||||
@@ -2635,7 +2635,7 @@ impl Timeline {
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::InitialLogicalSizeCalculation,
|
||||
Some(self.tenant_shard_id),
|
||||
self.tenant_shard_id,
|
||||
Some(self.timeline_id),
|
||||
"initial size calculation",
|
||||
// NB: don't log errors here, task_mgr will do that.
|
||||
@@ -2803,7 +2803,7 @@ impl Timeline {
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
|
||||
Some(self.tenant_shard_id),
|
||||
self.tenant_shard_id,
|
||||
Some(self.timeline_id),
|
||||
"ondemand logical size calculation",
|
||||
async move {
|
||||
@@ -5162,7 +5162,7 @@ impl Timeline {
|
||||
let task_id = task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
task_mgr::TaskKind::DownloadAllRemoteLayers,
|
||||
Some(self.tenant_shard_id),
|
||||
self.tenant_shard_id,
|
||||
Some(self.timeline_id),
|
||||
"download all remote layers task",
|
||||
async move {
|
||||
|
||||
@@ -395,7 +395,7 @@ impl DeleteTimelineFlow {
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
Some(tenant_shard_id),
|
||||
tenant_shard_id,
|
||||
Some(timeline_id),
|
||||
"timeline_delete",
|
||||
async move {
|
||||
|
||||
@@ -60,7 +60,7 @@ impl Timeline {
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Eviction,
|
||||
Some(self.tenant_shard_id),
|
||||
self.tenant_shard_id,
|
||||
Some(self.timeline_id),
|
||||
&format!(
|
||||
"layer eviction for {}/{}",
|
||||
|
||||
@@ -18,6 +18,7 @@ import psycopg2
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
CREATE_TABLE = """
|
||||
CREATE TYPE arch AS ENUM ('ARM64', 'X64', 'UNKNOWN');
|
||||
CREATE TABLE IF NOT EXISTS results (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
parent_suite TEXT NOT NULL,
|
||||
@@ -28,6 +29,7 @@ CREATE TABLE IF NOT EXISTS results (
|
||||
stopped_at TIMESTAMPTZ NOT NULL,
|
||||
duration INT NOT NULL,
|
||||
flaky BOOLEAN NOT NULL,
|
||||
arch arch DEFAULT 'X64',
|
||||
build_type TEXT NOT NULL,
|
||||
pg_version INT NOT NULL,
|
||||
run_id BIGINT NOT NULL,
|
||||
@@ -35,7 +37,7 @@ CREATE TABLE IF NOT EXISTS results (
|
||||
reference TEXT NOT NULL,
|
||||
revision CHAR(40) NOT NULL,
|
||||
raw JSONB COMPRESSION lz4 NOT NULL,
|
||||
UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
|
||||
UNIQUE (parent_suite, suite, name, arch, build_type, pg_version, started_at, stopped_at, run_id)
|
||||
);
|
||||
"""
|
||||
|
||||
@@ -50,6 +52,7 @@ class Row:
|
||||
stopped_at: datetime
|
||||
duration: int
|
||||
flaky: bool
|
||||
arch: str
|
||||
build_type: str
|
||||
pg_version: int
|
||||
run_id: int
|
||||
@@ -121,6 +124,14 @@ def ingest_test_result(
|
||||
raw.pop("labels")
|
||||
raw.pop("extra")
|
||||
|
||||
# All allure parameters are prefixed with "__", see test_runner/fixtures/parametrize.py
|
||||
parameters = {
|
||||
p["name"].removeprefix("__"): p["value"]
|
||||
for p in test["parameters"]
|
||||
if p["name"].startswith("__")
|
||||
}
|
||||
arch = parameters.get("arch", "UNKNOWN").strip("'")
|
||||
|
||||
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
|
||||
labels = {label["name"]: label["value"] for label in test["labels"]}
|
||||
row = Row(
|
||||
@@ -132,6 +143,7 @@ def ingest_test_result(
|
||||
stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
|
||||
duration=test["time"]["duration"],
|
||||
flaky=test["flaky"] or test["retriesStatusChange"],
|
||||
arch=arch,
|
||||
build_type=build_type,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
|
||||
@@ -44,7 +44,7 @@ run the following commands from the top of the neon.git checkout
|
||||
|
||||
# test suite run
|
||||
export TEST_OUTPUT="$TEST_OUTPUT"
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py
|
||||
DEFAULT_PG_VERSION=16 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py
|
||||
|
||||
# for interactive use
|
||||
export NEON_REPO_DIR="$NEON_REPO_DIR"
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_sdk_s3::Client;
|
||||
use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TimelineId;
|
||||
@@ -16,7 +16,7 @@ use futures_util::StreamExt;
|
||||
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
|
||||
|
||||
pub(crate) struct TimelineAnalysis {
|
||||
/// Anomalies detected
|
||||
@@ -48,13 +48,12 @@ impl TimelineAnalysis {
|
||||
}
|
||||
|
||||
pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
s3_client: &Client,
|
||||
target: &RootTarget,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
id: &TenantShardTimelineId,
|
||||
tenant_objects: &mut TenantObjectListing,
|
||||
s3_active_branch: Option<&BranchData>,
|
||||
console_branch: Option<BranchData>,
|
||||
s3_data: Option<S3TimelineBlobData>,
|
||||
s3_data: Option<RemoteTimelineBlobData>,
|
||||
) -> TimelineAnalysis {
|
||||
let mut result = TimelineAnalysis::new();
|
||||
|
||||
@@ -78,7 +77,9 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
|
||||
match s3_data {
|
||||
Some(s3_data) => {
|
||||
result.garbage_keys.extend(s3_data.unknown_keys);
|
||||
result
|
||||
.garbage_keys
|
||||
.extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
|
||||
|
||||
match s3_data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
@@ -143,11 +144,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
|
||||
// HEAD request used here to address a race condition when an index was uploaded concurrently
|
||||
// with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
|
||||
let response = s3_client
|
||||
.head_object()
|
||||
.bucket(target.bucket_name())
|
||||
.key(path.get_path().as_str())
|
||||
.send()
|
||||
let response = remote_client
|
||||
.head_object(&path, &CancellationToken::new())
|
||||
.await;
|
||||
|
||||
if response.is_err() {
|
||||
@@ -284,14 +282,14 @@ impl TenantObjectListing {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct S3TimelineBlobData {
|
||||
pub(crate) struct RemoteTimelineBlobData {
|
||||
pub(crate) blob_data: BlobDataParseResult,
|
||||
|
||||
// Index objects that were not used when loading `blob_data`, e.g. those from old generations
|
||||
pub(crate) unused_index_keys: Vec<String>,
|
||||
pub(crate) unused_index_keys: Vec<ListingObject>,
|
||||
|
||||
// Objects whose keys were not recognized at all, i.e. not layer files, not indices
|
||||
pub(crate) unknown_keys: Vec<String>,
|
||||
pub(crate) unknown_keys: Vec<ListingObject>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -323,31 +321,37 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
|
||||
}
|
||||
|
||||
pub(crate) async fn list_timeline_blobs(
|
||||
s3_client: &Client,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
id: TenantShardTimelineId,
|
||||
s3_root: &RootTarget,
|
||||
) -> anyhow::Result<S3TimelineBlobData> {
|
||||
root_target: &RootTarget,
|
||||
) -> anyhow::Result<RemoteTimelineBlobData> {
|
||||
let mut s3_layers = HashSet::new();
|
||||
|
||||
let mut errors = Vec::new();
|
||||
let mut unknown_keys = Vec::new();
|
||||
|
||||
let mut timeline_dir_target = s3_root.timeline_root(&id);
|
||||
let mut timeline_dir_target = root_target.timeline_root(&id);
|
||||
timeline_dir_target.delimiter = String::new();
|
||||
|
||||
let mut index_part_keys: Vec<String> = Vec::new();
|
||||
let mut index_part_keys: Vec<ListingObject> = Vec::new();
|
||||
let mut initdb_archive: bool = false;
|
||||
|
||||
let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
|
||||
while let Some(obj) = stream.next().await {
|
||||
let obj = obj?;
|
||||
let key = obj.key();
|
||||
let prefix_str = &timeline_dir_target
|
||||
.prefix_in_bucket
|
||||
.strip_prefix("/")
|
||||
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
|
||||
|
||||
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
|
||||
let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
|
||||
while let Some(obj) = stream.next().await {
|
||||
let (key, Some(obj)) = obj? else {
|
||||
panic!("ListingObject not specified");
|
||||
};
|
||||
|
||||
let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
|
||||
match blob_name {
|
||||
Some(name) if name.starts_with("index_part.json") => {
|
||||
tracing::debug!("Index key {key}");
|
||||
index_part_keys.push(key.to_owned())
|
||||
index_part_keys.push(obj)
|
||||
}
|
||||
Some("initdb.tar.zst") => {
|
||||
tracing::debug!("initdb archive {key}");
|
||||
@@ -358,7 +362,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
}
|
||||
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
|
||||
Ok((new_layer, gen)) => {
|
||||
tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);
|
||||
tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
|
||||
s3_layers.insert((new_layer, gen));
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -366,13 +370,13 @@ pub(crate) async fn list_timeline_blobs(
|
||||
errors.push(
|
||||
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
|
||||
);
|
||||
unknown_keys.push(key.to_string());
|
||||
unknown_keys.push(obj);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::warn!("Unknown key {}", key);
|
||||
tracing::warn!("Unknown key {key}");
|
||||
errors.push(format!("S3 list response got an object with odd key {key}"));
|
||||
unknown_keys.push(key.to_string());
|
||||
unknown_keys.push(obj);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -381,7 +385,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
tracing::debug!(
|
||||
"Timeline is empty apart from initdb archive: expected post-deletion state."
|
||||
);
|
||||
return Ok(S3TimelineBlobData {
|
||||
return Ok(RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Relic,
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys: Vec::new(),
|
||||
@@ -395,13 +399,13 @@ pub(crate) async fn list_timeline_blobs(
|
||||
// Stripping the index key to the last part, because RemotePath doesn't
|
||||
// like absolute paths, and depending on prefix_in_bucket it's possible
|
||||
// for the keys we read back to start with a slash.
|
||||
let basename = key.rsplit_once('/').unwrap().1;
|
||||
let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
|
||||
parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
|
||||
})
|
||||
.max_by_key(|i| i.1)
|
||||
.map(|(k, g)| (k.clone(), g))
|
||||
{
|
||||
Some((key, gen)) => (Some(key), gen),
|
||||
Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
|
||||
None => {
|
||||
// Legacy/missing case: one or zero index parts, which did not have a generation
|
||||
(index_part_keys.pop(), Generation::none())
|
||||
@@ -416,17 +420,14 @@ pub(crate) async fn list_timeline_blobs(
|
||||
}
|
||||
|
||||
if let Some(index_part_object_key) = index_part_object.as_ref() {
|
||||
let index_part_bytes = download_object_with_retries(
|
||||
s3_client,
|
||||
&timeline_dir_target.bucket_name,
|
||||
index_part_object_key,
|
||||
)
|
||||
.await
|
||||
.context("index_part.json download")?;
|
||||
let index_part_bytes =
|
||||
download_object_with_retries(remote_client, &index_part_object_key.key)
|
||||
.await
|
||||
.context("index_part.json download")?;
|
||||
|
||||
match serde_json::from_slice(&index_part_bytes) {
|
||||
Ok(index_part) => {
|
||||
return Ok(S3TimelineBlobData {
|
||||
return Ok(RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Parsed {
|
||||
index_part: Box::new(index_part),
|
||||
index_part_generation,
|
||||
@@ -448,7 +449,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
);
|
||||
}
|
||||
|
||||
Ok(S3TimelineBlobData {
|
||||
Ok(RemoteTimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
|
||||
unused_index_keys: index_part_keys,
|
||||
unknown_keys,
|
||||
|
||||
@@ -6,7 +6,7 @@ use remote_storage::ListingMode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
|
||||
checks::parse_layer_object_name, init_remote, metadata_stream::stream_tenants,
|
||||
stream_objects_with_retries, BucketConfig, NodeKind,
|
||||
};
|
||||
|
||||
@@ -50,9 +50,8 @@ pub async fn find_large_objects(
|
||||
ignore_deltas: bool,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<LargeObjectListing> {
|
||||
let (remote_client, target) =
|
||||
init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
let tenants = pin!(stream_tenants_generic(&remote_client, &target));
|
||||
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
let tenants = pin!(stream_tenants(&remote_client, &target));
|
||||
|
||||
let objects_stream = tenants.map_ok(|tenant_shard_id| {
|
||||
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
||||
|
||||
@@ -19,8 +19,8 @@ use utils::id::TenantId;
|
||||
|
||||
use crate::{
|
||||
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
|
||||
init_remote_generic, list_objects_with_retries_generic,
|
||||
metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
|
||||
init_remote, list_objects_with_retries,
|
||||
metadata_stream::{stream_tenant_timelines, stream_tenants},
|
||||
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
|
||||
};
|
||||
|
||||
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<GarbageList> {
|
||||
// Construct clients for S3 and for Console API
|
||||
let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
|
||||
let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
|
||||
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
|
||||
|
||||
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
|
||||
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
|
||||
|
||||
// Enumerate Tenants in S3, and check if each one exists in Console
|
||||
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
|
||||
let tenants = stream_tenants_generic(&remote_client, &target);
|
||||
let tenants = stream_tenants(&remote_client, &target);
|
||||
let tenants_checked = tenants.map_ok(|t| {
|
||||
let api_client = cloud_admin_api_client.clone();
|
||||
let console_cache = console_cache.clone();
|
||||
@@ -237,14 +237,13 @@ async fn find_garbage_inner(
|
||||
// Special case: If it's missing in console, check for known bugs that would enable us to conclusively
|
||||
// identify it as purge-able anyway
|
||||
if console_result.is_none() {
|
||||
let timelines =
|
||||
stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
|
||||
.await?
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
let timelines = stream_tenant_timelines(&remote_client, &target, tenant_shard_id)
|
||||
.await?
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
if timelines.is_empty() {
|
||||
// No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
|
||||
let tenant_objects = list_objects_with_retries_generic(
|
||||
let tenant_objects = list_objects_with_retries(
|
||||
&remote_client,
|
||||
ListingMode::WithDelimiter,
|
||||
&target.tenant_root(&tenant_shard_id),
|
||||
@@ -265,7 +264,7 @@ async fn find_garbage_inner(
|
||||
|
||||
for timeline_r in timelines {
|
||||
let timeline = timeline_r?;
|
||||
let timeline_objects = list_objects_with_retries_generic(
|
||||
let timeline_objects = list_objects_with_retries(
|
||||
&remote_client,
|
||||
ListingMode::WithDelimiter,
|
||||
&target.timeline_root(&timeline),
|
||||
@@ -331,8 +330,7 @@ async fn find_garbage_inner(
|
||||
|
||||
// Construct a stream of all timelines within active tenants
|
||||
let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
|
||||
let timelines =
|
||||
active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
|
||||
let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, *t));
|
||||
let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
@@ -507,7 +505,7 @@ pub async fn purge_garbage(
|
||||
);
|
||||
|
||||
let (remote_client, _target) =
|
||||
init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
|
||||
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
|
||||
|
||||
assert_eq!(
|
||||
&garbage_list.bucket_config.bucket,
|
||||
|
||||
@@ -15,7 +15,7 @@ use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use anyhow::Context;
|
||||
use aws_config::retry::{RetryConfigBuilder, RetryMode};
|
||||
use aws_sdk_s3::config::Region;
|
||||
use aws_sdk_s3::error::DisplayErrorContext;
|
||||
@@ -352,7 +352,7 @@ fn make_root_target(
|
||||
}
|
||||
}
|
||||
|
||||
async fn init_remote(
|
||||
async fn init_remote_s3(
|
||||
bucket_config: BucketConfig,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
|
||||
@@ -369,7 +369,7 @@ async fn init_remote(
|
||||
Ok((s3_client, s3_root))
|
||||
}
|
||||
|
||||
async fn init_remote_generic(
|
||||
async fn init_remote(
|
||||
bucket_config: BucketConfig,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
|
||||
@@ -394,45 +394,10 @@ async fn init_remote_generic(
|
||||
|
||||
// We already pass the prefix to the remote client above
|
||||
let prefix_in_root_target = String::new();
|
||||
let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
|
||||
let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
|
||||
|
||||
let client = GenericRemoteStorage::from_config(&storage_config).await?;
|
||||
Ok((client, s3_root))
|
||||
}
|
||||
|
||||
async fn list_objects_with_retries(
|
||||
s3_client: &Client,
|
||||
s3_target: &S3Target,
|
||||
continuation_token: Option<String>,
|
||||
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
|
||||
for trial in 0..MAX_RETRIES {
|
||||
match s3_client
|
||||
.list_objects_v2()
|
||||
.bucket(&s3_target.bucket_name)
|
||||
.prefix(&s3_target.prefix_in_bucket)
|
||||
.delimiter(&s3_target.delimiter)
|
||||
.set_continuation_token(continuation_token.clone())
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => return Ok(response),
|
||||
Err(e) => {
|
||||
if trial == MAX_RETRIES - 1 {
|
||||
return Err(e)
|
||||
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
|
||||
}
|
||||
error!(
|
||||
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
|
||||
s3_target.bucket_name,
|
||||
s3_target.prefix_in_bucket,
|
||||
s3_target.delimiter,
|
||||
DisplayErrorContext(e),
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
|
||||
Ok((client, root_target))
|
||||
}
|
||||
|
||||
/// Listing possibly large amounts of keys in a streaming fashion.
|
||||
@@ -452,23 +417,26 @@ fn stream_objects_with_retries<'a>(
|
||||
let mut list_stream =
|
||||
storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
|
||||
while let Some(res) = list_stream.next().await {
|
||||
if let Err(err) = res {
|
||||
let yield_err = if err.is_permanent() {
|
||||
true
|
||||
} else {
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
trial += 1;
|
||||
trial == MAX_RETRIES - 1
|
||||
};
|
||||
if yield_err {
|
||||
yield Err(err)
|
||||
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
|
||||
break;
|
||||
match res {
|
||||
Err(err) => {
|
||||
let yield_err = if err.is_permanent() {
|
||||
true
|
||||
} else {
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
trial += 1;
|
||||
trial == MAX_RETRIES - 1
|
||||
};
|
||||
if yield_err {
|
||||
yield Err(err)
|
||||
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(res) => {
|
||||
trial = 0;
|
||||
yield Ok(res);
|
||||
}
|
||||
} else {
|
||||
trial = 0;
|
||||
yield res.map_err(anyhow::Error::from);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -476,7 +444,7 @@ fn stream_objects_with_retries<'a>(
|
||||
|
||||
/// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
|
||||
/// use [`stream_objects_with_retries`] instead.
|
||||
async fn list_objects_with_retries_generic(
|
||||
async fn list_objects_with_retries(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
listing_mode: ListingMode,
|
||||
s3_target: &S3Target,
|
||||
@@ -514,40 +482,34 @@ async fn list_objects_with_retries_generic(
|
||||
}
|
||||
|
||||
async fn download_object_with_retries(
|
||||
s3_client: &Client,
|
||||
bucket_name: &str,
|
||||
key: &str,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
key: &RemotePath,
|
||||
) -> anyhow::Result<Vec<u8>> {
|
||||
for _ in 0..MAX_RETRIES {
|
||||
let mut body_buf = Vec::new();
|
||||
let response_stream = match s3_client
|
||||
.get_object()
|
||||
.bucket(bucket_name)
|
||||
.key(key)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
let cancel = CancellationToken::new();
|
||||
for trial in 0..MAX_RETRIES {
|
||||
let mut buf = Vec::new();
|
||||
let download = match remote_client.download(key, &cancel).await {
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
error!("Failed to download object for key {key}: {e}");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match response_stream
|
||||
.body
|
||||
.into_async_read()
|
||||
.read_to_end(&mut body_buf)
|
||||
match tokio_util::io::StreamReader::new(download.download_stream)
|
||||
.read_to_end(&mut buf)
|
||||
.await
|
||||
{
|
||||
Ok(bytes_read) => {
|
||||
tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
|
||||
return Ok(body_buf);
|
||||
return Ok(buf);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to stream object body for key {key}: {e}");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
let backoff_time = 1 << trial.max(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -555,7 +517,7 @@ async fn download_object_with_retries(
|
||||
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
|
||||
}
|
||||
|
||||
async fn download_object_to_file(
|
||||
async fn download_object_to_file_s3(
|
||||
s3_client: &Client,
|
||||
bucket_name: &str,
|
||||
key: &str,
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::str::FromStr;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use async_stream::{stream, try_stream};
|
||||
use aws_sdk_s3::{types::ObjectIdentifier, Client};
|
||||
use futures::StreamExt;
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
|
||||
use tokio_stream::Stream;
|
||||
@@ -15,7 +14,7 @@ use pageserver_api::shard::TenantShardId;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
|
||||
pub fn stream_tenants_generic<'a>(
|
||||
pub fn stream_tenants<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
|
||||
@@ -36,92 +35,36 @@ pub fn stream_tenants_generic<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
|
||||
pub fn stream_tenants<'a>(
|
||||
s3_client: &'a Client,
|
||||
target: &'a RootTarget,
|
||||
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
|
||||
try_stream! {
|
||||
let mut continuation_token = None;
|
||||
let tenants_target = target.tenants_root();
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(s3_client, &tenants_target, continuation_token.clone()).await?;
|
||||
|
||||
let new_entry_ids = fetch_response
|
||||
.common_prefixes()
|
||||
.iter()
|
||||
.filter_map(|prefix| prefix.prefix())
|
||||
.filter_map(|prefix| -> Option<&str> {
|
||||
prefix
|
||||
.strip_prefix(&tenants_target.prefix_in_bucket)?
|
||||
.strip_suffix('/')
|
||||
}).map(|entry_id_str| {
|
||||
entry_id_str
|
||||
.parse()
|
||||
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
|
||||
});
|
||||
|
||||
for i in new_entry_ids {
|
||||
yield i?;
|
||||
}
|
||||
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stream_tenant_shards<'a>(
|
||||
s3_client: &'a Client,
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
|
||||
let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
|
||||
let mut continuation_token = None;
|
||||
let shards_target = target.tenant_shards_prefix(&tenant_id);
|
||||
|
||||
loop {
|
||||
tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
|
||||
let fetch_response =
|
||||
list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
|
||||
let fetch_response = match fetch_response {
|
||||
Err(e) => {
|
||||
tenant_shard_ids.push(Err(e));
|
||||
break;
|
||||
}
|
||||
Ok(r) => r,
|
||||
};
|
||||
let strip_prefix = target.tenants_root().prefix_in_bucket;
|
||||
let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);
|
||||
|
||||
let new_entry_ids = fetch_response
|
||||
.common_prefixes()
|
||||
.iter()
|
||||
.filter_map(|prefix| prefix.prefix())
|
||||
.filter_map(|prefix| -> Option<&str> {
|
||||
prefix
|
||||
.strip_prefix(&target.tenants_root().prefix_in_bucket)?
|
||||
.strip_suffix('/')
|
||||
})
|
||||
.map(|entry_id_str| {
|
||||
let first_part = entry_id_str.split('/').next().unwrap();
|
||||
tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
|
||||
let listing =
|
||||
list_objects_with_retries(remote_client, ListingMode::WithDelimiter, &shards_target)
|
||||
.await?;
|
||||
|
||||
first_part
|
||||
.parse::<TenantShardId>()
|
||||
.with_context(|| format!("Incorrect entry id str: {first_part}"))
|
||||
});
|
||||
let tenant_shard_ids = listing
|
||||
.prefixes
|
||||
.iter()
|
||||
.map(|prefix| prefix.get_path().as_str())
|
||||
.filter_map(|prefix| -> Option<&str> { prefix.strip_prefix(prefix_str) })
|
||||
.map(|entry_id_str| {
|
||||
let first_part = entry_id_str.split('/').next().unwrap();
|
||||
|
||||
for i in new_entry_ids {
|
||||
tenant_shard_ids.push(i);
|
||||
}
|
||||
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
first_part
|
||||
.parse::<TenantShardId>()
|
||||
.with_context(|| format!("Incorrect entry id str: {first_part}"))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
tracing::debug!("Yielding {} shards for {tenant_id}", tenant_shard_ids.len());
|
||||
Ok(stream! {
|
||||
for i in tenant_shard_ids {
|
||||
let id = i?;
|
||||
@@ -130,69 +73,10 @@ pub async fn stream_tenant_shards<'a>(
|
||||
})
|
||||
}
|
||||
|
||||
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
|
||||
/// using ListObjectsv2. The listing is done before the stream is built, so that this
|
||||
/// function can be used to generate concurrency on a stream using buffer_unordered.
|
||||
pub async fn stream_tenant_timelines<'a>(
|
||||
s3_client: &'a Client,
|
||||
target: &'a RootTarget,
|
||||
tenant: TenantShardId,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
|
||||
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
|
||||
let mut continuation_token = None;
|
||||
let timelines_target = target.timelines_root(&tenant);
|
||||
|
||||
loop {
|
||||
tracing::debug!("Listing in {}", tenant);
|
||||
let fetch_response =
|
||||
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
|
||||
.await;
|
||||
let fetch_response = match fetch_response {
|
||||
Err(e) => {
|
||||
timeline_ids.push(Err(e));
|
||||
break;
|
||||
}
|
||||
Ok(r) => r,
|
||||
};
|
||||
|
||||
let new_entry_ids = fetch_response
|
||||
.common_prefixes()
|
||||
.iter()
|
||||
.filter_map(|prefix| prefix.prefix())
|
||||
.filter_map(|prefix| -> Option<&str> {
|
||||
prefix
|
||||
.strip_prefix(&timelines_target.prefix_in_bucket)?
|
||||
.strip_suffix('/')
|
||||
})
|
||||
.map(|entry_id_str| {
|
||||
entry_id_str
|
||||
.parse::<TimelineId>()
|
||||
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
|
||||
});
|
||||
|
||||
for i in new_entry_ids {
|
||||
timeline_ids.push(i);
|
||||
}
|
||||
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Yielding for {}", tenant);
|
||||
Ok(stream! {
|
||||
for i in timeline_ids {
|
||||
let id = i?;
|
||||
yield Ok(TenantShardTimelineId::new(tenant, id));
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
|
||||
/// using a listing. The listing is done before the stream is built, so that this
|
||||
/// function can be used to generate concurrency on a stream using buffer_unordered.
|
||||
pub async fn stream_tenant_timelines_generic<'a>(
|
||||
pub async fn stream_tenant_timelines<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
tenant: TenantShardId,
|
||||
@@ -200,6 +84,11 @@ pub async fn stream_tenant_timelines_generic<'a>(
|
||||
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
|
||||
let timelines_target = target.timelines_root(&tenant);
|
||||
|
||||
let prefix_str = &timelines_target
|
||||
.prefix_in_bucket
|
||||
.strip_prefix("/")
|
||||
.unwrap_or(&timelines_target.prefix_in_bucket);
|
||||
|
||||
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
|
||||
remote_client,
|
||||
ListingMode::WithDelimiter,
|
||||
@@ -220,11 +109,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
|
||||
.prefixes
|
||||
.iter()
|
||||
.filter_map(|prefix| -> Option<&str> {
|
||||
prefix
|
||||
.get_path()
|
||||
.as_str()
|
||||
.strip_prefix(&timelines_target.prefix_in_bucket)?
|
||||
.strip_suffix('/')
|
||||
prefix.get_path().as_str().strip_prefix(prefix_str)
|
||||
})
|
||||
.map(|entry_id_str| {
|
||||
entry_id_str
|
||||
@@ -237,7 +122,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Yielding for {}", tenant);
|
||||
tracing::debug!("Yielding {} timelines for {}", timeline_ids.len(), tenant);
|
||||
Ok(stream! {
|
||||
for i in timeline_ids {
|
||||
let id = i?;
|
||||
@@ -247,37 +132,6 @@ pub async fn stream_tenant_timelines_generic<'a>(
|
||||
}
|
||||
|
||||
pub(crate) fn stream_listing<'a>(
|
||||
s3_client: &'a Client,
|
||||
target: &'a S3Target,
|
||||
) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
|
||||
try_stream! {
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
|
||||
|
||||
if target.delimiter.is_empty() {
|
||||
for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
|
||||
{
|
||||
let object_id = ObjectIdentifier::builder().key(object_key).build()?;
|
||||
yield object_id;
|
||||
}
|
||||
} else {
|
||||
for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
|
||||
let object_id = ObjectIdentifier::builder().key(prefix).build()?;
|
||||
yield object_id;
|
||||
}
|
||||
}
|
||||
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn stream_listing_generic<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a S3Target,
|
||||
) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::checks::{list_timeline_blobs, BlobDataParseResult};
|
||||
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
|
||||
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
|
||||
use aws_sdk_s3::Client;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
|
||||
@@ -13,10 +12,11 @@ use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::controller_api::TenantDescribeResponse;
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
|
||||
use reqwest::Method;
|
||||
use serde::Serialize;
|
||||
use storage_controller_client::control_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TenantTimelineId};
|
||||
@@ -240,38 +240,13 @@ impl TenantRefAccumulator {
|
||||
}
|
||||
}
|
||||
|
||||
async fn is_old_enough(
|
||||
s3_client: &Client,
|
||||
bucket_config: &BucketConfig,
|
||||
min_age: &Duration,
|
||||
key: &str,
|
||||
summary: &mut GcSummary,
|
||||
) -> bool {
|
||||
fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool {
|
||||
// Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
|
||||
// it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
|
||||
let age: Duration = match s3_client
|
||||
.head_object()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.key(key)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(response) => match response.last_modified {
|
||||
None => {
|
||||
tracing::warn!("Missing last_modified");
|
||||
summary.remote_storage_errors += 1;
|
||||
return false;
|
||||
}
|
||||
Some(last_modified) => match SystemTime::try_from(last_modified).map(|t| t.elapsed()) {
|
||||
Ok(Ok(e)) => e,
|
||||
Err(_) | Ok(Err(_)) => {
|
||||
tracing::warn!("Bad last_modified time: {last_modified:?}");
|
||||
return false;
|
||||
}
|
||||
},
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to HEAD {key}: {e}");
|
||||
let age = match key.last_modified.elapsed() {
|
||||
Ok(e) => e,
|
||||
Err(_) => {
|
||||
tracing::warn!("Bad last_modified time: {:?}", key.last_modified);
|
||||
summary.remote_storage_errors += 1;
|
||||
return false;
|
||||
}
|
||||
@@ -289,17 +264,30 @@ async fn is_old_enough(
|
||||
old_enough
|
||||
}
|
||||
|
||||
/// Same as [`is_old_enough`], but doesn't require a [`ListingObject`] passed to it.
|
||||
async fn check_is_old_enough(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
key: &RemotePath,
|
||||
min_age: &Duration,
|
||||
summary: &mut GcSummary,
|
||||
) -> Option<bool> {
|
||||
let listing_object = remote_client
|
||||
.head_object(key, &CancellationToken::new())
|
||||
.await
|
||||
.ok()?;
|
||||
Some(is_old_enough(min_age, &listing_object, summary))
|
||||
}
|
||||
|
||||
async fn maybe_delete_index(
|
||||
s3_client: &Client,
|
||||
bucket_config: &BucketConfig,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
min_age: &Duration,
|
||||
latest_gen: Generation,
|
||||
key: &str,
|
||||
obj: &ListingObject,
|
||||
mode: GcMode,
|
||||
summary: &mut GcSummary,
|
||||
) {
|
||||
// Validation: we will only delete things that parse cleanly
|
||||
let basename = key.rsplit_once('/').unwrap().1;
|
||||
let basename = obj.key.get_path().file_name().unwrap();
|
||||
let candidate_generation =
|
||||
match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
|
||||
Some(g) => g,
|
||||
@@ -328,7 +316,7 @@ async fn maybe_delete_index(
|
||||
return;
|
||||
}
|
||||
|
||||
if !is_old_enough(s3_client, bucket_config, min_age, key, summary).await {
|
||||
if !is_old_enough(min_age, obj, summary) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -338,11 +326,8 @@ async fn maybe_delete_index(
|
||||
}
|
||||
|
||||
// All validations passed: erase the object
|
||||
match s3_client
|
||||
.delete_object()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.key(key)
|
||||
.send()
|
||||
match remote_client
|
||||
.delete(&obj.key, &CancellationToken::new())
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
@@ -358,8 +343,7 @@ async fn maybe_delete_index(
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn gc_ancestor(
|
||||
s3_client: &Client,
|
||||
bucket_config: &BucketConfig,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
root_target: &RootTarget,
|
||||
min_age: &Duration,
|
||||
ancestor: TenantShardId,
|
||||
@@ -368,7 +352,7 @@ async fn gc_ancestor(
|
||||
summary: &mut GcSummary,
|
||||
) -> anyhow::Result<()> {
|
||||
// Scan timelines in the ancestor
|
||||
let timelines = stream_tenant_timelines(s3_client, root_target, ancestor).await?;
|
||||
let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
|
||||
let mut timelines = std::pin::pin!(timelines);
|
||||
|
||||
// Build a list of keys to retain
|
||||
@@ -376,7 +360,7 @@ async fn gc_ancestor(
|
||||
while let Some(ttid) = timelines.next().await {
|
||||
let ttid = ttid?;
|
||||
|
||||
let data = list_timeline_blobs(s3_client, ttid, root_target).await?;
|
||||
let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
|
||||
|
||||
let s3_layers = match data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
@@ -427,7 +411,8 @@ async fn gc_ancestor(
|
||||
|
||||
// We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
|
||||
// to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
|
||||
if !is_old_enough(s3_client, bucket_config, min_age, &key, summary).await {
|
||||
let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap();
|
||||
if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -437,13 +422,7 @@ async fn gc_ancestor(
|
||||
}
|
||||
|
||||
// All validations passed: erase the object
|
||||
match s3_client
|
||||
.delete_object()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.key(&key)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
match remote_client.delete(&path, &CancellationToken::new()).await {
|
||||
Ok(_) => {
|
||||
tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
|
||||
summary.ancestor_layers_deleted += 1;
|
||||
@@ -477,10 +456,10 @@ pub async fn pageserver_physical_gc(
|
||||
min_age: Duration,
|
||||
mode: GcMode,
|
||||
) -> anyhow::Result<GcSummary> {
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
|
||||
let tenants = if tenant_shard_ids.is_empty() {
|
||||
futures::future::Either::Left(stream_tenants(&s3_client, &target))
|
||||
futures::future::Either::Left(stream_tenants(&remote_client, &target))
|
||||
} else {
|
||||
futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
|
||||
};
|
||||
@@ -493,14 +472,13 @@ pub async fn pageserver_physical_gc(
|
||||
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
|
||||
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn gc_timeline(
|
||||
s3_client: &Client,
|
||||
bucket_config: &BucketConfig,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
min_age: &Duration,
|
||||
target: &RootTarget,
|
||||
mode: GcMode,
|
||||
@@ -508,7 +486,7 @@ pub async fn pageserver_physical_gc(
|
||||
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
|
||||
) -> anyhow::Result<GcSummary> {
|
||||
let mut summary = GcSummary::default();
|
||||
let data = list_timeline_blobs(s3_client, ttid, target).await?;
|
||||
let data = list_timeline_blobs(remote_client, ttid, target).await?;
|
||||
|
||||
let (index_part, latest_gen, candidates) = match &data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
@@ -533,17 +511,9 @@ pub async fn pageserver_physical_gc(
|
||||
accumulator.lock().unwrap().update(ttid, index_part);
|
||||
|
||||
for key in candidates {
|
||||
maybe_delete_index(
|
||||
s3_client,
|
||||
bucket_config,
|
||||
min_age,
|
||||
latest_gen,
|
||||
&key,
|
||||
mode,
|
||||
&mut summary,
|
||||
)
|
||||
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
|
||||
.await;
|
||||
maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
|
||||
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(summary)
|
||||
@@ -554,15 +524,7 @@ pub async fn pageserver_physical_gc(
|
||||
// Drain futures for per-shard GC, populating accumulator as a side effect
|
||||
{
|
||||
let timelines = timelines.map_ok(|ttid| {
|
||||
gc_timeline(
|
||||
&s3_client,
|
||||
bucket_config,
|
||||
&min_age,
|
||||
&target,
|
||||
mode,
|
||||
ttid,
|
||||
&accumulator,
|
||||
)
|
||||
gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator)
|
||||
});
|
||||
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
|
||||
|
||||
@@ -586,8 +548,7 @@ pub async fn pageserver_physical_gc(
|
||||
|
||||
for ancestor_shard in ancestor_shards {
|
||||
gc_ancestor(
|
||||
&s3_client,
|
||||
bucket_config,
|
||||
&remote_client,
|
||||
&target,
|
||||
&min_age,
|
||||
ancestor_shard,
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use crate::checks::{
|
||||
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
|
||||
TenantObjectListing, TimelineAnalysis,
|
||||
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult,
|
||||
RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
|
||||
};
|
||||
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
|
||||
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
|
||||
use aws_sdk_s3::Client;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::remote_layer_path;
|
||||
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::Serialize;
|
||||
use utils::id::TenantId;
|
||||
use utils::shard::ShardCount;
|
||||
@@ -36,7 +36,7 @@ impl MetadataSummary {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
fn update_data(&mut self, data: &S3TimelineBlobData) {
|
||||
fn update_data(&mut self, data: &RemoteTimelineBlobData) {
|
||||
self.timeline_shard_count += 1;
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
@@ -120,10 +120,10 @@ pub async fn scan_pageserver_metadata(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
) -> anyhow::Result<MetadataSummary> {
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
|
||||
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
|
||||
|
||||
let tenants = if tenant_ids.is_empty() {
|
||||
futures::future::Either::Left(stream_tenants(&s3_client, &target))
|
||||
futures::future::Either::Left(stream_tenants(&remote_client, &target))
|
||||
} else {
|
||||
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
|
||||
};
|
||||
@@ -133,20 +133,20 @@ pub async fn scan_pageserver_metadata(
|
||||
const CONCURRENCY: usize = 32;
|
||||
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn report_on_timeline(
|
||||
s3_client: &Client,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
target: &RootTarget,
|
||||
ttid: TenantShardTimelineId,
|
||||
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
|
||||
let data = list_timeline_blobs(s3_client, ttid, target).await?;
|
||||
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
|
||||
let data = list_timeline_blobs(remote_client, ttid, target).await?;
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
|
||||
let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
|
||||
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
|
||||
|
||||
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
|
||||
@@ -157,12 +157,11 @@ pub async fn scan_pageserver_metadata(
|
||||
let mut tenant_timeline_results = Vec::new();
|
||||
|
||||
async fn analyze_tenant(
|
||||
s3_client: &Client,
|
||||
target: &RootTarget,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
summary: &mut MetadataSummary,
|
||||
mut tenant_objects: TenantObjectListing,
|
||||
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
|
||||
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
|
||||
highest_shard_count: ShardCount,
|
||||
) {
|
||||
summary.tenant_count += 1;
|
||||
@@ -191,8 +190,7 @@ pub async fn scan_pageserver_metadata(
|
||||
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
|
||||
// reference counts for layers across the tenant.
|
||||
let analysis = branch_cleanup_and_check_errors(
|
||||
s3_client,
|
||||
target,
|
||||
remote_client,
|
||||
&ttid,
|
||||
&mut tenant_objects,
|
||||
None,
|
||||
@@ -273,8 +271,7 @@ pub async fn scan_pageserver_metadata(
|
||||
let tenant_objects = std::mem::take(&mut tenant_objects);
|
||||
let timelines = std::mem::take(&mut tenant_timeline_results);
|
||||
analyze_tenant(
|
||||
&s3_client,
|
||||
&target,
|
||||
&remote_client,
|
||||
prev_tenant_id,
|
||||
&mut summary,
|
||||
tenant_objects,
|
||||
@@ -311,8 +308,7 @@ pub async fn scan_pageserver_metadata(
|
||||
|
||||
if !tenant_timeline_results.is_empty() {
|
||||
analyze_tenant(
|
||||
&s3_client,
|
||||
&target,
|
||||
&remote_client,
|
||||
tenant_id.expect("Must be set if results are present"),
|
||||
&mut summary,
|
||||
tenant_objects,
|
||||
|
||||
@@ -14,9 +14,8 @@ use utils::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
cloud_admin_api::CloudAdminApiClient, init_remote_generic,
|
||||
metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
|
||||
TenantShardTimelineId,
|
||||
cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
|
||||
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
|
||||
};
|
||||
|
||||
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
|
||||
@@ -107,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
|
||||
let timelines = client.query(&query, &[]).await?;
|
||||
info!("loaded {} timelines", timelines.len());
|
||||
|
||||
let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
|
||||
let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
|
||||
let console_config = ConsoleConfig::from_env()?;
|
||||
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
|
||||
|
||||
@@ -188,14 +187,19 @@ async fn check_timeline(
|
||||
// we need files, so unset it.
|
||||
timeline_dir_target.delimiter = String::new();
|
||||
|
||||
let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
|
||||
let prefix_str = &timeline_dir_target
|
||||
.prefix_in_bucket
|
||||
.strip_prefix("/")
|
||||
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
|
||||
|
||||
let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
|
||||
while let Some(obj) = stream.next().await {
|
||||
let (key, _obj) = obj?;
|
||||
|
||||
let seg_name = key
|
||||
.get_path()
|
||||
.as_str()
|
||||
.strip_prefix(&timeline_dir_target.prefix_in_bucket)
|
||||
.strip_prefix(prefix_str)
|
||||
.expect("failed to extract segment name");
|
||||
expected_segfiles.remove(seg_name);
|
||||
}
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
|
||||
use crate::checks::{list_timeline_blobs, BlobDataParseResult, RemoteTimelineBlobData};
|
||||
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
|
||||
use crate::{
|
||||
download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId,
|
||||
download_object_to_file_s3, init_remote, init_remote_s3, BucketConfig, NodeKind, RootTarget,
|
||||
TenantShardTimelineId,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use async_stream::stream;
|
||||
@@ -15,6 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TenantId;
|
||||
|
||||
@@ -34,7 +36,8 @@ impl SnapshotDownloader {
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
let (s3_client, s3_root) =
|
||||
init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
Ok(Self {
|
||||
s3_client,
|
||||
s3_root,
|
||||
@@ -91,7 +94,7 @@ impl SnapshotDownloader {
|
||||
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
|
||||
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
|
||||
};
|
||||
download_object_to_file(
|
||||
download_object_to_file_s3(
|
||||
&self.s3_client,
|
||||
&self.bucket_config.bucket,
|
||||
&remote_layer_path,
|
||||
@@ -215,11 +218,11 @@ impl SnapshotDownloader {
|
||||
}
|
||||
|
||||
pub async fn download(&self) -> anyhow::Result<()> {
|
||||
let (s3_client, target) =
|
||||
let (remote_client, target) =
|
||||
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
|
||||
// Generate a stream of TenantShardId
|
||||
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
|
||||
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
|
||||
let shards: Vec<TenantShardId> = shards.try_collect().await?;
|
||||
|
||||
// Only read from shards that have the highest count: avoids redundantly downloading
|
||||
@@ -237,18 +240,19 @@ impl SnapshotDownloader {
|
||||
|
||||
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
|
||||
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn load_timeline_index(
|
||||
s3_client: &Client,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
target: &RootTarget,
|
||||
ttid: TenantShardTimelineId,
|
||||
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
|
||||
let data = list_timeline_blobs(s3_client, ttid, target).await?;
|
||||
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
|
||||
let data = list_timeline_blobs(remote_client, ttid, target).await?;
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
|
||||
let timelines =
|
||||
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
|
||||
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
|
||||
|
||||
while let Some(i) = timelines.next().await {
|
||||
@@ -278,7 +282,7 @@ impl SnapshotDownloader {
|
||||
|
||||
for (ttid, layers) in ancestor_layers.into_iter() {
|
||||
tracing::info!(
|
||||
"Downloading {} layers from ancvestor timeline {ttid}...",
|
||||
"Downloading {} layers from ancestor timeline {ttid}...",
|
||||
layers.len()
|
||||
);
|
||||
|
||||
|
||||
@@ -71,8 +71,7 @@ a subdirectory for each version with naming convention `v{PG_VERSION}/`.
|
||||
Inside that dir, a `bin/postgres` binary should be present.
|
||||
`DEFAULT_PG_VERSION`: The version of Postgres to use,
|
||||
This is used to construct full path to the postgres binaries.
|
||||
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION="14"`. Alternatively,
|
||||
you can use `--pg-version` argument.
|
||||
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION=16`
|
||||
`TEST_OUTPUT`: Set the directory where test state and test output files
|
||||
should go.
|
||||
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
|
||||
|
||||
@@ -4643,6 +4643,7 @@ class StorageScrubber:
|
||||
]
|
||||
args = base_args + args
|
||||
|
||||
log.info(f"Invoking scrubber command {args} with env: {env}")
|
||||
(output_path, stdout, status_code) = subprocess_capture(
|
||||
self.log_dir,
|
||||
args,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
import toml
|
||||
from _pytest.python import Metafunc
|
||||
@@ -91,3 +92,23 @@ def pytest_generate_tests(metafunc: Metafunc):
|
||||
and (platform := os.getenv("PLATFORM")) is not None
|
||||
):
|
||||
metafunc.parametrize("platform", [platform.lower()])
|
||||
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
|
||||
def pytest_runtest_makereport(*args, **kwargs):
|
||||
# Add test parameters to Allue report to distinguish the same tests with different parameters.
|
||||
# Names has `__` prefix to avoid conflicts with `pytest.mark.parametrize` parameters
|
||||
|
||||
# A mapping between `uname -m` and `RUNNER_ARCH` values.
|
||||
# `RUNNER_ARCH` environment variable is set on GitHub Runners,
|
||||
# possible values are X86, X64, ARM, or ARM64.
|
||||
# See https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
|
||||
uname_m = {
|
||||
"aarch64": "ARM64",
|
||||
"arm64": "ARM64",
|
||||
"x86_64": "X64",
|
||||
}.get(os.uname().machine, "UNKNOWN")
|
||||
arch = os.getenv("RUNNER_ARCH", uname_m)
|
||||
allure.dynamic.parameter("__arch", arch)
|
||||
|
||||
yield
|
||||
|
||||
@@ -3,8 +3,6 @@ import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
from _pytest.config.argparsing import Parser
|
||||
|
||||
"""
|
||||
This fixture is used to determine which version of Postgres to use for tests.
|
||||
@@ -52,7 +50,7 @@ class PgVersion(str, enum.Enum):
|
||||
return None
|
||||
|
||||
|
||||
DEFAULT_VERSION: PgVersion = PgVersion.V15
|
||||
DEFAULT_VERSION: PgVersion = PgVersion.V16
|
||||
|
||||
|
||||
def skip_on_postgres(version: PgVersion, reason: str):
|
||||
@@ -69,22 +67,8 @@ def xfail_on_postgres(version: PgVersion, reason: str):
|
||||
)
|
||||
|
||||
|
||||
def pytest_addoption(parser: Parser):
|
||||
parser.addoption(
|
||||
"--pg-version",
|
||||
action="store",
|
||||
type=PgVersion,
|
||||
help="DEPRECATED: Postgres version to use for tests",
|
||||
)
|
||||
|
||||
|
||||
def run_only_on_default_postgres(reason: str):
|
||||
return pytest.mark.skipif(
|
||||
PgVersion(os.environ.get("DEFAULT_PG_VERSION", DEFAULT_VERSION)) is not DEFAULT_VERSION,
|
||||
reason=reason,
|
||||
)
|
||||
|
||||
|
||||
def pytest_configure(config: Config):
|
||||
if config.getoption("--pg-version"):
|
||||
raise Exception("--pg-version is deprecated, use DEFAULT_PG_VERSION env var instead")
|
||||
|
||||
@@ -7,7 +7,7 @@ easier to see if you have compile errors without scrolling up.
|
||||
You may also need to run `./scripts/pysync`.
|
||||
|
||||
Then run the tests
|
||||
`DEFAULT_PG_VERSION=15 NEON_BIN=./target/release poetry run pytest test_runner/performance`
|
||||
`DEFAULT_PG_VERSION=16 NEON_BIN=./target/release poetry run pytest test_runner/performance`
|
||||
|
||||
Some handy pytest flags for local development:
|
||||
- `-x` tells pytest to stop on first error
|
||||
|
||||
@@ -11,6 +11,6 @@ It supports mounting snapshots using overlayfs, which improves iteration time.
|
||||
Here's a full command line.
|
||||
|
||||
```
|
||||
RUST_BACKTRACE=1 NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release \
|
||||
RUST_BACKTRACE=1 NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=16 BUILD_TYPE=release \
|
||||
./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
|
||||
````
|
||||
|
||||
@@ -14,7 +14,7 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
|
||||
"""
|
||||
Usage:
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=debug NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 INTERACTIVE=true \
|
||||
DEFAULT_PG_VERSION=16 BUILD_TYPE=debug NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 INTERACTIVE=true \
|
||||
./scripts/pytest --timeout 0 test_runner/performance/pageserver/interactive/test_many_small_tenants.py
|
||||
"""
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ from fixtures.workload import Workload
|
||||
#
|
||||
# How to run `test_backward_compatibility` locally:
|
||||
#
|
||||
# export DEFAULT_PG_VERSION=15
|
||||
# export DEFAULT_PG_VERSION=16
|
||||
# export BUILD_TYPE=release
|
||||
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
|
||||
# export COMPATIBILITY_SNAPSHOT_DIR=test_output/compatibility_snapshot_pgv${DEFAULT_PG_VERSION}
|
||||
@@ -61,7 +61,7 @@ from fixtures.workload import Workload
|
||||
#
|
||||
# How to run `test_forward_compatibility` locally:
|
||||
#
|
||||
# export DEFAULT_PG_VERSION=15
|
||||
# export DEFAULT_PG_VERSION=16
|
||||
# export BUILD_TYPE=release
|
||||
# export CHECK_ONDISK_DATA_COMPATIBILITY=true
|
||||
# export COMPATIBILITY_NEON_BIN=neon_previous/target/${BUILD_TYPE}
|
||||
|
||||
Reference in New Issue
Block a user