mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Compare commits
13 Commits
test-local
...
skyzh/anon
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
329ad0ad69 | ||
|
|
c06be08106 | ||
|
|
76f8d7ae01 | ||
|
|
82ea0d0c9b | ||
|
|
24398bf060 | ||
|
|
63b3491c1b | ||
|
|
858867c627 | ||
|
|
299cde899b | ||
|
|
4c9835f4a3 | ||
|
|
f3a3eefd26 | ||
|
|
a7c05686cc | ||
|
|
8b47938140 | ||
|
|
35e7d91bc9 |
@@ -353,13 +353,10 @@ COPY compute/patches/pgvector.patch /pgvector.patch
|
||||
# because we build the images on different machines than where we run them.
|
||||
# Pass OPTFLAGS="" to remove it.
|
||||
#
|
||||
# v17 is not supported yet because of upstream issue
|
||||
# https://github.com/pgvector/pgvector/issues/669
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
|
||||
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
|
||||
# vector 0.7.4 supports v17
|
||||
# last release v0.7.4 - Aug 5, 2024
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.4.tar.gz -O pgvector.tar.gz && \
|
||||
echo "0341edf89b1924ae0d552f617e14fb7f8867c0194ed775bcc44fa40288642583 pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
patch -p1 < /pgvector.patch && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -790,6 +787,22 @@ RUN case "${PG_VERSION}" in \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-sudo"
|
||||
# compile pg-neon-sudo extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-sudo-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH="/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/skyzh/pg_sudo/archive/refs/heads/main.tar.gz -O pg_sudo.tar.gz && \
|
||||
mkdir pg_sudo-src && cd pg_sudo-src && tar xzf ../pg_sudo.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_neon_sudo.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-embedding-pg-build"
|
||||
@@ -830,11 +843,7 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
# This is an experimental extension, never got to real production.
|
||||
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
|
||||
ENV PATH="/usr/local/pgsql/bin/:$PATH"
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
|
||||
esac && \
|
||||
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/yliang412/postgresql_anonymizer/archive/refs/heads/master.tar.gz -O pg_anon.tar.gz && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
@@ -1087,6 +1096,7 @@ COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
|
||||
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-sudo-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
@@ -1275,7 +1285,7 @@ COPY --from=pg-semver-pg-build /pg_semver.tar.gz /ext-src
|
||||
#COPY --from=pg-embedding-pg-build /home/nonroot/pg_embedding-src/ /ext-src
|
||||
#COPY --from=wal2json-pg-build /wal2json_2_5.tar.gz /ext-src
|
||||
COPY --from=pg-anon-pg-build /pg_anon.tar.gz /ext-src
|
||||
COPY compute/patches/pg_anon.patch /ext-src
|
||||
COPY --from=pg-sudo-pg-build /pg_sudo.tar.gz /ext-src
|
||||
COPY --from=pg-ivm-build /pg_ivm.tar.gz /ext-src
|
||||
COPY --from=pg-partman-build /pg_partman.tar.gz /ext-src
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
@@ -1298,14 +1308,7 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
esac && \
|
||||
cd /ext-src/pg_hint_plan-src && patch -p1 < /ext-src/pg_hint_plan.patch
|
||||
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
patch -p1 </ext-src/pg_anon.patch
|
||||
RUN case "${PG_VERSION}" in "v17") \
|
||||
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
|
||||
esac && \
|
||||
patch -p1 </ext-src/pg_cron.patch
|
||||
RUN patch -p1 </ext-src/pg_cron.patch
|
||||
ENV PATH=/usr/local/pgsql/bin:$PATH
|
||||
ENV PGHOST=compute
|
||||
ENV PGPORT=55433
|
||||
|
||||
@@ -34,6 +34,7 @@ use nix::sys::signal::{kill, Signal};
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
|
||||
use crate::checker::create_availability_check_data;
|
||||
use crate::installed_extensions::get_installed_extensions_sync;
|
||||
use crate::local_proxy;
|
||||
use crate::logger::inlinify;
|
||||
use crate::pg_helpers::*;
|
||||
@@ -1121,6 +1122,11 @@ impl ComputeNode {
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
self.post_apply_config()?;
|
||||
|
||||
let connstr = self.connstr.clone();
|
||||
thread::spawn(move || {
|
||||
get_installed_extensions_sync(connstr).context("get_installed_extensions")
|
||||
});
|
||||
}
|
||||
|
||||
let startup_end_time = Utc::now();
|
||||
@@ -1484,28 +1490,6 @@ LIMIT 100",
|
||||
info!("Pageserver config changed");
|
||||
}
|
||||
}
|
||||
|
||||
// Gather info about installed extensions
|
||||
pub fn get_installed_extensions(&self) -> Result<()> {
|
||||
let connstr = self.connstr.clone();
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create runtime");
|
||||
let result = rt
|
||||
.block_on(crate::installed_extensions::get_installed_extensions(
|
||||
connstr,
|
||||
))
|
||||
.expect("failed to get installed extensions");
|
||||
|
||||
info!(
|
||||
"{}",
|
||||
serde_json::to_string(&result).expect("failed to serialize extensions list")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn forward_termination_signal() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use compute_api::responses::{InstalledExtension, InstalledExtensions};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use tracing::info;
|
||||
use url::Url;
|
||||
|
||||
use anyhow::Result;
|
||||
@@ -79,3 +80,23 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
// Gather info about installed extensions
|
||||
pub fn get_installed_extensions_sync(connstr: Url) -> Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create runtime");
|
||||
let result = rt
|
||||
.block_on(crate::installed_extensions::get_installed_extensions(
|
||||
connstr,
|
||||
))
|
||||
.expect("failed to get installed extensions");
|
||||
|
||||
info!(
|
||||
"[NEON_EXT_STAT] {}",
|
||||
serde_json::to_string(&result).expect("failed to serialize extensions list")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -102,6 +102,7 @@ pub struct ConfigToml {
|
||||
pub ingest_batch_size: u64,
|
||||
pub max_vectored_read_bytes: MaxVectoredReadBytes,
|
||||
pub image_compression: ImageCompressionAlgorithm,
|
||||
pub timeline_offloading: bool,
|
||||
pub ephemeral_bytes_per_memory_kb: usize,
|
||||
pub l0_flush: Option<crate::models::L0FlushConfig>,
|
||||
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
|
||||
@@ -385,6 +386,7 @@ impl Default for ConfigToml {
|
||||
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
|
||||
)),
|
||||
image_compression: (DEFAULT_IMAGE_COMPRESSION),
|
||||
timeline_offloading: false,
|
||||
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
l0_flush: None,
|
||||
virtual_file_io_mode: None,
|
||||
|
||||
@@ -743,8 +743,6 @@ pub struct TimelineInfo {
|
||||
// Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
|
||||
// not deny unknown fields by default so it's safe to set the field to some value, though it won't be
|
||||
// read.
|
||||
/// The last aux file policy being used on this timeline
|
||||
pub last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
pub is_archived: Option<bool>,
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,12 @@ mod simulate_failures;
|
||||
mod support;
|
||||
|
||||
use std::{
|
||||
collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc,
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
num::NonZeroU32,
|
||||
ops::Bound,
|
||||
pin::{pin, Pin},
|
||||
sync::Arc,
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
@@ -28,6 +33,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{stream::Stream, StreamExt};
|
||||
use itertools::Itertools as _;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -261,7 +267,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Listing, DownloadError> {
|
||||
let mut stream = std::pin::pin!(self.list_streaming(prefix, mode, max_keys, cancel));
|
||||
let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
|
||||
let mut combined = stream.next().await.expect("At least one item required")?;
|
||||
while let Some(list) = stream.next().await {
|
||||
let list = list?;
|
||||
@@ -324,6 +330,35 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
|
||||
/// Deletes all objects matching the given prefix.
|
||||
///
|
||||
/// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
|
||||
/// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
|
||||
///
|
||||
/// If the operation fails because of timeout or cancellation, the root cause of the error will
|
||||
/// be set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
|
||||
/// through.
|
||||
async fn delete_prefix(
|
||||
&self,
|
||||
prefix: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut stream =
|
||||
pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
|
||||
while let Some(result) = stream.next().await {
|
||||
let keys = match result {
|
||||
Ok(listing) if listing.keys.is_empty() => continue,
|
||||
Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
|
||||
Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
|
||||
Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
tracing::info!("Deleting {} keys from remote storage", keys.len());
|
||||
self.delete_objects(&keys, cancel).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Copy a remote object inside a bucket from one path to another.
|
||||
async fn copy(
|
||||
&self,
|
||||
@@ -488,6 +523,20 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::delete_prefix`]
|
||||
pub async fn delete_prefix(
|
||||
&self,
|
||||
prefix: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
|
||||
Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
|
||||
Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
|
||||
Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`RemoteStorage::copy`]
|
||||
pub async fn copy_object(
|
||||
&self,
|
||||
|
||||
@@ -199,6 +199,138 @@ async fn list_no_delimiter_works(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests that giving a partial prefix returns all matches (e.g. "/foo" yields "/foobar/baz"),
|
||||
/// but only with NoDelimiter.
|
||||
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
|
||||
#[tokio::test]
|
||||
async fn list_partial_prefix(
|
||||
ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
|
||||
) -> anyhow::Result<()> {
|
||||
let ctx = match ctx {
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
|
||||
anyhow::bail!("S3 init failed: {e:?}")
|
||||
}
|
||||
};
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let test_client = Arc::clone(&ctx.enabled.client);
|
||||
|
||||
// Prefix "fold" should match all "folder{i}" directories with NoDelimiter.
|
||||
let objects: HashSet<_> = test_client
|
||||
.list(
|
||||
Some(&RemotePath::from_string("fold")?),
|
||||
ListingMode::NoDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
assert_eq!(&objects, &ctx.remote_blobs);
|
||||
|
||||
// Prefix "fold" matches nothing with WithDelimiter.
|
||||
let objects: HashSet<_> = test_client
|
||||
.list(
|
||||
Some(&RemotePath::from_string("fold")?),
|
||||
ListingMode::WithDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
assert!(objects.is_empty());
|
||||
|
||||
// Prefix "" matches everything.
|
||||
let objects: HashSet<_> = test_client
|
||||
.list(
|
||||
Some(&RemotePath::from_string("")?),
|
||||
ListingMode::NoDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
assert_eq!(&objects, &ctx.remote_blobs);
|
||||
|
||||
// Prefix "" matches nothing with WithDelimiter.
|
||||
let objects: HashSet<_> = test_client
|
||||
.list(
|
||||
Some(&RemotePath::from_string("")?),
|
||||
ListingMode::WithDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
assert!(objects.is_empty());
|
||||
|
||||
// Prefix "foo" matches nothing.
|
||||
let objects: HashSet<_> = test_client
|
||||
.list(
|
||||
Some(&RemotePath::from_string("foo")?),
|
||||
ListingMode::NoDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
assert!(objects.is_empty());
|
||||
|
||||
// Prefix "folder2/blob" matches.
|
||||
let objects: HashSet<_> = test_client
|
||||
.list(
|
||||
Some(&RemotePath::from_string("folder2/blob")?),
|
||||
ListingMode::NoDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
let expect: HashSet<_> = ctx
|
||||
.remote_blobs
|
||||
.iter()
|
||||
.filter(|o| o.get_path().starts_with("folder2"))
|
||||
.cloned()
|
||||
.collect();
|
||||
assert_eq!(&objects, &expect);
|
||||
|
||||
// Prefix "folder2/foo" matches nothing.
|
||||
let objects: HashSet<_> = test_client
|
||||
.list(
|
||||
Some(&RemotePath::from_string("folder2/foo")?),
|
||||
ListingMode::NoDelimiter,
|
||||
None,
|
||||
&cancel,
|
||||
)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
assert!(objects.is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
|
||||
@@ -265,6 +397,80 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests that delete_prefix() will delete all objects matching a prefix, including
|
||||
/// partial prefixes (i.e. "/foo" matches "/foobar").
|
||||
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
|
||||
#[tokio::test]
|
||||
async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
|
||||
let ctx = match ctx {
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
|
||||
MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
|
||||
anyhow::bail!("S3 init failed: {e:?}")
|
||||
}
|
||||
};
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let test_client = Arc::clone(&ctx.enabled.client);
|
||||
|
||||
/// Asserts that the S3 listing matches the given paths.
|
||||
macro_rules! assert_list {
|
||||
($expect:expr) => {{
|
||||
let listing = test_client
|
||||
.list(None, ListingMode::NoDelimiter, None, &cancel)
|
||||
.await?
|
||||
.keys
|
||||
.into_iter()
|
||||
.map(|o| o.key)
|
||||
.collect();
|
||||
assert_eq!($expect, listing);
|
||||
}};
|
||||
}
|
||||
|
||||
// We start with the full set of uploaded files.
|
||||
let mut expect = ctx.remote_blobs.clone();
|
||||
|
||||
// Deleting a non-existing prefix should do nothing.
|
||||
test_client
|
||||
.delete_prefix(&RemotePath::from_string("xyz")?, &cancel)
|
||||
.await?;
|
||||
assert_list!(expect);
|
||||
|
||||
// Prefixes are case-sensitive.
|
||||
test_client
|
||||
.delete_prefix(&RemotePath::from_string("Folder")?, &cancel)
|
||||
.await?;
|
||||
assert_list!(expect);
|
||||
|
||||
// Deleting a path which overlaps with an existing object should do nothing. We pick the first
|
||||
// path in the set as our common prefix.
|
||||
let path = expect.iter().next().expect("empty set").clone().join("xyz");
|
||||
test_client.delete_prefix(&path, &cancel).await?;
|
||||
assert_list!(expect);
|
||||
|
||||
// Deleting an exact path should work. We pick the first path in the set.
|
||||
let path = expect.iter().next().expect("empty set").clone();
|
||||
test_client.delete_prefix(&path, &cancel).await?;
|
||||
expect.remove(&path);
|
||||
assert_list!(expect);
|
||||
|
||||
// Deleting a prefix should delete all matching objects.
|
||||
test_client
|
||||
.delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
|
||||
.await?;
|
||||
expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
|
||||
assert_list!(expect);
|
||||
|
||||
// Deleting a common prefix should delete all objects.
|
||||
test_client
|
||||
.delete_prefix(&RemotePath::from_string("fold")?, &cancel)
|
||||
.await?;
|
||||
expect.clear();
|
||||
assert_list!(expect);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
|
||||
|
||||
@@ -164,6 +164,9 @@ pub struct PageServerConf {
|
||||
|
||||
pub image_compression: ImageCompressionAlgorithm,
|
||||
|
||||
/// Whether to offload archived timelines automatically
|
||||
pub timeline_offloading: bool,
|
||||
|
||||
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
|
||||
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
|
||||
/// of ephemeral data.
|
||||
@@ -321,6 +324,7 @@ impl PageServerConf {
|
||||
ingest_batch_size,
|
||||
max_vectored_read_bytes,
|
||||
image_compression,
|
||||
timeline_offloading,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
l0_flush,
|
||||
virtual_file_io_mode,
|
||||
@@ -364,6 +368,7 @@ impl PageServerConf {
|
||||
ingest_batch_size,
|
||||
max_vectored_read_bytes,
|
||||
image_compression,
|
||||
timeline_offloading,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
@@ -1218,16 +1218,7 @@ mod filesystem_level_usage {
|
||||
let stat = Statvfs::get(tenants_dir, mock_config)
|
||||
.context("statvfs failed, presumably directory got unlinked")?;
|
||||
|
||||
// https://unix.stackexchange.com/a/703650
|
||||
let blocksize = if stat.fragment_size() > 0 {
|
||||
stat.fragment_size()
|
||||
} else {
|
||||
stat.block_size()
|
||||
};
|
||||
|
||||
// use blocks_available (b_avail) since, pageserver runs as unprivileged user
|
||||
let avail_bytes = stat.blocks_available() * blocksize;
|
||||
let total_bytes = stat.blocks() * blocksize;
|
||||
let (avail_bytes, total_bytes) = stat.get_avail_total_bytes();
|
||||
|
||||
Ok(Usage {
|
||||
config,
|
||||
|
||||
@@ -18,7 +18,6 @@ use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
|
||||
use pageserver_api::models::IngestAuxFilesRequest;
|
||||
use pageserver_api::models::ListAuxFilesRequest;
|
||||
@@ -474,8 +473,6 @@ async fn build_timeline_info_common(
|
||||
is_archived: Some(is_archived),
|
||||
|
||||
walreceiver_status,
|
||||
|
||||
last_aux_file_policy: timeline.last_aux_file_policy.load(),
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
@@ -2254,7 +2251,7 @@ async fn tenant_scan_remote_handler(
|
||||
%timeline_id))
|
||||
.await
|
||||
{
|
||||
Ok((index_part, index_generation)) => {
|
||||
Ok((index_part, index_generation, _index_mtime)) => {
|
||||
tracing::info!("Found timeline {tenant_shard_id}/{timeline_id} metadata (gen {index_generation:?}, {} layers, {} consistent LSN)",
|
||||
index_part.layer_metadata.len(), index_part.metadata.disk_consistent_lsn());
|
||||
generation = std::cmp::max(generation, index_generation);
|
||||
@@ -2399,31 +2396,6 @@ async fn post_tracing_event_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn force_aux_policy_switch_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&r, None)?;
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
|
||||
let policy: AuxFilePolicy = json_request(&mut r).await?;
|
||||
|
||||
let state = get_state(&r);
|
||||
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
timeline
|
||||
.do_switch_aux_policy(policy)
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn put_io_engine_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -3136,10 +3108,6 @@ pub fn make_router(
|
||||
)
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|
||||
|r| api_handler(r, force_aux_policy_switch_handler),
|
||||
)
|
||||
.get("/v1/utilization", |r| api_handler(r, get_utilization))
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|
||||
|
||||
@@ -22,7 +22,6 @@ use pageserver_api::key::{
|
||||
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
};
|
||||
use pageserver_api::keyspace::SparseKeySpace;
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
@@ -33,7 +32,7 @@ use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
@@ -677,21 +676,6 @@ impl Timeline {
|
||||
self.get(CHECKPOINT_KEY, lsn, ctx).await
|
||||
}
|
||||
|
||||
async fn list_aux_files_v1(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
|
||||
match self.get(AUX_FILES_KEY, lsn, ctx).await {
|
||||
Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
|
||||
Err(e) => {
|
||||
// This is expected: historical databases do not have the key.
|
||||
debug!("Failed to get info about AUX files: {}", e);
|
||||
Ok(HashMap::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_aux_files_v2(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
@@ -722,10 +706,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PageReconstructError> {
|
||||
let current_policy = self.last_aux_file_policy.load();
|
||||
if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
|
||||
self.list_aux_files_v2(lsn, ctx).await?;
|
||||
}
|
||||
self.list_aux_files_v2(lsn, ctx).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -734,51 +715,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
|
||||
let current_policy = self.last_aux_file_policy.load();
|
||||
match current_policy {
|
||||
Some(AuxFilePolicy::V1) => {
|
||||
let res = self.list_aux_files_v1(lsn, ctx).await?;
|
||||
let empty_str = if res.is_empty() { ", empty" } else { "" };
|
||||
warn!(
|
||||
"this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
|
||||
);
|
||||
Ok(res)
|
||||
}
|
||||
None => {
|
||||
let res = self.list_aux_files_v1(lsn, ctx).await?;
|
||||
if !res.is_empty() {
|
||||
warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
|
||||
Some(AuxFilePolicy::CrossValidation) => {
|
||||
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
|
||||
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
|
||||
match (v1_result, v2_result) {
|
||||
(Ok(v1), Ok(v2)) => {
|
||||
if v1 != v2 {
|
||||
tracing::error!(
|
||||
"unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
|
||||
);
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"unmatched aux file v1 v2 result"
|
||||
)));
|
||||
}
|
||||
Ok(v1)
|
||||
}
|
||||
(Ok(_), Err(v2)) => {
|
||||
tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
|
||||
Err(v2)
|
||||
}
|
||||
(Err(v1), Ok(_)) => {
|
||||
tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
|
||||
Err(v1)
|
||||
}
|
||||
(Err(_), Err(v2)) => Err(v2),
|
||||
}
|
||||
}
|
||||
}
|
||||
self.list_aux_files_v2(lsn, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_replorigins(
|
||||
@@ -954,9 +891,6 @@ impl Timeline {
|
||||
|
||||
result.add_key(CONTROLFILE_KEY);
|
||||
result.add_key(CHECKPOINT_KEY);
|
||||
if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
|
||||
result.add_key(AUX_FILES_KEY);
|
||||
}
|
||||
|
||||
// Add extra keyspaces in the test cases. Some test cases write keys into the storage without
|
||||
// creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
|
||||
@@ -1166,9 +1100,6 @@ impl<'a> DatadirModification<'a> {
|
||||
self.pending_directory_entries.push((DirectoryKind::Db, 0));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
// Create AuxFilesDirectory
|
||||
self.init_aux_dir()?;
|
||||
|
||||
let buf = if self.tline.pg_version >= 17 {
|
||||
TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
|
||||
xids: HashSet::new(),
|
||||
@@ -1347,9 +1278,6 @@ impl<'a> DatadirModification<'a> {
|
||||
// 'true', now write the updated 'dbdirs' map back.
|
||||
let buf = DbDirectory::ser(&dbdir)?;
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
// Create AuxFilesDirectory as well
|
||||
self.init_aux_dir()?;
|
||||
}
|
||||
if r.is_none() {
|
||||
// Create RelDirectory
|
||||
@@ -1726,200 +1654,60 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
|
||||
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
|
||||
return Ok(());
|
||||
}
|
||||
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
})?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::AuxFiles, 0));
|
||||
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn put_file(
|
||||
&mut self,
|
||||
path: &str,
|
||||
content: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let switch_policy = self.tline.get_switch_aux_file_policy();
|
||||
|
||||
let policy = {
|
||||
let current_policy = self.tline.last_aux_file_policy.load();
|
||||
// Allowed switch path:
|
||||
// * no aux files -> v1/v2/cross-validation
|
||||
// * cross-validation->v2
|
||||
|
||||
let current_policy = if current_policy.is_none() {
|
||||
// This path will only be hit once per tenant: we will decide the final policy in this code block.
|
||||
// The next call to `put_file` will always have `last_aux_file_policy != None`.
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
|
||||
if aux_files_key_v1.is_empty() {
|
||||
None
|
||||
} else {
|
||||
warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
|
||||
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
|
||||
Some(AuxFilePolicy::V1)
|
||||
}
|
||||
} else {
|
||||
current_policy
|
||||
};
|
||||
|
||||
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
|
||||
self.tline.do_switch_aux_policy(switch_policy)?;
|
||||
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
|
||||
switch_policy
|
||||
} else {
|
||||
// This branch handles non-valid migration path, and the case that switch_policy == current_policy.
|
||||
// And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
|
||||
current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
|
||||
}
|
||||
let key = aux_file::encode_aux_file_key(path);
|
||||
// retrieve the key from the engine
|
||||
let old_val = match self.get(key, ctx).await {
|
||||
Ok(val) => Some(val),
|
||||
Err(PageReconstructError::MissingKey(_)) => None,
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
|
||||
let key = aux_file::encode_aux_file_key(path);
|
||||
// retrieve the key from the engine
|
||||
let old_val = match self.get(key, ctx).await {
|
||||
Ok(val) => Some(val),
|
||||
Err(PageReconstructError::MissingKey(_)) => None,
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
|
||||
aux_file::decode_file_value(old_val)?
|
||||
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
|
||||
aux_file::decode_file_value(old_val)?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let mut other_files = Vec::with_capacity(files.len());
|
||||
let mut modifying_file = None;
|
||||
for file @ (p, content) in files {
|
||||
if path == p {
|
||||
assert!(
|
||||
modifying_file.is_none(),
|
||||
"duplicated entries found for {}",
|
||||
path
|
||||
);
|
||||
modifying_file = Some(content);
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let mut other_files = Vec::with_capacity(files.len());
|
||||
let mut modifying_file = None;
|
||||
for file @ (p, content) in files {
|
||||
if path == p {
|
||||
assert!(
|
||||
modifying_file.is_none(),
|
||||
"duplicated entries found for {}",
|
||||
path
|
||||
);
|
||||
modifying_file = Some(content);
|
||||
} else {
|
||||
other_files.push(file);
|
||||
}
|
||||
other_files.push(file);
|
||||
}
|
||||
let mut new_files = other_files;
|
||||
match (modifying_file, content.is_empty()) {
|
||||
(Some(old_content), false) => {
|
||||
self.tline
|
||||
.aux_file_size_estimator
|
||||
.on_update(old_content.len(), content.len());
|
||||
new_files.push((path, content));
|
||||
}
|
||||
(Some(old_content), true) => {
|
||||
self.tline
|
||||
.aux_file_size_estimator
|
||||
.on_remove(old_content.len());
|
||||
// not adding the file key to the final `new_files` vec.
|
||||
}
|
||||
(None, false) => {
|
||||
self.tline.aux_file_size_estimator.on_add(content.len());
|
||||
new_files.push((path, content));
|
||||
}
|
||||
(None, true) => warn!("removing non-existing aux file: {}", path),
|
||||
}
|
||||
let new_val = aux_file::encode_file_value(&new_files)?;
|
||||
self.put(key, Value::Image(new_val.into()));
|
||||
}
|
||||
|
||||
if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
|
||||
let file_path = path.to_string();
|
||||
let content = if content.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Bytes::copy_from_slice(content))
|
||||
};
|
||||
|
||||
let n_files;
|
||||
let mut aux_files = self.tline.aux_files.lock().await;
|
||||
if let Some(mut dir) = aux_files.dir.take() {
|
||||
// We already updated aux files in `self`: emit a delta and update our latest value.
|
||||
dir.upsert(file_path.clone(), content.clone());
|
||||
n_files = dir.files.len();
|
||||
if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
aux_files.n_deltas = 0;
|
||||
} else {
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
|
||||
);
|
||||
aux_files.n_deltas += 1;
|
||||
}
|
||||
aux_files.dir = Some(dir);
|
||||
} else {
|
||||
// Check if the AUX_FILES_KEY is initialized
|
||||
match self.get(AUX_FILES_KEY, ctx).await {
|
||||
Ok(dir_bytes) => {
|
||||
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
|
||||
// Key is already set, we may append a delta
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
file_path: file_path.clone(),
|
||||
content: content.clone(),
|
||||
}),
|
||||
);
|
||||
dir.upsert(file_path, content);
|
||||
n_files = dir.files.len();
|
||||
aux_files.dir = Some(dir);
|
||||
}
|
||||
Err(
|
||||
e @ (PageReconstructError::Cancelled
|
||||
| PageReconstructError::AncestorLsnTimeout(_)),
|
||||
) => {
|
||||
// Important that we do not interpret a shutdown error as "not found" and thereby
|
||||
// reset the map.
|
||||
return Err(e.into());
|
||||
}
|
||||
// Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
|
||||
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
|
||||
// the same for now, though in theory, we should only match the `MissingKey` variant.
|
||||
Err(
|
||||
e @ (PageReconstructError::Other(_)
|
||||
| PageReconstructError::WalRedo(_)
|
||||
| PageReconstructError::MissingKey(_)),
|
||||
) => {
|
||||
// Key is missing, we must insert an image as the basis for subsequent deltas.
|
||||
|
||||
if !matches!(e, PageReconstructError::MissingKey(_)) {
|
||||
let e = utils::error::report_compact_sources(&e);
|
||||
tracing::warn!("treating error as if it was a missing key: {}", e);
|
||||
}
|
||||
|
||||
let mut dir = AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
};
|
||||
dir.upsert(file_path, content);
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
n_files = 1;
|
||||
aux_files.dir = Some(dir);
|
||||
}
|
||||
}
|
||||
let mut new_files = other_files;
|
||||
match (modifying_file, content.is_empty()) {
|
||||
(Some(old_content), false) => {
|
||||
self.tline
|
||||
.aux_file_size_estimator
|
||||
.on_update(old_content.len(), content.len());
|
||||
new_files.push((path, content));
|
||||
}
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::AuxFiles, n_files));
|
||||
(Some(old_content), true) => {
|
||||
self.tline
|
||||
.aux_file_size_estimator
|
||||
.on_remove(old_content.len());
|
||||
// not adding the file key to the final `new_files` vec.
|
||||
}
|
||||
(None, false) => {
|
||||
self.tline.aux_file_size_estimator.on_add(content.len());
|
||||
new_files.push((path, content));
|
||||
}
|
||||
(None, true) => warn!("removing non-existing aux file: {}", path),
|
||||
}
|
||||
let new_val = aux_file::encode_file_value(&new_files)?;
|
||||
self.put(key, Value::Image(new_val.into()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2089,12 +1877,6 @@ impl<'a> DatadirModification<'a> {
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
/// Only used during unit tests, force putting a key into the modification.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
|
||||
self.put(key, val);
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
if Self::is_data_key(&key) {
|
||||
self.put_data(key.to_compact(), val)
|
||||
@@ -2212,21 +1994,6 @@ struct RelDirectory {
|
||||
rels: HashSet<(Oid, u8)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
|
||||
pub(crate) struct AuxFilesDirectory {
|
||||
pub(crate) files: HashMap<String, Bytes>,
|
||||
}
|
||||
|
||||
impl AuxFilesDirectory {
|
||||
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
|
||||
if let Some(value) = value {
|
||||
self.files.insert(key, value);
|
||||
} else {
|
||||
self.files.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct RelSizeEntry {
|
||||
nblocks: u32,
|
||||
|
||||
@@ -53,6 +53,22 @@ impl Statvfs {
|
||||
Statvfs::Mock(stat) => stat.block_size,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the available and total bytes on the filesystem.
|
||||
pub fn get_avail_total_bytes(&self) -> (u64, u64) {
|
||||
// https://unix.stackexchange.com/a/703650
|
||||
let blocksize = if self.fragment_size() > 0 {
|
||||
self.fragment_size()
|
||||
} else {
|
||||
self.block_size()
|
||||
};
|
||||
|
||||
// use blocks_available (b_avail) since, pageserver runs as unprivileged user
|
||||
let avail_bytes = self.blocks_available() * blocksize;
|
||||
let total_bytes = self.blocks() * blocksize;
|
||||
|
||||
(avail_bytes, total_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
pub mod mock {
|
||||
|
||||
@@ -20,7 +20,6 @@ use enumset::EnumSet;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::AuxFilePolicy;
|
||||
use pageserver_api::models::LsnLease;
|
||||
use pageserver_api::models::TimelineArchivalState;
|
||||
use pageserver_api::models::TimelineState;
|
||||
@@ -800,7 +799,6 @@ impl Tenant {
|
||||
index_part: Option<IndexPart>,
|
||||
metadata: TimelineMetadata,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let tenant_id = self.tenant_shard_id;
|
||||
@@ -811,10 +809,6 @@ impl Tenant {
|
||||
ancestor.clone(),
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
// This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`,
|
||||
// there will potentially be other caller of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence.
|
||||
// Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2.
|
||||
last_aux_file_policy,
|
||||
)?;
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
anyhow::ensure!(
|
||||
@@ -829,10 +823,6 @@ impl Tenant {
|
||||
|
||||
if let Some(index_part) = index_part.as_ref() {
|
||||
timeline.remote_client.init_upload_queue(index_part)?;
|
||||
|
||||
timeline
|
||||
.last_aux_file_policy
|
||||
.store(index_part.last_aux_file_policy());
|
||||
} else {
|
||||
// No data on the remote storage, but we have local metadata file. We can end up
|
||||
// here with timeline_create being interrupted before finishing index part upload.
|
||||
@@ -1403,15 +1393,12 @@ impl Tenant {
|
||||
None
|
||||
};
|
||||
|
||||
let last_aux_file_policy = index_part.last_aux_file_policy();
|
||||
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
resources,
|
||||
Some(index_part),
|
||||
remote_metadata,
|
||||
ancestor,
|
||||
last_aux_file_policy,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -1824,7 +1811,6 @@ impl Tenant {
|
||||
create_guard,
|
||||
initdb_lsn,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -2187,7 +2173,8 @@ impl Tenant {
|
||||
.iter()
|
||||
.any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(*timeline_id))
|
||||
};
|
||||
let can_offload = can_offload && has_no_unoffloaded_children;
|
||||
let can_offload =
|
||||
can_offload && has_no_unoffloaded_children && self.conf.timeline_offloading;
|
||||
if (is_active, can_offload) == (false, false) {
|
||||
None
|
||||
} else {
|
||||
@@ -3031,7 +3018,6 @@ impl Tenant {
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
resources: TimelineResources,
|
||||
cause: CreateTimelineCause,
|
||||
last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let state = match cause {
|
||||
CreateTimelineCause::Load => {
|
||||
@@ -3060,7 +3046,6 @@ impl Tenant {
|
||||
resources,
|
||||
pg_version,
|
||||
state,
|
||||
last_aux_file_policy,
|
||||
self.attach_wal_lag_cooldown.clone(),
|
||||
self.cancel.child_token(),
|
||||
);
|
||||
@@ -3719,7 +3704,6 @@ impl Tenant {
|
||||
timeline_create_guard,
|
||||
start_lsn + 1,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
src_timeline.last_aux_file_policy.load(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -3913,7 +3897,6 @@ impl Tenant {
|
||||
timeline_create_guard,
|
||||
pgdata_lsn,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -3985,7 +3968,6 @@ impl Tenant {
|
||||
create_guard: TimelineCreateGuard<'a>,
|
||||
start_lsn: Lsn,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
) -> anyhow::Result<UninitializedTimeline<'a>> {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
|
||||
@@ -4001,7 +3983,6 @@ impl Tenant {
|
||||
ancestor,
|
||||
resources,
|
||||
CreateTimelineCause::Load,
|
||||
last_aux_file_policy,
|
||||
)
|
||||
.context("Failed to create timeline data structure")?;
|
||||
|
||||
@@ -4599,7 +4580,6 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::keyspace::KeySpaceAccum;
|
||||
use crate::pgdatadir_mapping::AuxFilesDirectory;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::harness::*;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
@@ -4608,7 +4588,7 @@ mod tests {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use hex_literal::hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
|
||||
use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
use rand::{thread_rng, Rng};
|
||||
@@ -4617,7 +4597,6 @@ mod tests {
|
||||
use tests::timeline::{GetVectoredError, ShutdownMode};
|
||||
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
|
||||
use timeline::{DeltaLayerTestDesc, GcInfo};
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::TenantId;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
@@ -6421,16 +6400,9 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_copies_dirty_aux_file_flag() {
|
||||
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag")
|
||||
.await
|
||||
.unwrap();
|
||||
async fn test_aux_file_e2e() {
|
||||
let harness = TenantHarness::create("test_aux_file_e2e").await.unwrap();
|
||||
|
||||
// the default aux file policy to switch is v2 if not set by the admins
|
||||
assert_eq!(
|
||||
harness.tenant_conf.switch_aux_file_policy,
|
||||
AuxFilePolicy::default_tenant_config()
|
||||
);
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let mut lsn = Lsn(0x08);
|
||||
@@ -6440,9 +6412,6 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// no aux file is written at this point, so the persistent flag should be unset
|
||||
assert_eq!(tline.last_aux_file_policy.load(), None);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
@@ -6453,30 +6422,6 @@ mod tests {
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
// there is no tenant manager to pass the configuration through, so lets mimic it
|
||||
tenant.set_new_location_config(
|
||||
AttachedTenantConf::try_from(LocationConf::attached_single(
|
||||
TenantConfOpt {
|
||||
switch_aux_file_policy: Some(AuxFilePolicy::V2),
|
||||
..Default::default()
|
||||
},
|
||||
tenant.generation,
|
||||
&pageserver_api::models::ShardParameters::default(),
|
||||
))
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline.get_switch_aux_file_policy(),
|
||||
AuxFilePolicy::V2,
|
||||
"wanted state has been updated"
|
||||
);
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::V2),
|
||||
"aux file is written with switch_aux_file_policy unset (which is v2), so we should use v2 there"
|
||||
);
|
||||
|
||||
// we can read everything from the storage
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
@@ -6494,12 +6439,6 @@ mod tests {
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::V2),
|
||||
"keep v2 storage format when new files are written"
|
||||
);
|
||||
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test2"),
|
||||
@@ -6511,321 +6450,9 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// child copies the last flag even if that is not on remote storage yet
|
||||
assert_eq!(child.get_switch_aux_file_policy(), AuxFilePolicy::V2);
|
||||
assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
|
||||
|
||||
let files = child.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(files.get("pg_logical/mappings/test1"), None);
|
||||
assert_eq!(files.get("pg_logical/mappings/test2"), None);
|
||||
|
||||
// even if we crash here without flushing parent timeline with it's new
|
||||
// last_aux_file_policy we are safe, because child was never meant to access ancestor's
|
||||
// files. the ancestor can even switch back to V1 because of a migration safely.
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn aux_file_policy_switch() {
|
||||
let mut harness = TenantHarness::create("aux_file_policy_switch")
|
||||
.await
|
||||
.unwrap();
|
||||
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let mut lsn = Lsn(0x08);
|
||||
|
||||
let tline: Arc<Timeline> = tenant
|
||||
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
None,
|
||||
"no aux file is written so it should be unset"
|
||||
);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test1", b"first", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
// there is no tenant manager to pass the configuration through, so lets mimic it
|
||||
tenant.set_new_location_config(
|
||||
AttachedTenantConf::try_from(LocationConf::attached_single(
|
||||
TenantConfOpt {
|
||||
switch_aux_file_policy: Some(AuxFilePolicy::V2),
|
||||
..Default::default()
|
||||
},
|
||||
tenant.generation,
|
||||
&pageserver_api::models::ShardParameters::default(),
|
||||
))
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline.get_switch_aux_file_policy(),
|
||||
AuxFilePolicy::V2,
|
||||
"wanted state has been updated"
|
||||
);
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::CrossValidation),
|
||||
"dirty index_part.json reflected state is yet to be updated"
|
||||
);
|
||||
|
||||
// we can still read the auxfile v1 before we ingest anything new
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test1"),
|
||||
Some(&bytes::Bytes::from_static(b"first"))
|
||||
);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test2", b"second", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::V2),
|
||||
"ingesting a file should apply the wanted switch state when applicable"
|
||||
);
|
||||
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test1"),
|
||||
Some(&bytes::Bytes::from_static(b"first")),
|
||||
"cross validation writes to both v1 and v2 so this should be available in v2"
|
||||
);
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test2"),
|
||||
Some(&bytes::Bytes::from_static(b"second"))
|
||||
);
|
||||
|
||||
// mimic again by trying to flip it from V2 to V1 (not switched to while ingesting a file)
|
||||
tenant.set_new_location_config(
|
||||
AttachedTenantConf::try_from(LocationConf::attached_single(
|
||||
TenantConfOpt {
|
||||
switch_aux_file_policy: Some(AuxFilePolicy::V1),
|
||||
..Default::default()
|
||||
},
|
||||
tenant.generation,
|
||||
&pageserver_api::models::ShardParameters::default(),
|
||||
))
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test2", b"third", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
tline.get_switch_aux_file_policy(),
|
||||
AuxFilePolicy::V1,
|
||||
"wanted state has been updated again, even if invalid request"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::V2),
|
||||
"ingesting a file should apply the wanted switch state when applicable"
|
||||
);
|
||||
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test1"),
|
||||
Some(&bytes::Bytes::from_static(b"first"))
|
||||
);
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test2"),
|
||||
Some(&bytes::Bytes::from_static(b"third"))
|
||||
);
|
||||
|
||||
// mimic again by trying to flip it from from V1 to V2 (not switched to while ingesting a file)
|
||||
tenant.set_new_location_config(
|
||||
AttachedTenantConf::try_from(LocationConf::attached_single(
|
||||
TenantConfOpt {
|
||||
switch_aux_file_policy: Some(AuxFilePolicy::V2),
|
||||
..Default::default()
|
||||
},
|
||||
tenant.generation,
|
||||
&pageserver_api::models::ShardParameters::default(),
|
||||
))
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test3", b"last", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(tline.get_switch_aux_file_policy(), AuxFilePolicy::V2);
|
||||
|
||||
assert_eq!(tline.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
|
||||
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test1"),
|
||||
Some(&bytes::Bytes::from_static(b"first"))
|
||||
);
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test2"),
|
||||
Some(&bytes::Bytes::from_static(b"third"))
|
||||
);
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test3"),
|
||||
Some(&bytes::Bytes::from_static(b"last"))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn aux_file_policy_force_switch() {
|
||||
let mut harness = TenantHarness::create("aux_file_policy_force_switch")
|
||||
.await
|
||||
.unwrap();
|
||||
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let mut lsn = Lsn(0x08);
|
||||
|
||||
let tline: Arc<Timeline> = tenant
|
||||
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
None,
|
||||
"no aux file is written so it should be unset"
|
||||
);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test1", b"first", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::V2),
|
||||
"dirty index_part.json reflected state is yet to be updated"
|
||||
);
|
||||
|
||||
// lose all data from v1
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(files.get("pg_logical/mappings/test1"), None);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test2", b"second", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
// read data ingested in v2
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test2"),
|
||||
Some(&bytes::Bytes::from_static(b"second"))
|
||||
);
|
||||
// lose all data from v1
|
||||
assert_eq!(files.get("pg_logical/mappings/test1"), None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn aux_file_policy_auto_detect() {
|
||||
let mut harness = TenantHarness::create("aux_file_policy_auto_detect")
|
||||
.await
|
||||
.unwrap();
|
||||
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to cross-validation mode
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let mut lsn = Lsn(0x08);
|
||||
|
||||
let tline: Arc<Timeline> = tenant
|
||||
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
None,
|
||||
"no aux file is written so it should be unset"
|
||||
);
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
|
||||
files: vec![(
|
||||
"test_file".to_string(),
|
||||
Bytes::copy_from_slice(b"test_file"),
|
||||
)]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
})
|
||||
.unwrap();
|
||||
modification.put_for_test(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
{
|
||||
lsn += 8;
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification
|
||||
.put_file("pg_logical/mappings/test1", b"first", &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
modification.commit(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
tline.last_aux_file_policy.load(),
|
||||
Some(AuxFilePolicy::V1),
|
||||
"keep using v1 because there are aux files writting with v1"
|
||||
);
|
||||
|
||||
// we can still read the auxfile v1
|
||||
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
|
||||
assert_eq!(
|
||||
files.get("pg_logical/mappings/test1"),
|
||||
Some(&bytes::Bytes::from_static(b"first"))
|
||||
);
|
||||
assert_eq!(
|
||||
files.get("test_file"),
|
||||
Some(&bytes::Bytes::from_static(b"test_file"))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -11,6 +11,7 @@ use pageserver_api::shard::{
|
||||
};
|
||||
use pageserver_api::upcall_api::ReAttachResponseTenant;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
@@ -1350,47 +1351,17 @@ impl TenantManager {
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_tenant_remote(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let remote_path = remote_tenant_path(&tenant_shard_id);
|
||||
let mut keys_stream = self.resources.remote_storage.list_streaming(
|
||||
Some(&remote_path),
|
||||
remote_storage::ListingMode::NoDelimiter,
|
||||
None,
|
||||
&self.cancel,
|
||||
);
|
||||
while let Some(chunk) = keys_stream.next().await {
|
||||
let keys = match chunk {
|
||||
Ok(listing) => listing.keys,
|
||||
Err(remote_storage::DownloadError::Cancelled) => {
|
||||
return Err(DeleteTenantError::Cancelled)
|
||||
}
|
||||
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
|
||||
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
|
||||
};
|
||||
|
||||
if keys.is_empty() {
|
||||
tracing::info!("Remote storage already deleted");
|
||||
} else {
|
||||
tracing::info!("Deleting {} keys from remote storage", keys.len());
|
||||
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
|
||||
self.resources
|
||||
.remote_storage
|
||||
.delete_objects(&keys, &self.cancel)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// If a tenant is attached, detach it. Then remove its data from remote storage.
|
||||
///
|
||||
/// A tenant is considered deleted once it is gone from remote storage. It is the caller's
|
||||
/// responsibility to avoid trying to attach the tenant again or use it any way once deletion
|
||||
/// has started: this operation is not atomic, and must be retried until it succeeds.
|
||||
///
|
||||
/// As a special case, if an unsharded tenant ID is given for a sharded tenant, it will remove
|
||||
/// all tenant shards in remote storage (removing all paths with the tenant prefix). The storage
|
||||
/// controller uses this to purge all remote tenant data, including any stale parent shards that
|
||||
/// may remain after splits. Ideally, this special case would be handled elsewhere. See:
|
||||
/// <https://github.com/neondatabase/neon/pull/9394>.
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -1442,25 +1413,29 @@ impl TenantManager {
|
||||
// in 500 responses to delete requests.
|
||||
// - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
|
||||
// 503/retry, rather than kicking off a wasteful concurrent deletion.
|
||||
match backoff::retry(
|
||||
|| async move { self.delete_tenant_remote(tenant_shard_id).await },
|
||||
|e| match e {
|
||||
DeleteTenantError::Cancelled => true,
|
||||
DeleteTenantError::SlotError(_) => {
|
||||
unreachable!("Remote deletion doesn't touch slots")
|
||||
}
|
||||
_ => false,
|
||||
// NB: this also deletes partial prefixes, i.e. a <tenant_id> path will delete all
|
||||
// <tenant_id>_<shard_id>/* objects. See method comment for why.
|
||||
backoff::retry(
|
||||
|| async move {
|
||||
self.resources
|
||||
.remote_storage
|
||||
.delete_prefix(&remote_tenant_path(&tenant_shard_id), &self.cancel)
|
||||
.await
|
||||
},
|
||||
|_| false, // backoff::retry handles cancellation
|
||||
1,
|
||||
3,
|
||||
&format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(r) => r,
|
||||
None => Err(DeleteTenantError::Cancelled),
|
||||
}
|
||||
.unwrap_or(Err(TimeoutOrCancel::Cancel.into()))
|
||||
.map_err(|err| {
|
||||
if TimeoutOrCancel::caused_by_cancel(&err) {
|
||||
return DeleteTenantError::Cancelled;
|
||||
}
|
||||
DeleteTenantError::Other(err)
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
|
||||
|
||||
@@ -187,7 +187,7 @@ use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
|
||||
pub(crate) use download::download_initdb_tar_zst;
|
||||
use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState};
|
||||
use pageserver_api::models::TimelineArchivalState;
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -505,7 +505,7 @@ impl RemoteTimelineClient {
|
||||
},
|
||||
);
|
||||
|
||||
let (index_part, _index_generation) = download::download_index_part(
|
||||
let (index_part, index_generation, index_last_modified) = download::download_index_part(
|
||||
&self.storage_impl,
|
||||
&self.tenant_shard_id,
|
||||
&self.timeline_id,
|
||||
@@ -519,6 +519,49 @@ impl RemoteTimelineClient {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
|
||||
// old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
|
||||
// in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
|
||||
// when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
|
||||
// also a newer index available, that is surprising.
|
||||
const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
|
||||
let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
|
||||
if e.duration() > Duration::from_secs(5) {
|
||||
// We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
|
||||
// timestamp, it is common to be out by at least 1 second.
|
||||
tracing::warn!("Index has modification time in the future: {e}");
|
||||
}
|
||||
Duration::ZERO
|
||||
});
|
||||
if index_age > INDEX_AGE_CHECKS_THRESHOLD {
|
||||
tracing::info!(
|
||||
?index_generation,
|
||||
age = index_age.as_secs_f64(),
|
||||
"Loaded an old index, checking for other indices..."
|
||||
);
|
||||
|
||||
// Find the highest-generation index
|
||||
let (_latest_index_part, latest_index_generation, latest_index_mtime) =
|
||||
download::download_index_part(
|
||||
&self.storage_impl,
|
||||
&self.tenant_shard_id,
|
||||
&self.timeline_id,
|
||||
Generation::MAX,
|
||||
cancel,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if latest_index_generation > index_generation {
|
||||
// Unexpected! Why are we loading such an old index if a more recent one exists?
|
||||
tracing::warn!(
|
||||
?index_generation,
|
||||
?latest_index_generation,
|
||||
?latest_index_mtime,
|
||||
"Found a newer index while loading an old one"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if index_part.deleted_at.is_some() {
|
||||
Ok(MaybeDeletedIndexPart::Deleted(index_part))
|
||||
} else {
|
||||
@@ -628,18 +671,6 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated.
|
||||
pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
|
||||
self: &Arc<Self>,
|
||||
last_aux_file_policy: Option<AuxFilePolicy>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
upload_queue.dirty.last_aux_file_policy = last_aux_file_policy;
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
|
||||
///
|
||||
/// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded,
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -343,10 +344,10 @@ async fn do_download_index_part(
|
||||
timeline_id: &TimelineId,
|
||||
index_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation), DownloadError> {
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry_forever(
|
||||
let (index_part_bytes, index_part_mtime) = download_retry_forever(
|
||||
|| async {
|
||||
let download = storage
|
||||
.download(&remote_path, &DownloadOpts::default(), cancel)
|
||||
@@ -359,7 +360,7 @@ async fn do_download_index_part(
|
||||
|
||||
tokio::io::copy_buf(&mut stream, &mut bytes).await?;
|
||||
|
||||
Ok(bytes)
|
||||
Ok((bytes, download.last_modified))
|
||||
},
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
@@ -370,7 +371,7 @@ async fn do_download_index_part(
|
||||
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok((index_part, index_generation))
|
||||
Ok((index_part, index_generation, index_part_mtime))
|
||||
}
|
||||
|
||||
/// index_part.json objects are suffixed with a generation number, so we cannot
|
||||
@@ -385,7 +386,7 @@ pub(crate) async fn download_index_part(
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation), DownloadError> {
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if my_generation.is_none() {
|
||||
|
||||
@@ -133,10 +133,6 @@ impl IndexPart {
|
||||
pub(crate) fn example() -> Self {
|
||||
Self::empty(TimelineMetadata::example())
|
||||
}
|
||||
|
||||
pub(crate) fn last_aux_file_policy(&self) -> Option<AuxFilePolicy> {
|
||||
self.last_aux_file_policy
|
||||
}
|
||||
}
|
||||
|
||||
/// Metadata gathered for each of the layer files.
|
||||
|
||||
@@ -341,6 +341,10 @@ impl Layer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
self.0.needs_download().await
|
||||
}
|
||||
|
||||
/// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
|
||||
/// while the guard exists.
|
||||
///
|
||||
|
||||
@@ -28,9 +28,9 @@ use pageserver_api::{
|
||||
},
|
||||
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
|
||||
models::{
|
||||
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, CompactionAlgorithmSettings,
|
||||
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
|
||||
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
|
||||
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
|
||||
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
|
||||
LsnLease, TimelineState,
|
||||
},
|
||||
reltag::BlockNumber,
|
||||
shard::{ShardIdentity, ShardNumber, TenantShardId},
|
||||
@@ -98,12 +98,12 @@ use crate::{
|
||||
use crate::{
|
||||
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
|
||||
use crate::{
|
||||
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
|
||||
pgdatadir_mapping::DirectoryKind,
|
||||
virtual_file::{MaybeFatalIo, VirtualFile},
|
||||
};
|
||||
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
|
||||
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -206,11 +206,6 @@ pub struct TimelineResources {
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
}
|
||||
|
||||
pub(crate) struct AuxFilesState {
|
||||
pub(crate) dir: Option<AuxFilesDirectory>,
|
||||
pub(crate) n_deltas: usize,
|
||||
}
|
||||
|
||||
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
|
||||
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
|
||||
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
|
||||
@@ -413,15 +408,9 @@ pub struct Timeline {
|
||||
timeline_get_throttle:
|
||||
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
|
||||
|
||||
/// Keep aux directory cache to avoid it's reconstruction on each update
|
||||
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
|
||||
|
||||
/// Size estimator for aux file v2
|
||||
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
|
||||
|
||||
/// Indicate whether aux file v2 storage is enabled.
|
||||
pub(crate) last_aux_file_policy: AtomicAuxFilePolicy,
|
||||
|
||||
/// Some test cases directly place keys into the timeline without actually modifying the directory
|
||||
/// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
|
||||
/// these keys won't get garbage-collected during compaction/GC. This field only modifies the dense
|
||||
@@ -1565,6 +1554,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Checks if the internal state of the timeline is consistent with it being able to be offloaded.
|
||||
///
|
||||
/// This is neccessary but not sufficient for offloading of the timeline as it might have
|
||||
/// child timelines that are not offloaded yet.
|
||||
pub(crate) fn can_offload(&self) -> bool {
|
||||
@@ -2011,14 +2001,6 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
|
||||
}
|
||||
|
||||
pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.switch_aux_file_policy
|
||||
.unwrap_or(self.conf.default_tenant_conf.switch_aux_file_policy)
|
||||
}
|
||||
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -2151,7 +2133,6 @@ impl Timeline {
|
||||
resources: TimelineResources,
|
||||
pg_version: u32,
|
||||
state: TimelineState,
|
||||
aux_file_policy: Option<AuxFilePolicy>,
|
||||
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
|
||||
cancel: CancellationToken,
|
||||
) -> Arc<Self> {
|
||||
@@ -2281,15 +2262,8 @@ impl Timeline {
|
||||
|
||||
timeline_get_throttle: resources.timeline_get_throttle,
|
||||
|
||||
aux_files: tokio::sync::Mutex::new(AuxFilesState {
|
||||
dir: None,
|
||||
n_deltas: 0,
|
||||
}),
|
||||
|
||||
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
|
||||
|
||||
last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy),
|
||||
|
||||
#[cfg(test)]
|
||||
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
|
||||
|
||||
@@ -2300,10 +2274,6 @@ impl Timeline {
|
||||
attach_wal_lag_cooldown,
|
||||
};
|
||||
|
||||
if aux_file_policy == Some(AuxFilePolicy::V1) {
|
||||
warn!("this timeline is using deprecated aux file policy V1 (when loading the timeline)");
|
||||
}
|
||||
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
|
||||
@@ -4478,14 +4448,6 @@ impl Timeline {
|
||||
) -> Result<(), detach_ancestor::Error> {
|
||||
detach_ancestor::complete(self, tenant, attempt, ctx).await
|
||||
}
|
||||
|
||||
/// Switch aux file policy and schedule upload to the index part.
|
||||
pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
|
||||
self.last_aux_file_policy.store(Some(policy));
|
||||
self.remote_client
|
||||
.schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Timeline {
|
||||
|
||||
@@ -29,6 +29,7 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
|
||||
@@ -1691,6 +1692,45 @@ impl Timeline {
|
||||
unreachable!("key retention is empty")
|
||||
}
|
||||
|
||||
/// Check how much space is left on the disk
|
||||
async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
|
||||
let tenants_dir = self.conf.tenants_path();
|
||||
|
||||
let stat = Statvfs::get(&tenants_dir, None)
|
||||
.context("statvfs failed, presumably directory got unlinked")?;
|
||||
|
||||
let (avail_bytes, _) = stat.get_avail_total_bytes();
|
||||
|
||||
Ok(avail_bytes)
|
||||
}
|
||||
|
||||
/// Check if the compaction can proceed safely without running out of space. We assume the size
|
||||
/// upper bound of the produced files of a compaction job is the same as all layers involved in
|
||||
/// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
|
||||
/// compaction.
|
||||
async fn check_compaction_space(
|
||||
self: &Arc<Self>,
|
||||
layer_selection: &[Layer],
|
||||
) -> anyhow::Result<()> {
|
||||
let available_space = self.check_available_space().await?;
|
||||
let mut remote_layer_size = 0;
|
||||
let mut all_layer_size = 0;
|
||||
for layer in layer_selection {
|
||||
let needs_download = layer.needs_download().await?;
|
||||
if needs_download.is_some() {
|
||||
remote_layer_size += layer.layer_desc().file_size;
|
||||
}
|
||||
all_layer_size += layer.layer_desc().file_size;
|
||||
}
|
||||
let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
|
||||
if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
|
||||
{
|
||||
return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
|
||||
available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// An experimental compaction building block that combines compaction with garbage collection.
|
||||
///
|
||||
/// The current implementation picks all delta + image layers that are below or intersecting with
|
||||
@@ -1806,6 +1846,8 @@ impl Timeline {
|
||||
lowest_retain_lsn
|
||||
);
|
||||
|
||||
self.check_compaction_space(&layer_selection).await?;
|
||||
|
||||
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
|
||||
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
|
||||
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
|
||||
|
||||
@@ -283,8 +283,6 @@ impl DeleteTimelineFlow {
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
CreateTimelineCause::Delete,
|
||||
// Aux file policy is not needed for deletion, assuming deletion does not read aux keyspace
|
||||
None,
|
||||
)
|
||||
.context("create_timeline_struct")?;
|
||||
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use crate::pgdatadir_mapping::AuxFilesDirectory;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::BytesMut;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use postgres_ffi::pg_constants;
|
||||
@@ -13,7 +12,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
|
||||
};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// Can this request be served by neon redo functions
|
||||
@@ -236,13 +234,9 @@ pub(crate) fn apply_in_neon(
|
||||
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
|
||||
}
|
||||
}
|
||||
NeonWalRecord::AuxFile { file_path, content } => {
|
||||
let mut dir = AuxFilesDirectory::des(page)?;
|
||||
dir.upsert(file_path.clone(), content.clone());
|
||||
|
||||
page.clear();
|
||||
let mut writer = page.writer();
|
||||
dir.ser_into(&mut writer)?;
|
||||
NeonWalRecord::AuxFile { .. } => {
|
||||
// No-op: this record will never be created in aux v2.
|
||||
warn!("AuxFile record should not be created in aux v2");
|
||||
}
|
||||
#[cfg(test)]
|
||||
NeonWalRecord::Test {
|
||||
@@ -250,6 +244,7 @@ pub(crate) fn apply_in_neon(
|
||||
clear,
|
||||
will_init,
|
||||
} => {
|
||||
use bytes::BufMut;
|
||||
if *will_init {
|
||||
assert!(*clear, "init record must be clear to ensure correctness");
|
||||
}
|
||||
@@ -261,59 +256,3 @@ pub(crate) fn apply_in_neon(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::AUX_FILES_KEY;
|
||||
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
|
||||
#[test]
|
||||
fn apply_aux_file_deltas() -> anyhow::Result<()> {
|
||||
let base_dir = AuxFilesDirectory {
|
||||
files: HashMap::from([
|
||||
("two".to_string(), Bytes::from_static(b"content0")),
|
||||
("three".to_string(), Bytes::from_static(b"contentX")),
|
||||
]),
|
||||
};
|
||||
let base_image = AuxFilesDirectory::ser(&base_dir)?;
|
||||
|
||||
let deltas = vec![
|
||||
// Insert
|
||||
NeonWalRecord::AuxFile {
|
||||
file_path: "one".to_string(),
|
||||
content: Some(Bytes::from_static(b"content1")),
|
||||
},
|
||||
// Update
|
||||
NeonWalRecord::AuxFile {
|
||||
file_path: "two".to_string(),
|
||||
content: Some(Bytes::from_static(b"content99")),
|
||||
},
|
||||
// Delete
|
||||
NeonWalRecord::AuxFile {
|
||||
file_path: "three".to_string(),
|
||||
content: None,
|
||||
},
|
||||
];
|
||||
|
||||
let file_path = AUX_FILES_KEY;
|
||||
let mut page = BytesMut::from_iter(base_image);
|
||||
|
||||
for record in deltas {
|
||||
apply_in_neon(&record, Lsn(8), file_path, &mut page)?;
|
||||
}
|
||||
|
||||
let reconstructed = AuxFilesDirectory::des(&page)?;
|
||||
let expect = HashMap::from([
|
||||
("one".to_string(), Bytes::from_static(b"content1")),
|
||||
("two".to_string(), Bytes::from_static(b"content99")),
|
||||
]);
|
||||
|
||||
assert_eq!(reconstructed.files, expect);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
5
restart.sh
Executable file
5
restart.sh
Executable file
@@ -0,0 +1,5 @@
|
||||
#!/bin/bash
|
||||
|
||||
|
||||
cargo neon endpoint stop main
|
||||
cargo neon endpoint start main --create-test-user true
|
||||
@@ -498,21 +498,18 @@ impl WalAcceptor {
|
||||
// we will send keepalives by replying to these requests once per second.
|
||||
let mut next_keepalive = Instant::now();
|
||||
|
||||
loop {
|
||||
let opt_msg = self.msg_rx.recv().await;
|
||||
if opt_msg.is_none() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
}
|
||||
let mut next_msg = opt_msg.unwrap();
|
||||
|
||||
while let Some(mut next_msg) = self.msg_rx.recv().await {
|
||||
// Update walreceiver state in shmem for reporting.
|
||||
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
|
||||
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
|
||||
}
|
||||
|
||||
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
|
||||
// loop through AppendRequest's while it's readily available to
|
||||
// write as many WAL as possible without fsyncing
|
||||
// Loop through AppendRequests while available to write as many WAL records as
|
||||
// possible without fsyncing.
|
||||
//
|
||||
// Make sure the WAL is flushed before returning, see:
|
||||
// https://github.com/neondatabase/neon/issues/9259
|
||||
//
|
||||
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
|
||||
// Otherwise, we might end up in a situation where we read a message, but don't
|
||||
@@ -522,7 +519,7 @@ impl WalAcceptor {
|
||||
|
||||
if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
|
||||
if self.reply_tx.send(reply).await.is_err() {
|
||||
return Ok(()); // chan closed, streaming terminated
|
||||
break; // disconnected, flush WAL and return on next send/recv
|
||||
}
|
||||
}
|
||||
|
||||
@@ -531,11 +528,13 @@ impl WalAcceptor {
|
||||
break;
|
||||
}
|
||||
|
||||
// continue pulling AppendRequests if available
|
||||
match self.msg_rx.try_recv() {
|
||||
Ok(msg) => next_msg = msg,
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
|
||||
}
|
||||
// on disconnect, flush WAL and return on next send/recv
|
||||
Err(TryRecvError::Disconnected) => break,
|
||||
};
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
@@ -555,5 +554,6 @@ impl WalAcceptor {
|
||||
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2862,17 +2862,12 @@ impl Service {
|
||||
let _tenant_lock =
|
||||
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;
|
||||
|
||||
// Detach all shards
|
||||
let (detach_waiters, shard_ids, node) = {
|
||||
let mut shard_ids = Vec::new();
|
||||
// Detach all shards. This also deletes local pageserver shard data.
|
||||
let (detach_waiters, node) = {
|
||||
let mut detach_waiters = Vec::new();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
for (tenant_shard_id, shard) in
|
||||
tenants.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
shard_ids.push(*tenant_shard_id);
|
||||
|
||||
for (_, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
// Update the tenant's intent to remove all attachments
|
||||
shard.policy = PlacementPolicy::Detached;
|
||||
shard
|
||||
@@ -2892,7 +2887,7 @@ impl Service {
|
||||
let node = nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while lock is active");
|
||||
(detach_waiters, shard_ids, node.clone())
|
||||
(detach_waiters, node.clone())
|
||||
};
|
||||
|
||||
// This reconcile wait can fail in a few ways:
|
||||
@@ -2907,38 +2902,34 @@ impl Service {
|
||||
self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
let locations = shard_ids
|
||||
.into_iter()
|
||||
.map(|s| (s, node.clone()))
|
||||
.collect::<Vec<_>>();
|
||||
let results = self.tenant_for_shards_api(
|
||||
locations,
|
||||
|tenant_shard_id, client| async move { client.tenant_delete(tenant_shard_id).await },
|
||||
1,
|
||||
3,
|
||||
RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(StatusCode::ACCEPTED) => {
|
||||
// This should never happen: we waited for detaches to finish above
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Unexpectedly still attached on {}",
|
||||
node
|
||||
)));
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(mgmt_api::Error::Cancelled) => {
|
||||
return Err(ApiError::ShuttingDown);
|
||||
}
|
||||
Err(e) => {
|
||||
// This is unexpected: remote deletion should be infallible, unless the object store
|
||||
// at large is unavailable.
|
||||
tracing::error!("Error deleting via node {}: {e}", node);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
|
||||
}
|
||||
// Delete the entire tenant (all shards) from remote storage via a random pageserver.
|
||||
// Passing an unsharded tenant ID will cause the pageserver to remove all remote paths with
|
||||
// the tenant ID prefix, including all shards (even possibly stale ones).
|
||||
match node
|
||||
.with_client_retries(
|
||||
|client| async move {
|
||||
client
|
||||
.tenant_delete(TenantShardId::unsharded(tenant_id))
|
||||
.await
|
||||
},
|
||||
&self.config.jwt_token,
|
||||
1,
|
||||
3,
|
||||
RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(Err(mgmt_api::Error::Cancelled))
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(mgmt_api::Error::Cancelled) => {
|
||||
return Err(ApiError::ShuttingDown);
|
||||
}
|
||||
Err(e) => {
|
||||
// This is unexpected: remote deletion should be infallible, unless the object store
|
||||
// at large is unavailable.
|
||||
tracing::error!("Error deleting via node {node}: {e}");
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ Prerequisites:
|
||||
- Correctly configured Python, see [`/docs/sourcetree.md`](/docs/sourcetree.md#using-python)
|
||||
- Neon and Postgres binaries
|
||||
- See the root [README.md](/README.md) for build directions
|
||||
If you want to test tests with test-only APIs, you would need to add `--features testing` to Rust code build commands.
|
||||
To run tests you need to add `--features testing` to Rust code build commands.
|
||||
For convenience, repository cargo config contains `build_testing` alias, that serves as a subcommand, adding the required feature flags.
|
||||
Usage example: `cargo build_testing --release` is equivalent to `cargo build --features testing --release`
|
||||
- Tests can be run from the git tree; or see the environment variables
|
||||
|
||||
@@ -303,9 +303,10 @@ def assert_prefix_empty(
|
||||
remote_storage: Optional[RemoteStorage],
|
||||
prefix: Optional[str] = None,
|
||||
allowed_postfix: Optional[str] = None,
|
||||
delimiter: str = "/",
|
||||
) -> None:
|
||||
assert remote_storage is not None
|
||||
response = list_prefix(remote_storage, prefix)
|
||||
response = list_prefix(remote_storage, prefix, delimiter)
|
||||
keys = response["KeyCount"]
|
||||
objects: list[ObjectTypeDef] = response.get("Contents", [])
|
||||
common_prefixes = response.get("CommonPrefixes", [])
|
||||
@@ -338,16 +339,18 @@ def assert_prefix_empty(
|
||||
if not (allowed_postfix.endswith(key)):
|
||||
filtered_count += 1
|
||||
|
||||
assert (
|
||||
filtered_count == 0
|
||||
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
assert filtered_count == 0, f"remote prefix {prefix} is not empty: {objects}"
|
||||
|
||||
|
||||
# remote_storage must not be None, but that's easier for callers to make mypy happy
|
||||
def assert_prefix_not_empty(remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None):
|
||||
def assert_prefix_not_empty(
|
||||
remote_storage: Optional[RemoteStorage],
|
||||
prefix: Optional[str] = None,
|
||||
delimiter: str = "/",
|
||||
):
|
||||
assert remote_storage is not None
|
||||
response = list_prefix(remote_storage, prefix)
|
||||
assert response["KeyCount"] != 0, f"remote dir with prefix {prefix} is empty: {response}"
|
||||
assert response["KeyCount"] != 0, f"remote prefix {prefix} is empty: {response}"
|
||||
|
||||
|
||||
def list_prefix(
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
AuxFileStore,
|
||||
NeonEnvBuilder,
|
||||
logical_replication_sync,
|
||||
)
|
||||
|
||||
|
||||
def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg):
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
tenant_config = client.tenant_config(tenant_id).effective_config
|
||||
tenant_config["switch_aux_file_policy"] = AuxFileStore.V2
|
||||
client.set_tenant_config(tenant_id, tenant_config)
|
||||
# aux file v2 is enabled on the write path, so for now, it should be unset (or null)
|
||||
assert (
|
||||
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"]
|
||||
is None
|
||||
)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("create table t(pk integer primary key, payload integer)")
|
||||
cur.execute(
|
||||
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
|
||||
)
|
||||
cur.execute("create publication pub1 for table t, replication_example")
|
||||
|
||||
# now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils)
|
||||
# instead of going through the full logical replication process.
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
|
||||
vanilla_pg.safe_psql(
|
||||
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
|
||||
)
|
||||
connstr = endpoint.connstr().replace("'", "''")
|
||||
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
# Wait logical replication channel to be established
|
||||
logical_replication_sync(vanilla_pg, endpoint)
|
||||
vanilla_pg.stop()
|
||||
endpoint.stop()
|
||||
|
||||
with env.pageserver.http_client() as client:
|
||||
# aux file v2 flag should be enabled at this point
|
||||
assert (
|
||||
client.timeline_detail(tenant_id, timeline_id)["last_aux_file_policy"]
|
||||
== AuxFileStore.V2
|
||||
)
|
||||
with env.pageserver.http_client() as client:
|
||||
tenant_config = client.tenant_config(tenant_id).effective_config
|
||||
tenant_config["switch_aux_file_policy"] = "V1"
|
||||
client.set_tenant_config(tenant_id, tenant_config)
|
||||
# the flag should still be enabled
|
||||
assert (
|
||||
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
|
||||
"last_aux_file_policy"
|
||||
]
|
||||
== AuxFileStore.V2
|
||||
)
|
||||
env.pageserver.restart()
|
||||
with env.pageserver.http_client() as client:
|
||||
# aux file v2 flag should be persisted
|
||||
assert (
|
||||
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
|
||||
"last_aux_file_policy"
|
||||
]
|
||||
== AuxFileStore.V2
|
||||
)
|
||||
@@ -20,6 +20,7 @@ from fixtures.pageserver.utils import (
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.utils import run_pg_bench_small, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from requests.exceptions import ReadTimeout
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
@@ -404,3 +405,57 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, make_httpserver, neon_env_builder
|
||||
cloud_admin_api_token=cloud_admin_token,
|
||||
)
|
||||
assert healthy
|
||||
|
||||
|
||||
def test_tenant_delete_stale_shards(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
"""
|
||||
Deleting a tenant should also delete any stale (pre-split) shards from remote storage.
|
||||
"""
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Create an unsharded tenant.
|
||||
tenant_id, timeline_id = env.create_tenant()
|
||||
|
||||
# Write some data.
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
|
||||
workload.init()
|
||||
workload.write_rows(256)
|
||||
workload.validate()
|
||||
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(("tenants", str(tenant_id))),
|
||||
)
|
||||
|
||||
# Upload a heatmap as well.
|
||||
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
# Split off a few shards, in two rounds.
|
||||
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
|
||||
env.storage_controller.tenant_shard_split(tenant_id, shard_count=16)
|
||||
|
||||
# Delete the tenant. This should also delete data for the unsharded and count=4 parents.
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id=tenant_id)
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(("tenants", str(tenant_id))),
|
||||
delimiter="", # match partial prefixes, i.e. all shards
|
||||
)
|
||||
|
||||
dirs = list(env.pageserver.tenant_dir(None).glob(f"{tenant_id}*"))
|
||||
assert dirs == [], f"found tenant directories: {dirs}"
|
||||
|
||||
# The initial tenant created by the test harness should still be there.
|
||||
# Only the tenant we deleted should be removed.
|
||||
assert_prefix_not_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(("tenants", str(env.initial_tenant))),
|
||||
)
|
||||
dirs = list(env.pageserver.tenant_dir(None).glob(f"{env.initial_tenant}*"))
|
||||
assert dirs != [], "missing initial tenant directory"
|
||||
|
||||
env.stop()
|
||||
|
||||
@@ -119,6 +119,10 @@ def test_timeline_archive(neon_env_builder: NeonEnvBuilder, shard_count: int):
|
||||
|
||||
@pytest.mark.parametrize("manual_offload", [False, True])
|
||||
def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: bool):
|
||||
if not manual_offload:
|
||||
# (automatic) timeline offloading defaults to false for now
|
||||
neon_env_builder.pageserver_config_override = "timeline_offloading = true"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user