Compare commits

..

3 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3984b5c03f Update test_compute_restart.py 2024-10-17 14:37:24 +03:00
Konstantin Knizhnik
aa3ef2f048 Update test_compute_restart.py 2024-10-17 10:29:08 +03:00
Konstantin Knizhnik
5f37c3802e Add test restarting compute node to investigate flukyness of test_subscriber_restart 2024-10-17 08:52:56 +03:00
40 changed files with 1771 additions and 1384 deletions

View File

@@ -353,10 +353,13 @@ COPY compute/patches/pgvector.patch /pgvector.patch
# because we build the images on different machines than where we run them.
# Pass OPTFLAGS="" to remove it.
#
# vector 0.7.4 supports v17
# last release v0.7.4 - Aug 5, 2024
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.4.tar.gz -O pgvector.tar.gz && \
echo "0341edf89b1924ae0d552f617e14fb7f8867c0194ed775bcc44fa40288642583 pgvector.tar.gz" | sha256sum --check && \
# v17 is not supported yet because of upstream issue
# https://github.com/pgvector/pgvector/issues/669
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
patch -p1 < /pgvector.patch && \
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -787,22 +790,6 @@ RUN case "${PG_VERSION}" in \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/semver.control
#########################################################################################
#
# Layer "pg-sudo"
# compile pg-neon-sudo extension
#
#########################################################################################
FROM build-deps AS pg-sudo-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/skyzh/pg_sudo/archive/refs/heads/main.tar.gz -O pg_sudo.tar.gz && \
mkdir pg_sudo-src && cd pg_sudo-src && tar xzf ../pg_sudo.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_neon_sudo.control
#########################################################################################
#
# Layer "pg-embedding-pg-build"
@@ -843,7 +830,11 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# This is an experimental extension, never got to real production.
# !Do not remove! It can be present in shared_preload_libraries and compute will fail to start if library is not found.
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/yliang412/postgresql_anonymizer/archive/refs/heads/master.tar.gz -O pg_anon.tar.gz && \
RUN case "${PG_VERSION}" in "v17") \
echo "postgresql_anonymizer does not yet support PG17" && exit 0;; \
esac && \
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
@@ -1096,7 +1087,6 @@ COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-sudo-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -1285,7 +1275,7 @@ COPY --from=pg-semver-pg-build /pg_semver.tar.gz /ext-src
#COPY --from=pg-embedding-pg-build /home/nonroot/pg_embedding-src/ /ext-src
#COPY --from=wal2json-pg-build /wal2json_2_5.tar.gz /ext-src
COPY --from=pg-anon-pg-build /pg_anon.tar.gz /ext-src
COPY --from=pg-sudo-pg-build /pg_sudo.tar.gz /ext-src
COPY compute/patches/pg_anon.patch /ext-src
COPY --from=pg-ivm-build /pg_ivm.tar.gz /ext-src
COPY --from=pg-partman-build /pg_partman.tar.gz /ext-src
RUN case "${PG_VERSION}" in "v17") \
@@ -1308,7 +1298,14 @@ RUN case "${PG_VERSION}" in "v17") \
esac && \
cd /ext-src/pg_hint_plan-src && patch -p1 < /ext-src/pg_hint_plan.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN patch -p1 </ext-src/pg_cron.patch
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
patch -p1 </ext-src/pg_anon.patch
RUN case "${PG_VERSION}" in "v17") \
echo "v17 extensions are not supported yet. Quit" && exit 0;; \
esac && \
patch -p1 </ext-src/pg_cron.patch
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute
ENV PGPORT=55433

View File

@@ -34,7 +34,6 @@ use nix::sys::signal::{kill, Signal};
use remote_storage::{DownloadError, RemotePath};
use crate::checker::create_availability_check_data;
use crate::installed_extensions::get_installed_extensions_sync;
use crate::local_proxy;
use crate::logger::inlinify;
use crate::pg_helpers::*;
@@ -1122,11 +1121,6 @@ impl ComputeNode {
self.pg_reload_conf()?;
}
self.post_apply_config()?;
let connstr = self.connstr.clone();
thread::spawn(move || {
get_installed_extensions_sync(connstr).context("get_installed_extensions")
});
}
let startup_end_time = Utc::now();
@@ -1490,6 +1484,28 @@ LIMIT 100",
info!("Pageserver config changed");
}
}
// Gather info about installed extensions
pub fn get_installed_extensions(&self) -> Result<()> {
let connstr = self.connstr.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
Ok(())
}
}
pub fn forward_termination_signal() {

View File

@@ -1,7 +1,6 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use std::collections::HashMap;
use std::collections::HashSet;
use tracing::info;
use url::Url;
use anyhow::Result;
@@ -80,23 +79,3 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
})
.await?
}
// Gather info about installed extensions
pub fn get_installed_extensions_sync(connstr: Url) -> Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
info!(
"[NEON_EXT_STAT] {}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
Ok(())
}

View File

@@ -102,7 +102,6 @@ pub struct ConfigToml {
pub ingest_batch_size: u64,
pub max_vectored_read_bytes: MaxVectoredReadBytes,
pub image_compression: ImageCompressionAlgorithm,
pub timeline_offloading: bool,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
@@ -386,7 +385,6 @@ impl Default for ConfigToml {
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: (DEFAULT_IMAGE_COMPRESSION),
timeline_offloading: false,
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_io_mode: None,

View File

@@ -743,6 +743,8 @@ pub struct TimelineInfo {
// Forward compatibility: a previous version of the pageserver will receive a JSON. serde::Deserialize does
// not deny unknown fields by default so it's safe to set the field to some value, though it won't be
// read.
/// The last aux file policy being used on this timeline
pub last_aux_file_policy: Option<AuxFilePolicy>,
pub is_archived: Option<bool>,
}

View File

@@ -19,12 +19,7 @@ mod simulate_failures;
mod support;
use std::{
collections::HashMap,
fmt::Debug,
num::NonZeroU32,
ops::Bound,
pin::{pin, Pin},
sync::Arc,
collections::HashMap, fmt::Debug, num::NonZeroU32, ops::Bound, pin::Pin, sync::Arc,
time::SystemTime,
};
@@ -33,7 +28,6 @@ use camino::{Utf8Path, Utf8PathBuf};
use bytes::Bytes;
use futures::{stream::Stream, StreamExt};
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
@@ -267,7 +261,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<Listing, DownloadError> {
let mut stream = pin!(self.list_streaming(prefix, mode, max_keys, cancel));
let mut stream = std::pin::pin!(self.list_streaming(prefix, mode, max_keys, cancel));
let mut combined = stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
@@ -330,35 +324,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
cancel: &CancellationToken,
) -> anyhow::Result<()>;
/// Deletes all objects matching the given prefix.
///
/// NB: this uses NoDelimiter and will match partial prefixes. For example, the prefix /a/b will
/// delete /a/b, /a/b/*, /a/bc, /a/bc/*, etc.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will
/// be set to `TimeoutOrCancel`. In such situation it is unknown which deletions, if any, went
/// through.
async fn delete_prefix(
&self,
prefix: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let mut stream =
pin!(self.list_streaming(Some(prefix), ListingMode::NoDelimiter, None, cancel));
while let Some(result) = stream.next().await {
let keys = match result {
Ok(listing) if listing.keys.is_empty() => continue,
Ok(listing) => listing.keys.into_iter().map(|o| o.key).collect_vec(),
Err(DownloadError::Cancelled) => return Err(TimeoutOrCancel::Cancel.into()),
Err(DownloadError::Timeout) => return Err(TimeoutOrCancel::Timeout.into()),
Err(err) => return Err(err.into()),
};
tracing::info!("Deleting {} keys from remote storage", keys.len());
self.delete_objects(&keys, cancel).await?;
}
Ok(())
}
/// Copy a remote object inside a bucket from one path to another.
async fn copy(
&self,
@@ -523,20 +488,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
/// See [`RemoteStorage::delete_prefix`]
pub async fn delete_prefix(
&self,
prefix: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.delete_prefix(prefix, cancel).await,
Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
}
}
/// See [`RemoteStorage::copy`]
pub async fn copy_object(
&self,

View File

@@ -199,138 +199,6 @@ async fn list_no_delimiter_works(
Ok(())
}
/// Tests that giving a partial prefix returns all matches (e.g. "/foo" yields "/foobar/baz"),
/// but only with NoDelimiter.
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
#[tokio::test]
async fn list_partial_prefix(
ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs,
) -> anyhow::Result<()> {
let ctx = match ctx {
MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
anyhow::bail!("S3 init failed: {e:?}")
}
};
let cancel = CancellationToken::new();
let test_client = Arc::clone(&ctx.enabled.client);
// Prefix "fold" should match all "folder{i}" directories with NoDelimiter.
let objects: HashSet<_> = test_client
.list(
Some(&RemotePath::from_string("fold")?),
ListingMode::NoDelimiter,
None,
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
assert_eq!(&objects, &ctx.remote_blobs);
// Prefix "fold" matches nothing with WithDelimiter.
let objects: HashSet<_> = test_client
.list(
Some(&RemotePath::from_string("fold")?),
ListingMode::WithDelimiter,
None,
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
assert!(objects.is_empty());
// Prefix "" matches everything.
let objects: HashSet<_> = test_client
.list(
Some(&RemotePath::from_string("")?),
ListingMode::NoDelimiter,
None,
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
assert_eq!(&objects, &ctx.remote_blobs);
// Prefix "" matches nothing with WithDelimiter.
let objects: HashSet<_> = test_client
.list(
Some(&RemotePath::from_string("")?),
ListingMode::WithDelimiter,
None,
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
assert!(objects.is_empty());
// Prefix "foo" matches nothing.
let objects: HashSet<_> = test_client
.list(
Some(&RemotePath::from_string("foo")?),
ListingMode::NoDelimiter,
None,
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
assert!(objects.is_empty());
// Prefix "folder2/blob" matches.
let objects: HashSet<_> = test_client
.list(
Some(&RemotePath::from_string("folder2/blob")?),
ListingMode::NoDelimiter,
None,
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
let expect: HashSet<_> = ctx
.remote_blobs
.iter()
.filter(|o| o.get_path().starts_with("folder2"))
.cloned()
.collect();
assert_eq!(&objects, &expect);
// Prefix "folder2/foo" matches nothing.
let objects: HashSet<_> = test_client
.list(
Some(&RemotePath::from_string("folder2/foo")?),
ListingMode::NoDelimiter,
None,
&cancel,
)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
assert!(objects.is_empty());
Ok(())
}
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn delete_non_exising_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
@@ -397,80 +265,6 @@ async fn delete_objects_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<(
Ok(())
}
/// Tests that delete_prefix() will delete all objects matching a prefix, including
/// partial prefixes (i.e. "/foo" matches "/foobar").
#[test_context(MaybeEnabledStorageWithSimpleTestBlobs)]
#[tokio::test]
async fn delete_prefix(ctx: &mut MaybeEnabledStorageWithSimpleTestBlobs) -> anyhow::Result<()> {
let ctx = match ctx {
MaybeEnabledStorageWithSimpleTestBlobs::Enabled(ctx) => ctx,
MaybeEnabledStorageWithSimpleTestBlobs::Disabled => return Ok(()),
MaybeEnabledStorageWithSimpleTestBlobs::UploadsFailed(e, _) => {
anyhow::bail!("S3 init failed: {e:?}")
}
};
let cancel = CancellationToken::new();
let test_client = Arc::clone(&ctx.enabled.client);
/// Asserts that the S3 listing matches the given paths.
macro_rules! assert_list {
($expect:expr) => {{
let listing = test_client
.list(None, ListingMode::NoDelimiter, None, &cancel)
.await?
.keys
.into_iter()
.map(|o| o.key)
.collect();
assert_eq!($expect, listing);
}};
}
// We start with the full set of uploaded files.
let mut expect = ctx.remote_blobs.clone();
// Deleting a non-existing prefix should do nothing.
test_client
.delete_prefix(&RemotePath::from_string("xyz")?, &cancel)
.await?;
assert_list!(expect);
// Prefixes are case-sensitive.
test_client
.delete_prefix(&RemotePath::from_string("Folder")?, &cancel)
.await?;
assert_list!(expect);
// Deleting a path which overlaps with an existing object should do nothing. We pick the first
// path in the set as our common prefix.
let path = expect.iter().next().expect("empty set").clone().join("xyz");
test_client.delete_prefix(&path, &cancel).await?;
assert_list!(expect);
// Deleting an exact path should work. We pick the first path in the set.
let path = expect.iter().next().expect("empty set").clone();
test_client.delete_prefix(&path, &cancel).await?;
expect.remove(&path);
assert_list!(expect);
// Deleting a prefix should delete all matching objects.
test_client
.delete_prefix(&RemotePath::from_string("folder0/blob_")?, &cancel)
.await?;
expect.retain(|p| !p.get_path().as_str().starts_with("folder0/"));
assert_list!(expect);
// Deleting a common prefix should delete all objects.
test_client
.delete_prefix(&RemotePath::from_string("fold")?, &cancel)
.await?;
expect.clear();
assert_list!(expect);
Ok(())
}
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn upload_download_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {

View File

@@ -164,9 +164,6 @@ pub struct PageServerConf {
pub image_compression: ImageCompressionAlgorithm,
/// Whether to offload archived timelines automatically
pub timeline_offloading: bool,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
@@ -324,7 +321,6 @@ impl PageServerConf {
ingest_batch_size,
max_vectored_read_bytes,
image_compression,
timeline_offloading,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_io_mode,
@@ -368,7 +364,6 @@ impl PageServerConf {
ingest_batch_size,
max_vectored_read_bytes,
image_compression,
timeline_offloading,
ephemeral_bytes_per_memory_kb,
// ------------------------------------------------------------

View File

@@ -1218,7 +1218,16 @@ mod filesystem_level_usage {
let stat = Statvfs::get(tenants_dir, mock_config)
.context("statvfs failed, presumably directory got unlinked")?;
let (avail_bytes, total_bytes) = stat.get_avail_total_bytes();
// https://unix.stackexchange.com/a/703650
let blocksize = if stat.fragment_size() > 0 {
stat.fragment_size()
} else {
stat.block_size()
};
// use blocks_available (b_avail) since, pageserver runs as unprivileged user
let avail_bytes = stat.blocks_available() * blocksize;
let total_bytes = stat.blocks() * blocksize;
Ok(Usage {
config,

View File

@@ -18,6 +18,7 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
@@ -473,6 +474,8 @@ async fn build_timeline_info_common(
is_archived: Some(is_archived),
walreceiver_status,
last_aux_file_policy: timeline.last_aux_file_policy.load(),
};
Ok(info)
}
@@ -2251,7 +2254,7 @@ async fn tenant_scan_remote_handler(
%timeline_id))
.await
{
Ok((index_part, index_generation, _index_mtime)) => {
Ok((index_part, index_generation)) => {
tracing::info!("Found timeline {tenant_shard_id}/{timeline_id} metadata (gen {index_generation:?}, {} layers, {} consistent LSN)",
index_part.layer_metadata.len(), index_part.metadata.disk_consistent_lsn());
generation = std::cmp::max(generation, index_generation);
@@ -2396,6 +2399,31 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ())
}
async fn force_aux_policy_switch_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
let policy: AuxFilePolicy = json_request(&mut r).await?;
let state = get_state(&r);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
timeline
.do_switch_aux_policy(policy)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn put_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
@@ -3108,6 +3136,10 @@ pub fn make_router(
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),
)
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",

View File

@@ -22,6 +22,7 @@ use pageserver_api::key::{
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -32,7 +33,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -676,6 +677,21 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
async fn list_aux_files_v1(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get(AUX_FILES_KEY, lsn, ctx).await {
Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
Ok(HashMap::new())
}
}
}
async fn list_aux_files_v2(
&self,
lsn: Lsn,
@@ -706,7 +722,10 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
self.list_aux_files_v2(lsn, ctx).await?;
let current_policy = self.last_aux_file_policy.load();
if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
self.list_aux_files_v2(lsn, ctx).await?;
}
Ok(())
}
@@ -715,7 +734,51 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
self.list_aux_files_v2(lsn, ctx).await
let current_policy = self.last_aux_file_policy.load();
match current_policy {
Some(AuxFilePolicy::V1) => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
let empty_str = if res.is_empty() { ", empty" } else { "" };
warn!(
"this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
);
Ok(res)
}
None => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
if !res.is_empty() {
warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
}
Ok(res)
}
Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
Some(AuxFilePolicy::CrossValidation) => {
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
match (v1_result, v2_result) {
(Ok(v1), Ok(v2)) => {
if v1 != v2 {
tracing::error!(
"unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
);
return Err(PageReconstructError::Other(anyhow::anyhow!(
"unmatched aux file v1 v2 result"
)));
}
Ok(v1)
}
(Ok(_), Err(v2)) => {
tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
Err(v2)
}
(Err(v1), Ok(_)) => {
tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
Err(v1)
}
(Err(_), Err(v2)) => Err(v2),
}
}
}
}
pub(crate) async fn get_replorigins(
@@ -891,6 +954,9 @@ impl Timeline {
result.add_key(CONTROLFILE_KEY);
result.add_key(CHECKPOINT_KEY);
if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
result.add_key(AUX_FILES_KEY);
}
// Add extra keyspaces in the test cases. Some test cases write keys into the storage without
// creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
@@ -1100,6 +1166,9 @@ impl<'a> DatadirModification<'a> {
self.pending_directory_entries.push((DirectoryKind::Db, 0));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory
self.init_aux_dir()?;
let buf = if self.tline.pg_version >= 17 {
TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
xids: HashSet::new(),
@@ -1278,6 +1347,9 @@ impl<'a> DatadirModification<'a> {
// 'true', now write the updated 'dbdirs' map back.
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory as well
self.init_aux_dir()?;
}
if r.is_none() {
// Create RelDirectory
@@ -1654,60 +1726,200 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
return Ok(());
}
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: HashMap::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, 0));
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
Ok(())
}
pub async fn put_file(
&mut self,
path: &str,
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Ok(val) => Some(val),
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
} else {
Vec::new()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
let switch_policy = self.tline.get_switch_aux_file_policy();
let policy = {
let current_policy = self.tline.last_aux_file_policy.load();
// Allowed switch path:
// * no aux files -> v1/v2/cross-validation
// * cross-validation->v2
let current_policy = if current_policy.is_none() {
// This path will only be hit once per tenant: we will decide the final policy in this code block.
// The next call to `put_file` will always have `last_aux_file_policy != None`.
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
if aux_files_key_v1.is_empty() {
None
} else {
warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
Some(AuxFilePolicy::V1)
}
} else {
other_files.push(file);
current_policy
};
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
self.tline.do_switch_aux_policy(switch_policy)?;
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
switch_policy
} else {
// This branch handles non-valid migration path, and the case that switch_policy == current_policy.
// And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
}
};
if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Ok(val) => Some(val),
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
} else {
Vec::new()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
} else {
other_files.push(file);
}
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
}
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let n_files;
let mut aux_files = self.tline.aux_files.lock().await;
if let Some(mut dir) = aux_files.dir.take() {
// We already updated aux files in `self`: emit a delta and update our latest value.
dir.upsert(file_path.clone(), content.clone());
n_files = dir.files.len();
if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
aux_files.n_deltas = 0;
} else {
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
);
aux_files.n_deltas += 1;
}
aux_files.dir = Some(dir);
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
n_files = dir.files.len();
aux_files.dir = Some(dir);
}
Err(
e @ (PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
// the same for now, though in theory, we should only match the `MissingKey` variant.
Err(
e @ (PageReconstructError::Other(_)
| PageReconstructError::WalRedo(_)
| PageReconstructError::MissingKey(_)),
) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
if !matches!(e, PageReconstructError::MissingKey(_)) {
let e = utils::error::report_compact_sources(&e);
tracing::warn!("treating error as if it was a missing key: {}", e);
}
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
n_files = 1;
aux_files.dir = Some(dir);
}
}
}
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => warn!("removing non-existing aux file: {}", path),
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, n_files));
}
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
Ok(())
}
@@ -1877,6 +2089,12 @@ impl<'a> DatadirModification<'a> {
self.tline.get(key, lsn, ctx).await
}
/// Only used during unit tests, force putting a key into the modification.
#[cfg(test)]
pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
self.put(key, val);
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
@@ -1994,6 +2212,21 @@ struct RelDirectory {
rels: HashSet<(Oid, u8)>,
}
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
pub(crate) struct AuxFilesDirectory {
pub(crate) files: HashMap<String, Bytes>,
}
impl AuxFilesDirectory {
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
if let Some(value) = value {
self.files.insert(key, value);
} else {
self.files.remove(&key);
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
nblocks: u32,

View File

@@ -53,22 +53,6 @@ impl Statvfs {
Statvfs::Mock(stat) => stat.block_size,
}
}
/// Get the available and total bytes on the filesystem.
pub fn get_avail_total_bytes(&self) -> (u64, u64) {
// https://unix.stackexchange.com/a/703650
let blocksize = if self.fragment_size() > 0 {
self.fragment_size()
} else {
self.block_size()
};
// use blocks_available (b_avail) since, pageserver runs as unprivileged user
let avail_bytes = self.blocks_available() * blocksize;
let total_bytes = self.blocks() * blocksize;
(avail_bytes, total_bytes)
}
}
pub mod mock {

View File

@@ -20,6 +20,7 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::LsnLease;
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::models::TimelineState;
@@ -799,6 +800,7 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -809,6 +811,10 @@ impl Tenant {
ancestor.clone(),
resources,
CreateTimelineCause::Load,
// This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`,
// there will potentially be other caller of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence.
// Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2.
last_aux_file_policy,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
@@ -823,6 +829,10 @@ impl Tenant {
if let Some(index_part) = index_part.as_ref() {
timeline.remote_client.init_upload_queue(index_part)?;
timeline
.last_aux_file_policy
.store(index_part.last_aux_file_policy());
} else {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
@@ -1393,12 +1403,15 @@ impl Tenant {
None
};
let last_aux_file_policy = index_part.last_aux_file_policy();
self.timeline_init_and_sync(
timeline_id,
resources,
Some(index_part),
remote_metadata,
ancestor,
last_aux_file_policy,
ctx,
)
.await
@@ -1811,6 +1824,7 @@ impl Tenant {
create_guard,
initdb_lsn,
None,
None,
)
.await
}
@@ -2173,8 +2187,7 @@ impl Tenant {
.iter()
.any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(*timeline_id))
};
let can_offload =
can_offload && has_no_unoffloaded_children && self.conf.timeline_offloading;
let can_offload = can_offload && has_no_unoffloaded_children;
if (is_active, can_offload) == (false, false) {
None
} else {
@@ -3018,6 +3031,7 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
@@ -3046,6 +3060,7 @@ impl Tenant {
resources,
pg_version,
state,
last_aux_file_policy,
self.attach_wal_lag_cooldown.clone(),
self.cancel.child_token(),
);
@@ -3704,6 +3719,7 @@ impl Tenant {
timeline_create_guard,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
src_timeline.last_aux_file_policy.load(),
)
.await?;
@@ -3897,6 +3913,7 @@ impl Tenant {
timeline_create_guard,
pgdata_lsn,
None,
None,
)
.await?;
@@ -3968,6 +3985,7 @@ impl Tenant {
create_guard: TimelineCreateGuard<'a>,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<UninitializedTimeline<'a>> {
let tenant_shard_id = self.tenant_shard_id;
@@ -3983,6 +4001,7 @@ impl Tenant {
ancestor,
resources,
CreateTimelineCause::Load,
last_aux_file_policy,
)
.context("Failed to create timeline data structure")?;
@@ -4580,6 +4599,7 @@ mod tests {
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
@@ -4588,7 +4608,7 @@ mod tests {
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use itertools::Itertools;
use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use rand::{thread_rng, Rng};
@@ -4597,6 +4617,7 @@ mod tests {
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
use timeline::{DeltaLayerTestDesc, GcInfo};
use utils::bin_ser::BeSer;
use utils::id::TenantId;
static TEST_KEY: Lazy<Key> =
@@ -6400,9 +6421,16 @@ mod tests {
}
#[tokio::test]
async fn test_aux_file_e2e() {
let harness = TenantHarness::create("test_aux_file_e2e").await.unwrap();
async fn test_branch_copies_dirty_aux_file_flag() {
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag")
.await
.unwrap();
// the default aux file policy to switch is v2 if not set by the admins
assert_eq!(
harness.tenant_conf.switch_aux_file_policy,
AuxFilePolicy::default_tenant_config()
);
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
@@ -6412,6 +6440,9 @@ mod tests {
.await
.unwrap();
// no aux file is written at this point, so the persistent flag should be unset
assert_eq!(tline.last_aux_file_policy.load(), None);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
@@ -6422,6 +6453,30 @@ mod tests {
modification.commit(&ctx).await.unwrap();
}
// there is no tenant manager to pass the configuration through, so lets mimic it
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V2,
"wanted state has been updated"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"aux file is written with switch_aux_file_policy unset (which is v2), so we should use v2 there"
);
// we can read everything from the storage
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
@@ -6439,6 +6494,12 @@ mod tests {
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"keep v2 storage format when new files are written"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
@@ -6450,9 +6511,321 @@ mod tests {
.await
.unwrap();
// child copies the last flag even if that is not on remote storage yet
assert_eq!(child.get_switch_aux_file_policy(), AuxFilePolicy::V2);
assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
let files = child.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);
assert_eq!(files.get("pg_logical/mappings/test2"), None);
// even if we crash here without flushing parent timeline with it's new
// last_aux_file_policy we are safe, because child was never meant to access ancestor's
// files. the ancestor can even switch back to V1 because of a migration safely.
}
#[tokio::test]
async fn aux_file_policy_switch() {
let mut harness = TenantHarness::create("aux_file_policy_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
// there is no tenant manager to pass the configuration through, so lets mimic it
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V2,
"wanted state has been updated"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::CrossValidation),
"dirty index_part.json reflected state is yet to be updated"
);
// we can still read the auxfile v1 before we ingest anything new
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"ingesting a file should apply the wanted switch state when applicable"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first")),
"cross validation writes to both v1 and v2 so this should be available in v2"
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// mimic again by trying to flip it from V2 to V1 (not switched to while ingesting a file)
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V1),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"third", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V1,
"wanted state has been updated again, even if invalid request"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"ingesting a file should apply the wanted switch state when applicable"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"third"))
);
// mimic again by trying to flip it from from V1 to V2 (not switched to while ingesting a file)
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test3", b"last", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(tline.get_switch_aux_file_policy(), AuxFilePolicy::V2);
assert_eq!(tline.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"third"))
);
assert_eq!(
files.get("pg_logical/mappings/test3"),
Some(&bytes::Bytes::from_static(b"last"))
);
}
#[tokio::test]
async fn aux_file_policy_force_switch() {
let mut harness = TenantHarness::create("aux_file_policy_force_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"dirty index_part.json reflected state is yet to be updated"
);
// lose all data from v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
// read data ingested in v2
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// lose all data from v1
assert_eq!(files.get("pg_logical/mappings/test1"), None);
}
#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: vec![(
"test_file".to_string(),
Bytes::copy_from_slice(b"test_file"),
)]
.into_iter()
.collect(),
})
.unwrap();
modification.put_for_test(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
modification.commit(&ctx).await.unwrap();
}
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V1),
"keep using v1 because there are aux files writting with v1"
);
// we can still read the auxfile v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("test_file"),
Some(&bytes::Bytes::from_static(b"test_file"))
);
}
#[tokio::test]

View File

@@ -11,7 +11,6 @@ use pageserver_api::shard::{
};
use pageserver_api::upcall_api::ReAttachResponseTenant;
use rand::{distributions::Alphanumeric, Rng};
use remote_storage::TimeoutOrCancel;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
@@ -1351,17 +1350,47 @@ impl TenantManager {
}
}
async fn delete_tenant_remote(
&self,
tenant_shard_id: TenantShardId,
) -> Result<(), DeleteTenantError> {
let remote_path = remote_tenant_path(&tenant_shard_id);
let mut keys_stream = self.resources.remote_storage.list_streaming(
Some(&remote_path),
remote_storage::ListingMode::NoDelimiter,
None,
&self.cancel,
);
while let Some(chunk) = keys_stream.next().await {
let keys = match chunk {
Ok(listing) => listing.keys,
Err(remote_storage::DownloadError::Cancelled) => {
return Err(DeleteTenantError::Cancelled)
}
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
};
if keys.is_empty() {
tracing::info!("Remote storage already deleted");
} else {
tracing::info!("Deleting {} keys from remote storage", keys.len());
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
self.resources
.remote_storage
.delete_objects(&keys, &self.cancel)
.await?;
}
}
Ok(())
}
/// If a tenant is attached, detach it. Then remove its data from remote storage.
///
/// A tenant is considered deleted once it is gone from remote storage. It is the caller's
/// responsibility to avoid trying to attach the tenant again or use it any way once deletion
/// has started: this operation is not atomic, and must be retried until it succeeds.
///
/// As a special case, if an unsharded tenant ID is given for a sharded tenant, it will remove
/// all tenant shards in remote storage (removing all paths with the tenant prefix). The storage
/// controller uses this to purge all remote tenant data, including any stale parent shards that
/// may remain after splits. Ideally, this special case would be handled elsewhere. See:
/// <https://github.com/neondatabase/neon/pull/9394>.
pub(crate) async fn delete_tenant(
&self,
tenant_shard_id: TenantShardId,
@@ -1413,29 +1442,25 @@ impl TenantManager {
// in 500 responses to delete requests.
// - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
// 503/retry, rather than kicking off a wasteful concurrent deletion.
// NB: this also deletes partial prefixes, i.e. a <tenant_id> path will delete all
// <tenant_id>_<shard_id>/* objects. See method comment for why.
backoff::retry(
|| async move {
self.resources
.remote_storage
.delete_prefix(&remote_tenant_path(&tenant_shard_id), &self.cancel)
.await
match backoff::retry(
|| async move { self.delete_tenant_remote(tenant_shard_id).await },
|e| match e {
DeleteTenantError::Cancelled => true,
DeleteTenantError::SlotError(_) => {
unreachable!("Remote deletion doesn't touch slots")
}
_ => false,
},
|_| false, // backoff::retry handles cancellation
1,
3,
&format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
&self.cancel,
)
.await
.unwrap_or(Err(TimeoutOrCancel::Cancel.into()))
.map_err(|err| {
if TimeoutOrCancel::caused_by_cancel(&err) {
return DeleteTenantError::Cancelled;
}
DeleteTenantError::Other(err)
})
{
Some(r) => r,
None => Err(DeleteTenantError::Cancelled),
}
}
#[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]

View File

@@ -187,7 +187,7 @@ use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
pub(crate) use download::download_initdb_tar_zst;
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
@@ -505,7 +505,7 @@ impl RemoteTimelineClient {
},
);
let (index_part, index_generation, index_last_modified) = download::download_index_part(
let (index_part, _index_generation) = download::download_index_part(
&self.storage_impl,
&self.tenant_shard_id,
&self.timeline_id,
@@ -519,49 +519,6 @@ impl RemoteTimelineClient {
)
.await?;
// Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
// old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
// in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
// when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
// also a newer index available, that is surprising.
const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
if e.duration() > Duration::from_secs(5) {
// We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
// timestamp, it is common to be out by at least 1 second.
tracing::warn!("Index has modification time in the future: {e}");
}
Duration::ZERO
});
if index_age > INDEX_AGE_CHECKS_THRESHOLD {
tracing::info!(
?index_generation,
age = index_age.as_secs_f64(),
"Loaded an old index, checking for other indices..."
);
// Find the highest-generation index
let (_latest_index_part, latest_index_generation, latest_index_mtime) =
download::download_index_part(
&self.storage_impl,
&self.tenant_shard_id,
&self.timeline_id,
Generation::MAX,
cancel,
)
.await?;
if latest_index_generation > index_generation {
// Unexpected! Why are we loading such an old index if a more recent one exists?
tracing::warn!(
?index_generation,
?latest_index_generation,
?latest_index_mtime,
"Found a newer index while loading an old one"
);
}
}
if index_part.deleted_at.is_some() {
Ok(MaybeDeletedIndexPart::Deleted(index_part))
} else {
@@ -671,6 +628,18 @@ impl RemoteTimelineClient {
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated.
pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
self: &Arc<Self>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.dirty.last_aux_file_policy = last_aux_file_policy;
self.schedule_index_upload(upload_queue)?;
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
///
/// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded,

View File

@@ -6,7 +6,6 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::time::SystemTime;
use anyhow::{anyhow, Context};
use camino::{Utf8Path, Utf8PathBuf};
@@ -344,10 +343,10 @@ async fn do_download_index_part(
timeline_id: &TimelineId,
index_generation: Generation,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
) -> Result<(IndexPart, Generation), DownloadError> {
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
let (index_part_bytes, index_part_mtime) = download_retry_forever(
let index_part_bytes = download_retry_forever(
|| async {
let download = storage
.download(&remote_path, &DownloadOpts::default(), cancel)
@@ -360,7 +359,7 @@ async fn do_download_index_part(
tokio::io::copy_buf(&mut stream, &mut bytes).await?;
Ok((bytes, download.last_modified))
Ok(bytes)
},
&format!("download {remote_path:?}"),
cancel,
@@ -371,7 +370,7 @@ async fn do_download_index_part(
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
.map_err(DownloadError::Other)?;
Ok((index_part, index_generation, index_part_mtime))
Ok((index_part, index_generation))
}
/// index_part.json objects are suffixed with a generation number, so we cannot
@@ -386,7 +385,7 @@ pub(crate) async fn download_index_part(
timeline_id: &TimelineId,
my_generation: Generation,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
) -> Result<(IndexPart, Generation), DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if my_generation.is_none() {

View File

@@ -133,6 +133,10 @@ impl IndexPart {
pub(crate) fn example() -> Self {
Self::empty(TimelineMetadata::example())
}
pub(crate) fn last_aux_file_policy(&self) -> Option<AuxFilePolicy> {
self.last_aux_file_policy
}
}
/// Metadata gathered for each of the layer files.

View File

@@ -341,10 +341,6 @@ impl Layer {
Ok(())
}
pub(crate) async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
self.0.needs_download().await
}
/// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
/// while the guard exists.
///

View File

@@ -28,9 +28,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
LsnLease, TimelineState,
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, CompactionAlgorithmSettings,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -98,12 +98,12 @@ use crate::{
use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{
pgdatadir_mapping::DirectoryKind,
virtual_file::{MaybeFatalIo, VirtualFile},
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
virtual_file::{MaybeFatalIo, VirtualFile},
};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use crate::config::PageServerConf;
@@ -206,6 +206,11 @@ pub struct TimelineResources {
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
pub(crate) struct AuxFilesState {
pub(crate) dir: Option<AuxFilesDirectory>,
pub(crate) n_deltas: usize,
}
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
@@ -408,9 +413,15 @@ pub struct Timeline {
timeline_get_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
/// Keep aux directory cache to avoid it's reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
/// Size estimator for aux file v2
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
/// Indicate whether aux file v2 storage is enabled.
pub(crate) last_aux_file_policy: AtomicAuxFilePolicy,
/// Some test cases directly place keys into the timeline without actually modifying the directory
/// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
/// these keys won't get garbage-collected during compaction/GC. This field only modifies the dense
@@ -1554,7 +1565,6 @@ impl Timeline {
}
/// Checks if the internal state of the timeline is consistent with it being able to be offloaded.
///
/// This is neccessary but not sufficient for offloading of the timeline as it might have
/// child timelines that are not offloaded yet.
pub(crate) fn can_offload(&self) -> bool {
@@ -2001,6 +2011,14 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
}
pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.switch_aux_file_policy
.unwrap_or(self.conf.default_tenant_conf.switch_aux_file_policy)
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2133,6 +2151,7 @@ impl Timeline {
resources: TimelineResources,
pg_version: u32,
state: TimelineState,
aux_file_policy: Option<AuxFilePolicy>,
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
cancel: CancellationToken,
) -> Arc<Self> {
@@ -2262,8 +2281,15 @@ impl Timeline {
timeline_get_throttle: resources.timeline_get_throttle,
aux_files: tokio::sync::Mutex::new(AuxFilesState {
dir: None,
n_deltas: 0,
}),
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy),
#[cfg(test)]
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
@@ -2274,6 +2300,10 @@ impl Timeline {
attach_wal_lag_cooldown,
};
if aux_file_policy == Some(AuxFilePolicy::V1) {
warn!("this timeline is using deprecated aux file policy V1 (when loading the timeline)");
}
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -4448,6 +4478,14 @@ impl Timeline {
) -> Result<(), detach_ancestor::Error> {
detach_ancestor::complete(self, tenant, attempt, ctx).await
}
/// Switch aux file policy and schedule upload to the index part.
pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
self.last_aux_file_policy.store(Some(policy));
self.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
Ok(())
}
}
impl Drop for Timeline {

View File

@@ -29,7 +29,6 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
@@ -1692,45 +1691,6 @@ impl Timeline {
unreachable!("key retention is empty")
}
/// Check how much space is left on the disk
async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
let tenants_dir = self.conf.tenants_path();
let stat = Statvfs::get(&tenants_dir, None)
.context("statvfs failed, presumably directory got unlinked")?;
let (avail_bytes, _) = stat.get_avail_total_bytes();
Ok(avail_bytes)
}
/// Check if the compaction can proceed safely without running out of space. We assume the size
/// upper bound of the produced files of a compaction job is the same as all layers involved in
/// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
/// compaction.
async fn check_compaction_space(
self: &Arc<Self>,
layer_selection: &[Layer],
) -> anyhow::Result<()> {
let available_space = self.check_available_space().await?;
let mut remote_layer_size = 0;
let mut all_layer_size = 0;
for layer in layer_selection {
let needs_download = layer.needs_download().await?;
if needs_download.is_some() {
remote_layer_size += layer.layer_desc().file_size;
}
all_layer_size += layer.layer_desc().file_size;
}
let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
{
return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
}
Ok(())
}
/// An experimental compaction building block that combines compaction with garbage collection.
///
/// The current implementation picks all delta + image layers that are below or intersecting with
@@ -1846,8 +1806,6 @@ impl Timeline {
lowest_retain_lsn
);
self.check_compaction_space(&layer_selection).await?;
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)

View File

@@ -283,6 +283,8 @@ impl DeleteTimelineFlow {
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,
// Aux file policy is not needed for deletion, assuming deletion does not read aux keyspace
None,
)
.context("create_timeline_struct")?;

View File

@@ -1,7 +1,8 @@
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use pageserver_api::key::Key;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants;
@@ -12,6 +13,7 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
/// Can this request be served by neon redo functions
@@ -234,9 +236,13 @@ pub(crate) fn apply_in_neon(
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
}
}
NeonWalRecord::AuxFile { .. } => {
// No-op: this record will never be created in aux v2.
warn!("AuxFile record should not be created in aux v2");
NeonWalRecord::AuxFile { file_path, content } => {
let mut dir = AuxFilesDirectory::des(page)?;
dir.upsert(file_path.clone(), content.clone());
page.clear();
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
#[cfg(test)]
NeonWalRecord::Test {
@@ -244,7 +250,6 @@ pub(crate) fn apply_in_neon(
clear,
will_init,
} => {
use bytes::BufMut;
if *will_init {
assert!(*clear, "init record must be clear to ensure correctness");
}
@@ -256,3 +261,59 @@ pub(crate) fn apply_in_neon(
}
Ok(())
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::key::AUX_FILES_KEY;
use super::*;
use std::collections::HashMap;
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
#[test]
fn apply_aux_file_deltas() -> anyhow::Result<()> {
let base_dir = AuxFilesDirectory {
files: HashMap::from([
("two".to_string(), Bytes::from_static(b"content0")),
("three".to_string(), Bytes::from_static(b"contentX")),
]),
};
let base_image = AuxFilesDirectory::ser(&base_dir)?;
let deltas = vec![
// Insert
NeonWalRecord::AuxFile {
file_path: "one".to_string(),
content: Some(Bytes::from_static(b"content1")),
},
// Update
NeonWalRecord::AuxFile {
file_path: "two".to_string(),
content: Some(Bytes::from_static(b"content99")),
},
// Delete
NeonWalRecord::AuxFile {
file_path: "three".to_string(),
content: None,
},
];
let file_path = AUX_FILES_KEY;
let mut page = BytesMut::from_iter(base_image);
for record in deltas {
apply_in_neon(&record, Lsn(8), file_path, &mut page)?;
}
let reconstructed = AuxFilesDirectory::des(&page)?;
let expect = HashMap::from([
("one".to_string(), Bytes::from_static(b"content1")),
("two".to_string(), Bytes::from_static(b"content99")),
]);
assert_eq!(reconstructed.files, expect);
Ok(())
}
}

View File

@@ -11,9 +11,8 @@ use tokio::net::{lookup_host, TcpStream};
use tracing::field::display;
use tracing::{debug, info};
use super::conn_pool::poll_client;
use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client, Send};
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client};
use super::local_conn_pool::{self, LocalClient, LocalConnPool};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
@@ -32,7 +31,7 @@ use crate::rate_limiter::EndpointRateLimiter;
use crate::{compute, EndpointId, Host};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool>,
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub(crate) config: &'static ProxyConfig,
@@ -200,7 +199,7 @@ impl PoolingBackend {
&self,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
) -> Result<http_conn_pool::Client, HttpConnError> {
info!("pool: looking for an existing connection");
if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) {
return Ok(client);
@@ -482,7 +481,7 @@ impl ConnectMechanism for TokioMechanism {
}
struct HyperMechanism {
pool: Arc<http_conn_pool::GlobalConnPool<Send>>,
pool: Arc<http_conn_pool::GlobalConnPool>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -492,7 +491,7 @@ struct HyperMechanism {
#[async_trait]
impl ConnectMechanism for HyperMechanism {
type Connection = http_conn_pool::Client<Send>;
type Connection = http_conn_pool::Client;
type ConnectError = HttpConnError;
type Error = HttpConnError;

View File

@@ -1,29 +1,31 @@
use std::collections::HashMap;
use std::fmt;
use std::ops::Deref;
use std::pin::pin;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
use std::task::{ready, Poll};
use std::time::Duration;
use dashmap::DashMap;
use futures::future::poll_fn;
use futures::Future;
use parking_lot::RwLock;
use rand::Rng;
use smallvec::SmallVec;
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{AsyncMessage, Socket};
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use super::backend::HttpConnError;
use crate::auth::backend::ComputeUserInfo;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::MetricsAuxInfo;
use crate::metrics::Metrics;
use super::conn_pool_lib::{Client, ClientInnerExt, ConnInfo, GlobalConnPool};
#[cfg(test)]
use {
super::conn_pool_lib::GlobalConnPoolOptions,
crate::auth::backend::ComputeUserInfo,
std::{sync::atomic, time::Duration},
};
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{DbName, EndpointCacheKey, RoleName};
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {
@@ -31,12 +33,34 @@ pub(crate) struct ConnInfoWithAuth {
pub(crate) auth: AuthData,
}
#[derive(Debug, Clone)]
pub(crate) struct ConnInfo {
pub(crate) user_info: ComputeUserInfo,
pub(crate) dbname: DbName,
}
#[derive(Debug, Clone)]
pub(crate) enum AuthData {
Password(SmallVec<[u8; 16]>),
Jwt(String),
}
impl ConnInfo {
// hm, change to hasher to avoid cloning?
pub(crate) fn db_and_user(&self) -> (DbName, RoleName) {
(self.dbname.clone(), self.user_info.user.clone())
}
pub(crate) fn endpoint_cache_key(&self) -> Option<EndpointCacheKey> {
// We don't want to cache http connections for ephemeral endpoints.
if self.user_info.options.is_ephemeral() {
None
} else {
Some(self.user_info.endpoint_cache_key())
}
}
}
impl fmt::Display for ConnInfo {
// use custom display to avoid logging password
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -51,6 +75,402 @@ impl fmt::Display for ConnInfo {
}
}
struct ConnPoolEntry<C: ClientInnerExt> {
conn: ClientInner<C>,
_last_access: std::time::Instant,
}
// Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
total_conns: usize,
max_conns: usize,
_guard: HttpEndpointPoolsGuard<'static>,
global_connections_count: Arc<AtomicUsize>,
global_pool_size_max_conns: usize,
}
impl<C: ClientInnerExt> EndpointConnPool<C> {
fn get_conn_entry(&mut self, db_user: (DbName, RoleName)) -> Option<ConnPoolEntry<C>> {
let Self {
pools,
total_conns,
global_connections_count,
..
} = self;
pools.get_mut(&db_user).and_then(|pool_entries| {
pool_entries.get_conn_entry(total_conns, global_connections_count.clone())
})
}
fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
let Self {
pools,
total_conns,
global_connections_count,
..
} = self;
if let Some(pool) = pools.get_mut(&db_user) {
let old_len = pool.conns.len();
pool.conns.retain(|conn| conn.conn.conn_id != conn_id);
let new_len = pool.conns.len();
let removed = old_len - new_len;
if removed > 0 {
global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
}
*total_conns -= removed;
removed > 0
} else {
false
}
}
fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
let conn_id = client.conn_id;
if client.is_closed() {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
return;
}
let global_max_conn = pool.read().global_pool_size_max_conns;
if pool
.read()
.global_connections_count
.load(atomic::Ordering::Relaxed)
>= global_max_conn
{
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full");
return;
}
// return connection to the pool
let mut returned = false;
let mut per_db_size = 0;
let total_conns = {
let mut pool = pool.write();
if pool.total_conns < pool.max_conns {
let pool_entries = pool.pools.entry(conn_info.db_and_user()).or_default();
pool_entries.conns.push(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
returned = true;
per_db_size = pool_entries.conns.len();
pool.total_conns += 1;
pool.global_connections_count
.fetch_add(1, atomic::Ordering::Relaxed);
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.inc();
}
pool.total_conns
};
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
}
}
impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
fn drop(&mut self) {
if self.total_conns > 0 {
self.global_connections_count
.fetch_sub(self.total_conns, atomic::Ordering::Relaxed);
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(self.total_conns as i64);
}
}
}
pub(crate) struct DbUserConnPool<C: ClientInnerExt> {
conns: Vec<ConnPoolEntry<C>>,
}
impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
fn default() -> Self {
Self { conns: Vec::new() }
}
}
impl<C: ClientInnerExt> DbUserConnPool<C> {
fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
let old_len = self.conns.len();
self.conns.retain(|conn| !conn.conn.is_closed());
let new_len = self.conns.len();
let removed = old_len - new_len;
*conns -= removed;
removed
}
fn get_conn_entry(
&mut self,
conns: &mut usize,
global_connections_count: Arc<AtomicUsize>,
) -> Option<ConnPoolEntry<C>> {
let mut removed = self.clear_closed_clients(conns);
let conn = self.conns.pop();
if conn.is_some() {
*conns -= 1;
removed += 1;
}
global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
conn
}
}
pub(crate) struct GlobalConnPool<C: ClientInnerExt> {
// endpoint -> per-endpoint connection pool
//
// That should be a fairly conteded map, so return reference to the per-endpoint
// pool as early as possible and release the lock.
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
/// Number of endpoint-connection pools
///
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
/// It's only used for diagnostics.
global_pool_size: AtomicUsize,
/// Total number of connections in the pool
global_connections_count: Arc<AtomicUsize>,
config: &'static crate::config::HttpConfig,
}
#[derive(Debug, Clone, Copy)]
pub struct GlobalConnPoolOptions {
// Maximum number of connections per one endpoint.
// Can mix different (dbname, username) connections.
// When running out of free slots for a particular endpoint,
// falls back to opening a new connection for each request.
pub max_conns_per_endpoint: usize,
pub gc_epoch: Duration,
pub pool_shards: usize,
pub idle_timeout: Duration,
pub opt_in: bool,
// Total number of connections in the pool.
pub max_total_conns: usize,
}
impl<C: ClientInnerExt> GlobalConnPool<C> {
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Arc::new(Self {
global_pool: DashMap::with_shard_amount(shards),
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
})
}
#[cfg(test)]
pub(crate) fn get_global_connections_count(&self) -> usize {
self.global_connections_count
.load(atomic::Ordering::Relaxed)
}
pub(crate) fn get_idle_timeout(&self) -> Duration {
self.config.pool_options.idle_timeout
}
pub(crate) fn shutdown(&self) {
// drops all strong references to endpoint-pools
self.global_pool.clear();
}
pub(crate) async fn gc_worker(&self, mut rng: impl Rng) {
let epoch = self.config.pool_options.gc_epoch;
let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32);
loop {
interval.tick().await;
let shard = rng.gen_range(0..self.global_pool.shards().len());
self.gc(shard);
}
}
fn gc(&self, shard: usize) {
debug!(shard, "pool: performing epoch reclamation");
// acquire a random shard lock
let mut shard = self.global_pool.shards()[shard].write();
let timer = Metrics::get()
.proxy
.http_pool_reclaimation_lag_seconds
.start_timer();
let current_len = shard.len();
let mut clients_removed = 0;
shard.retain(|endpoint, x| {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {
let EndpointConnPool {
pools, total_conns, ..
} = pool.get_mut();
// ensure that closed clients are removed
for db_pool in pools.values_mut() {
clients_removed += db_pool.clear_closed_clients(total_conns);
}
// we only remove this pool if it has no active connections
if *total_conns == 0 {
info!("pool: discarding pool for endpoint {endpoint}");
return false;
}
}
true
});
let new_len = shard.len();
drop(shard);
timer.observe();
// Do logging outside of the lock.
if clients_removed > 0 {
let size = self
.global_connections_count
.fetch_sub(clients_removed, atomic::Ordering::Relaxed)
- clients_removed;
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(clients_removed as i64);
info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
}
let removed = current_len - new_len;
if removed > 0 {
let global_pool_size = self
.global_pool_size
.fetch_sub(removed, atomic::Ordering::Relaxed)
- removed;
info!("pool: performed global pool gc. size now {global_pool_size}");
}
}
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInner<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
let endpoint_pool = Arc::downgrade(&endpoint_pool);
// ok return cached connection if found and establish a new one otherwise
if let Some(client) = client {
if client.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
info!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
client.session.send(ctx.session_id())?;
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
}
Ok(None)
}
fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<EndpointConnPool<C>>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: self.config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
global_pool_size_max_conns: self.config.pool_options.max_total_conns,
}));
// find or create a pool for this endpoint
let mut created = false;
let pool = self
.global_pool
.entry(endpoint.clone())
.or_insert_with(|| {
created = true;
new_pool
})
.clone();
// log new global pool size
if created {
let global_pool_size = self
.global_pool_size
.fetch_add(1, atomic::Ordering::Relaxed)
+ 1;
info!(
"pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
);
}
pool
}
}
pub(crate) fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C>>,
ctx: &RequestMonitoring,
@@ -154,7 +574,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
}
.instrument(span));
let inner = ClientInnerRemote {
let inner = ClientInner {
inner: client,
session: tx,
cancel,
@@ -164,7 +584,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
Client::new(inner, conn_info, pool_clone)
}
pub(crate) struct ClientInnerRemote<C: ClientInnerExt> {
struct ClientInner<C: ClientInnerExt> {
inner: C,
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
@@ -172,36 +592,131 @@ pub(crate) struct ClientInnerRemote<C: ClientInnerExt> {
conn_id: uuid::Uuid,
}
impl<C: ClientInnerExt> ClientInnerRemote<C> {
pub(crate) fn inner_mut(&mut self) -> &mut C {
&mut self.inner
impl<C: ClientInnerExt> Drop for ClientInner<C> {
fn drop(&mut self) {
// on client drop, tell the conn to shut down
self.cancel.cancel();
}
}
pub(crate) fn inner(&self) -> &C {
&self.inner
pub(crate) trait ClientInnerExt: Sync + Send + 'static {
fn is_closed(&self) -> bool;
fn get_process_id(&self) -> i32;
}
impl ClientInnerExt for tokio_postgres::Client {
fn is_closed(&self) -> bool {
self.is_closed()
}
pub(crate) fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
&mut self.session
}
pub(crate) fn aux(&self) -> &MetricsAuxInfo {
&self.aux
}
pub(crate) fn get_conn_id(&self) -> uuid::Uuid {
self.conn_id
fn get_process_id(&self) -> i32 {
self.get_process_id()
}
}
impl<C: ClientInnerExt> ClientInner<C> {
pub(crate) fn is_closed(&self) -> bool {
self.inner.is_closed()
}
}
impl<C: ClientInnerExt> Drop for ClientInnerRemote<C> {
impl<C: ClientInnerExt> Client<C> {
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
let aux = &self.inner.as_ref().unwrap().aux;
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
})
}
}
pub(crate) struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInner<C>>,
conn_info: ConnInfo,
pool: Weak<RwLock<EndpointConnPool<C>>>,
}
pub(crate) struct Discard<'a, C: ClientInnerExt> {
conn_info: &'a ConnInfo,
pool: &'a mut Weak<RwLock<EndpointConnPool<C>>>,
}
impl<C: ClientInnerExt> Client<C> {
pub(self) fn new(
inner: ClientInner<C>,
conn_info: ConnInfo,
pool: Weak<RwLock<EndpointConnPool<C>>>,
) -> Self {
Self {
inner: Some(inner),
span: Span::current(),
conn_info,
pool,
}
}
pub(crate) fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
let Self {
inner,
pool,
conn_info,
span: _,
} = self;
let inner = inner.as_mut().expect("client inner should not be removed");
(&mut inner.inner, Discard { conn_info, pool })
}
}
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is not idle");
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
}
}
}
impl<C: ClientInnerExt> Deref for Client<C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self
.inner
.as_ref()
.expect("client inner should not be removed")
.inner
}
}
impl<C: ClientInnerExt> Client<C> {
fn do_drop(&mut self) -> Option<impl FnOnce()> {
let conn_info = self.conn_info.clone();
let client = self
.inner
.take()
.expect("client inner should not be removed");
if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
let current_span = self.span.clone();
// return connection to the pool
return Some(move || {
let _span = current_span.enter();
EndpointConnPool::put(&conn_pool, &conn_info, client);
});
}
None
}
}
impl<C: ClientInnerExt> Drop for Client<C> {
fn drop(&mut self) {
// on client drop, tell the conn to shut down
self.cancel.cancel();
if let Some(drop) = self.do_drop() {
tokio::task::spawn_blocking(drop);
}
}
}
@@ -230,12 +745,12 @@ mod tests {
}
}
fn create_inner() -> ClientInnerRemote<MockClient> {
fn create_inner() -> ClientInner<MockClient> {
create_inner_with(MockClient::new(false))
}
fn create_inner_with(client: MockClient) -> ClientInnerRemote<MockClient> {
ClientInnerRemote {
fn create_inner_with(client: MockClient) -> ClientInner<MockClient> {
ClientInner {
inner: client,
session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
cancel: CancellationToken::new(),
@@ -282,7 +797,7 @@ mod tests {
{
let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
assert_eq!(0, pool.get_global_connections_count());
client.inner_mut().1.discard();
client.inner().1.discard();
// Discard should not add the connection from the pool.
assert_eq!(0, pool.get_global_connections_count());
}

View File

@@ -1,562 +0,0 @@
use dashmap::DashMap;
use parking_lot::RwLock;
use rand::Rng;
use std::{collections::HashMap, sync::Arc, sync::Weak, time::Duration};
use std::{
ops::Deref,
sync::atomic::{self, AtomicUsize},
};
use tokio_postgres::ReadyForQueryStatus;
use crate::control_plane::messages::ColdStartInfo;
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{
auth::backend::ComputeUserInfo, context::RequestMonitoring, DbName, EndpointCacheKey, RoleName,
};
use super::conn_pool::ClientInnerRemote;
use tracing::info;
use tracing::{debug, Span};
use super::backend::HttpConnError;
#[derive(Debug, Clone)]
pub(crate) struct ConnInfo {
pub(crate) user_info: ComputeUserInfo,
pub(crate) dbname: DbName,
}
impl ConnInfo {
// hm, change to hasher to avoid cloning?
pub(crate) fn db_and_user(&self) -> (DbName, RoleName) {
(self.dbname.clone(), self.user_info.user.clone())
}
pub(crate) fn endpoint_cache_key(&self) -> Option<EndpointCacheKey> {
// We don't want to cache http connections for ephemeral endpoints.
if self.user_info.options.is_ephemeral() {
None
} else {
Some(self.user_info.endpoint_cache_key())
}
}
}
pub(crate) struct ConnPoolEntry<C: ClientInnerExt> {
pub(crate) conn: ClientInnerRemote<C>,
pub(crate) _last_access: std::time::Instant,
}
// Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
total_conns: usize,
max_conns: usize,
_guard: HttpEndpointPoolsGuard<'static>,
global_connections_count: Arc<AtomicUsize>,
global_pool_size_max_conns: usize,
}
impl<C: ClientInnerExt> EndpointConnPool<C> {
fn get_conn_entry(&mut self, db_user: (DbName, RoleName)) -> Option<ConnPoolEntry<C>> {
let Self {
pools,
total_conns,
global_connections_count,
..
} = self;
pools.get_mut(&db_user).and_then(|pool_entries| {
let (entry, removed) = pool_entries.get_conn_entry(total_conns);
global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
entry
})
}
pub(crate) fn remove_client(
&mut self,
db_user: (DbName, RoleName),
conn_id: uuid::Uuid,
) -> bool {
let Self {
pools,
total_conns,
global_connections_count,
..
} = self;
if let Some(pool) = pools.get_mut(&db_user) {
let old_len = pool.conns.len();
pool.conns.retain(|conn| conn.conn.get_conn_id() != conn_id);
let new_len = pool.conns.len();
let removed = old_len - new_len;
if removed > 0 {
global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
}
*total_conns -= removed;
removed > 0
} else {
false
}
}
pub(crate) fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInnerRemote<C>) {
let conn_id = client.get_conn_id();
if client.is_closed() {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
return;
}
let global_max_conn = pool.read().global_pool_size_max_conns;
if pool
.read()
.global_connections_count
.load(atomic::Ordering::Relaxed)
>= global_max_conn
{
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full");
return;
}
// return connection to the pool
let mut returned = false;
let mut per_db_size = 0;
let total_conns = {
let mut pool = pool.write();
if pool.total_conns < pool.max_conns {
let pool_entries = pool.pools.entry(conn_info.db_and_user()).or_default();
pool_entries.conns.push(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
returned = true;
per_db_size = pool_entries.conns.len();
pool.total_conns += 1;
pool.global_connections_count
.fetch_add(1, atomic::Ordering::Relaxed);
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.inc();
}
pool.total_conns
};
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
}
}
impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
fn drop(&mut self) {
if self.total_conns > 0 {
self.global_connections_count
.fetch_sub(self.total_conns, atomic::Ordering::Relaxed);
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(self.total_conns as i64);
}
}
}
pub(crate) struct DbUserConnPool<C: ClientInnerExt> {
pub(crate) conns: Vec<ConnPoolEntry<C>>,
}
impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
fn default() -> Self {
Self { conns: Vec::new() }
}
}
impl<C: ClientInnerExt> DbUserConnPool<C> {
fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
let old_len = self.conns.len();
self.conns.retain(|conn| !conn.conn.is_closed());
let new_len = self.conns.len();
let removed = old_len - new_len;
*conns -= removed;
removed
}
pub(crate) fn get_conn_entry(
&mut self,
conns: &mut usize,
) -> (Option<ConnPoolEntry<C>>, usize) {
let mut removed = self.clear_closed_clients(conns);
let conn = self.conns.pop();
if conn.is_some() {
*conns -= 1;
removed += 1;
}
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(removed as i64);
(conn, removed)
}
}
pub(crate) struct GlobalConnPool<C: ClientInnerExt> {
// endpoint -> per-endpoint connection pool
//
// That should be a fairly conteded map, so return reference to the per-endpoint
// pool as early as possible and release the lock.
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
/// Number of endpoint-connection pools
///
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
/// It's only used for diagnostics.
global_pool_size: AtomicUsize,
/// Total number of connections in the pool
global_connections_count: Arc<AtomicUsize>,
config: &'static crate::config::HttpConfig,
}
#[derive(Debug, Clone, Copy)]
pub struct GlobalConnPoolOptions {
// Maximum number of connections per one endpoint.
// Can mix different (dbname, username) connections.
// When running out of free slots for a particular endpoint,
// falls back to opening a new connection for each request.
pub max_conns_per_endpoint: usize,
pub gc_epoch: Duration,
pub pool_shards: usize,
pub idle_timeout: Duration,
pub opt_in: bool,
// Total number of connections in the pool.
pub max_total_conns: usize,
}
impl<C: ClientInnerExt> GlobalConnPool<C> {
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Arc::new(Self {
global_pool: DashMap::with_shard_amount(shards),
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
})
}
#[cfg(test)]
pub(crate) fn get_global_connections_count(&self) -> usize {
self.global_connections_count
.load(atomic::Ordering::Relaxed)
}
pub(crate) fn get_idle_timeout(&self) -> Duration {
self.config.pool_options.idle_timeout
}
pub(crate) fn shutdown(&self) {
// drops all strong references to endpoint-pools
self.global_pool.clear();
}
pub(crate) async fn gc_worker(&self, mut rng: impl Rng) {
let epoch = self.config.pool_options.gc_epoch;
let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32);
loop {
interval.tick().await;
let shard = rng.gen_range(0..self.global_pool.shards().len());
self.gc(shard);
}
}
pub(crate) fn gc(&self, shard: usize) {
debug!(shard, "pool: performing epoch reclamation");
// acquire a random shard lock
let mut shard = self.global_pool.shards()[shard].write();
let timer = Metrics::get()
.proxy
.http_pool_reclaimation_lag_seconds
.start_timer();
let current_len = shard.len();
let mut clients_removed = 0;
shard.retain(|endpoint, x| {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {
let EndpointConnPool {
pools, total_conns, ..
} = pool.get_mut();
// ensure that closed clients are removed
for db_pool in pools.values_mut() {
clients_removed += db_pool.clear_closed_clients(total_conns);
}
// we only remove this pool if it has no active connections
if *total_conns == 0 {
info!("pool: discarding pool for endpoint {endpoint}");
return false;
}
}
true
});
let new_len = shard.len();
drop(shard);
timer.observe();
// Do logging outside of the lock.
if clients_removed > 0 {
let size = self
.global_connections_count
.fetch_sub(clients_removed, atomic::Ordering::Relaxed)
- clients_removed;
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(clients_removed as i64);
info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
}
let removed = current_len - new_len;
if removed > 0 {
let global_pool_size = self
.global_pool_size
.fetch_sub(removed, atomic::Ordering::Relaxed)
- removed;
info!("pool: performed global pool gc. size now {global_pool_size}");
}
}
pub(crate) fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<EndpointConnPool<C>>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: self.config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
global_pool_size_max_conns: self.config.pool_options.max_total_conns,
}));
// find or create a pool for this endpoint
let mut created = false;
let pool = self
.global_pool
.entry(endpoint.clone())
.or_insert_with(|| {
created = true;
new_pool
})
.clone();
// log new global pool size
if created {
let global_pool_size = self
.global_pool_size
.fetch_add(1, atomic::Ordering::Relaxed)
+ 1;
info!(
"pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
);
}
pool
}
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerRemote<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
let endpoint_pool = Arc::downgrade(&endpoint_pool);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner().get_process_id()),
);
info!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
client.session().send(ctx.session_id())?;
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
}
Ok(None)
}
}
impl<C: ClientInnerExt> Client<C> {
pub(crate) fn new(
inner: ClientInnerRemote<C>,
conn_info: ConnInfo,
pool: Weak<RwLock<EndpointConnPool<C>>>,
) -> Self {
Self {
inner: Some(inner),
span: Span::current(),
conn_info,
pool,
}
}
pub(crate) fn inner_mut(&mut self) -> (&mut C, Discard<'_, C>) {
let Self {
inner,
pool,
conn_info,
span: _,
} = self;
let inner = inner.as_mut().expect("client inner should not be removed");
let inner_ref = inner.inner_mut();
(inner_ref, Discard { conn_info, pool })
}
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
let aux = &self.inner.as_ref().unwrap().aux();
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
})
}
pub(crate) fn do_drop(&mut self) -> Option<impl FnOnce()> {
let conn_info = self.conn_info.clone();
let client = self
.inner
.take()
.expect("client inner should not be removed");
if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
let current_span = self.span.clone();
// return connection to the pool
return Some(move || {
let _span = current_span.enter();
EndpointConnPool::put(&conn_pool, &conn_info, client);
});
}
None
}
}
pub(crate) struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInnerRemote<C>>,
conn_info: ConnInfo,
pool: Weak<RwLock<EndpointConnPool<C>>>,
}
impl<C: ClientInnerExt> Drop for Client<C> {
fn drop(&mut self) {
if let Some(drop) = self.do_drop() {
tokio::task::spawn_blocking(drop);
}
}
}
impl<C: ClientInnerExt> Deref for Client<C> {
type Target = C;
fn deref(&self) -> &Self::Target {
self.inner
.as_ref()
.expect("client inner should not be removed")
.inner()
}
}
pub(crate) trait ClientInnerExt: Sync + Send + 'static {
fn is_closed(&self) -> bool;
fn get_process_id(&self) -> i32;
}
impl ClientInnerExt for tokio_postgres::Client {
fn is_closed(&self) -> bool {
self.is_closed()
}
fn get_process_id(&self) -> i32 {
self.get_process_id()
}
}
pub(crate) struct Discard<'a, C: ClientInnerExt> {
conn_info: &'a ConnInfo,
pool: &'a mut Weak<RwLock<EndpointConnPool<C>>>,
}
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is not idle");
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
}
}
}

View File

@@ -10,12 +10,11 @@ use rand::Rng;
use tokio::net::TcpStream;
use tracing::{debug, error, info, info_span, Instrument};
use super::conn_pool::ConnInfo;
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
use crate::EndpointCacheKey;
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
@@ -23,15 +22,15 @@ pub(crate) type Connect =
http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
#[derive(Clone)]
pub(crate) struct ConnPoolEntry<C: ClientInnerExt + Clone> {
conn: C,
struct ConnPoolEntry {
conn: Send,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
}
// Per-endpoint connection pool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
pub(crate) struct EndpointConnPool {
// TODO(conrad):
// either we should open more connections depending on stream count
// (not exposed by hyper, need our own counter)
@@ -41,13 +40,13 @@ pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
// seems somewhat redundant though.
//
// Probably we should run a semaphore and just the single conn. TBD.
conns: VecDeque<ConnPoolEntry<C>>,
conns: VecDeque<ConnPoolEntry>,
_guard: HttpEndpointPoolsGuard<'static>,
global_connections_count: Arc<AtomicUsize>,
}
impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
impl EndpointConnPool {
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry> {
let Self { conns, .. } = self;
loop {
@@ -82,7 +81,7 @@ impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
}
}
impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
impl Drop for EndpointConnPool {
fn drop(&mut self) {
if !self.conns.is_empty() {
self.global_connections_count
@@ -96,12 +95,12 @@ impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
}
}
pub(crate) struct GlobalConnPool<C: ClientInnerExt + Clone> {
pub(crate) struct GlobalConnPool {
// endpoint -> per-endpoint connection pool
//
// That should be a fairly conteded map, so return reference to the per-endpoint
// pool as early as possible and release the lock.
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool>>>,
/// Number of endpoint-connection pools
///
@@ -116,7 +115,7 @@ pub(crate) struct GlobalConnPool<C: ClientInnerExt + Clone> {
config: &'static crate::config::HttpConfig,
}
impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
impl GlobalConnPool {
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Arc::new(Self {
@@ -211,7 +210,7 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
self: &Arc<Self>,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
) -> Option<Client<C>> {
) -> Option<Client> {
let endpoint = conn_info.endpoint_cache_key()?;
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
let client = endpoint_pool.write().get_conn_entry()?;
@@ -229,7 +228,7 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<EndpointConnPool<C>>> {
) -> Arc<RwLock<EndpointConnPool>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
@@ -269,14 +268,14 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
}
pub(crate) fn poll_http2_client(
global_pool: Arc<GlobalConnPool<Send>>,
global_pool: Arc<GlobalConnPool>,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
client: Send,
connection: Connect,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<Send> {
) -> Client {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let session_id = ctx.session_id();
@@ -323,13 +322,13 @@ pub(crate) fn poll_http2_client(
Client::new(client, aux)
}
pub(crate) struct Client<C: ClientInnerExt + Clone> {
pub(crate) inner: C,
pub(crate) struct Client {
pub(crate) inner: Send,
aux: MetricsAuxInfo,
}
impl<C: ClientInnerExt + Clone> Client<C> {
pub(self) fn new(inner: C, aux: MetricsAuxInfo) -> Self {
impl Client {
pub(self) fn new(inner: Send, aux: MetricsAuxInfo) -> Self {
Self { inner, aux }
}
@@ -340,14 +339,3 @@ impl<C: ClientInnerExt + Clone> Client<C> {
})
}
}
impl ClientInnerExt for Send {
fn is_closed(&self) -> bool {
self.is_closed()
}
fn get_process_id(&self) -> i32 {
// ideally throw something meaningful
-1
}
}

View File

@@ -20,12 +20,11 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument, Span};
use super::backend::HttpConnError;
use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
use super::conn_pool::{ClientInnerExt, ConnInfo};
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::Metrics;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::{DbName, RoleName};
struct ConnPoolEntry<C: ClientInnerExt> {
@@ -363,7 +362,7 @@ pub(crate) fn poll_client(
LocalClient::new(inner, conn_info, pool_clone)
}
pub(crate) struct ClientInner<C: ClientInnerExt> {
struct ClientInner<C: ClientInnerExt> {
inner: C,
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
@@ -388,24 +387,13 @@ impl<C: ClientInnerExt> ClientInner<C> {
}
}
impl ClientInner<tokio_postgres::Client> {
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
self.jti += 1;
let token = resign_jwt(&self.key, payload, self.jti)?;
// initiates the auth session
self.inner.simple_query("discard all").await?;
self.inner
.query(
"select auth.jwt_session_init($1)",
&[&token as &(dyn ToSql + Sync)],
)
.await?;
let pid = self.inner.get_process_id();
info!(pid, jti = self.jti, "user session state init");
Ok(())
impl<C: ClientInnerExt> LocalClient<C> {
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
let aux = &self.inner.as_ref().unwrap().aux;
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
})
}
}
@@ -434,18 +422,6 @@ impl<C: ClientInnerExt> LocalClient<C> {
pool,
}
}
pub(crate) fn client_inner(&mut self) -> (&mut ClientInner<C>, Discard<'_, C>) {
let Self {
inner,
pool,
conn_info,
span: _,
} = self;
let inner_m = inner.as_mut().expect("client inner should not be removed");
(inner_m, Discard { conn_info, pool })
}
pub(crate) fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
let Self {
inner,
@@ -458,6 +434,33 @@ impl<C: ClientInnerExt> LocalClient<C> {
}
}
impl LocalClient<tokio_postgres::Client> {
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
let inner = self
.inner
.as_mut()
.expect("client inner should not be removed");
inner.jti += 1;
let token = resign_jwt(&inner.key, payload, inner.jti)?;
// initiates the auth session
inner.inner.simple_query("discard all").await?;
inner
.inner
.query(
"select auth.jwt_session_init($1)",
&[&token as &(dyn ToSql + Sync)],
)
.await?;
let pid = inner.inner.get_process_id();
info!(pid, jti = inner.jti, "user session state init");
Ok(())
}
}
/// implements relatively efficient in-place json object key upserting
///
/// only supports top-level keys
@@ -521,15 +524,24 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
jwt
}
impl<C: ClientInnerExt> LocalClient<C> {
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
let aux = &self.inner.as_ref().unwrap().aux;
USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
})
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!(
"local_pool: throwing away connection '{conn_info}' because connection is not idle"
);
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!("local_pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
}
}
}
impl<C: ClientInnerExt> LocalClient<C> {
fn do_drop(&mut self) -> Option<impl FnOnce()> {
let conn_info = self.conn_info.clone();
let client = self
@@ -556,23 +568,6 @@ impl<C: ClientInnerExt> Drop for LocalClient<C> {
}
}
impl<C: ClientInnerExt> Discard<'_, C> {
pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!(
"local_pool: throwing away connection '{conn_info}' because connection is not idle"
);
}
}
pub(crate) fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!("local_pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
}
}
}
#[cfg(test)]
mod tests {
use p256::ecdsa::SigningKey;

View File

@@ -5,7 +5,6 @@
mod backend;
pub mod cancel_set;
mod conn_pool;
mod conn_pool_lib;
mod http_conn_pool;
mod http_util;
mod json;
@@ -21,7 +20,7 @@ use anyhow::Context;
use async_trait::async_trait;
use atomic_take::AtomicTake;
use bytes::Bytes;
pub use conn_pool_lib::GlobalConnPoolOptions;
pub use conn_pool::GlobalConnPoolOptions;
use futures::future::{select, Either};
use futures::TryFutureExt;
use http::{Method, Response, StatusCode};
@@ -66,7 +65,7 @@ pub async fn task_main(
}
let local_pool = local_conn_pool::LocalConnPool::new(&config.http_config);
let conn_pool = conn_pool_lib::GlobalConnPool::new(&config.http_config);
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
{
let conn_pool = Arc::clone(&conn_pool);
tokio::spawn(async move {

View File

@@ -25,11 +25,10 @@ use urlencoding;
use utils::http::error::ApiError;
use super::backend::{LocalProxyConnError, PoolingBackend};
use super::conn_pool::{AuthData, ConnInfoWithAuth};
use super::conn_pool_lib::{self, ConnInfo};
use super::conn_pool::{AuthData, ConnInfo, ConnInfoWithAuth};
use super::http_util::json_response;
use super::json::{json_to_pg_text, pg_text_row_to_json, JsonConversionError};
use super::local_conn_pool;
use super::{conn_pool, local_conn_pool};
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
use crate::auth::{endpoint_sni, ComputeUserInfoParseError};
use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
@@ -38,7 +37,6 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{run_until_cancelled, NeonOptions};
use crate::serverless::backend::HttpConnError;
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
use crate::{DbName, RoleName};
@@ -609,8 +607,7 @@ async fn handle_db_inner(
let client = match keys.keys {
ComputeCredentialKeys::JwtPayload(payload) if is_local_proxy => {
let mut client = backend.connect_to_local_postgres(ctx, conn_info).await?;
let (cli_inner, _dsc) = client.client_inner();
cli_inner.set_jwt_session(&payload).await?;
client.set_jwt_session(&payload).await?;
Client::Local(client)
}
_ => {
@@ -1024,12 +1021,12 @@ async fn query_to_json<T: GenericClient>(
}
enum Client {
Remote(conn_pool_lib::Client<tokio_postgres::Client>),
Remote(conn_pool::Client<tokio_postgres::Client>),
Local(local_conn_pool::LocalClient<tokio_postgres::Client>),
}
enum Discard<'a> {
Remote(conn_pool_lib::Discard<'a, tokio_postgres::Client>),
Remote(conn_pool::Discard<'a, tokio_postgres::Client>),
Local(local_conn_pool::Discard<'a, tokio_postgres::Client>),
}
@@ -1044,7 +1041,7 @@ impl Client {
fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
match self {
Client::Remote(client) => {
let (c, d) = client.inner_mut();
let (c, d) = client.inner();
(c, Discard::Remote(d))
}
Client::Local(local_client) => {

View File

@@ -1,5 +0,0 @@
#!/bin/bash
cargo neon endpoint stop main
cargo neon endpoint start main --create-test-user true

View File

@@ -498,18 +498,21 @@ impl WalAcceptor {
// we will send keepalives by replying to these requests once per second.
let mut next_keepalive = Instant::now();
while let Some(mut next_msg) = self.msg_rx.recv().await {
loop {
let opt_msg = self.msg_rx.recv().await;
if opt_msg.is_none() {
return Ok(()); // chan closed, streaming terminated
}
let mut next_msg = opt_msg.unwrap();
// Update walreceiver state in shmem for reporting.
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
}
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
// Loop through AppendRequests while available to write as many WAL records as
// possible without fsyncing.
//
// Make sure the WAL is flushed before returning, see:
// https://github.com/neondatabase/neon/issues/9259
// loop through AppendRequest's while it's readily available to
// write as many WAL as possible without fsyncing
//
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
// Otherwise, we might end up in a situation where we read a message, but don't
@@ -519,7 +522,7 @@ impl WalAcceptor {
if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
if self.reply_tx.send(reply).await.is_err() {
break; // disconnected, flush WAL and return on next send/recv
return Ok(()); // chan closed, streaming terminated
}
}
@@ -528,13 +531,11 @@ impl WalAcceptor {
break;
}
// continue pulling AppendRequests if available
match self.msg_rx.try_recv() {
Ok(msg) => next_msg = msg,
Err(TryRecvError::Empty) => break,
// on disconnect, flush WAL and return on next send/recv
Err(TryRecvError::Disconnected) => break,
};
Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
}
}
// flush all written WAL to the disk
@@ -554,6 +555,5 @@ impl WalAcceptor {
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
}
}
Ok(())
}
}

View File

@@ -2862,12 +2862,17 @@ impl Service {
let _tenant_lock =
trace_exclusive_lock(&self.tenant_op_locks, tenant_id, TenantOperations::Delete).await;
// Detach all shards. This also deletes local pageserver shard data.
let (detach_waiters, node) = {
// Detach all shards
let (detach_waiters, shard_ids, node) = {
let mut shard_ids = Vec::new();
let mut detach_waiters = Vec::new();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (_, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
for (tenant_shard_id, shard) in
tenants.range_mut(TenantShardId::tenant_range(tenant_id))
{
shard_ids.push(*tenant_shard_id);
// Update the tenant's intent to remove all attachments
shard.policy = PlacementPolicy::Detached;
shard
@@ -2887,7 +2892,7 @@ impl Service {
let node = nodes
.get(&node_id)
.expect("Pageservers may not be deleted while lock is active");
(detach_waiters, node.clone())
(detach_waiters, shard_ids, node.clone())
};
// This reconcile wait can fail in a few ways:
@@ -2902,34 +2907,38 @@ impl Service {
self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
.await?;
// Delete the entire tenant (all shards) from remote storage via a random pageserver.
// Passing an unsharded tenant ID will cause the pageserver to remove all remote paths with
// the tenant ID prefix, including all shards (even possibly stale ones).
match node
.with_client_retries(
|client| async move {
client
.tenant_delete(TenantShardId::unsharded(tenant_id))
.await
},
&self.config.jwt_token,
1,
3,
RECONCILE_TIMEOUT,
&self.cancel,
)
.await
.unwrap_or(Err(mgmt_api::Error::Cancelled))
{
Ok(_) => {}
Err(mgmt_api::Error::Cancelled) => {
return Err(ApiError::ShuttingDown);
}
Err(e) => {
// This is unexpected: remote deletion should be infallible, unless the object store
// at large is unavailable.
tracing::error!("Error deleting via node {node}: {e}");
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
let locations = shard_ids
.into_iter()
.map(|s| (s, node.clone()))
.collect::<Vec<_>>();
let results = self.tenant_for_shards_api(
locations,
|tenant_shard_id, client| async move { client.tenant_delete(tenant_shard_id).await },
1,
3,
RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
for result in results {
match result {
Ok(StatusCode::ACCEPTED) => {
// This should never happen: we waited for detaches to finish above
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Unexpectedly still attached on {}",
node
)));
}
Ok(_) => {}
Err(mgmt_api::Error::Cancelled) => {
return Err(ApiError::ShuttingDown);
}
Err(e) => {
// This is unexpected: remote deletion should be infallible, unless the object store
// at large is unavailable.
tracing::error!("Error deleting via node {}: {e}", node);
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
}
}
}

View File

@@ -138,7 +138,7 @@ pub struct ProjectData {
pub name: String,
pub region_id: String,
pub platform_id: String,
pub user_id: Option<String>,
pub user_id: String,
pub pageserver_id: Option<u64>,
#[serde(deserialize_with = "from_nullable_id")]
pub tenant: TenantId,

View File

@@ -16,13 +16,13 @@ use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePat
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::TenantId};
use utils::id::TenantId;
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote, list_objects_with_retries,
metadata_stream::{stream_tenant_timelines, stream_tenants},
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth, MAX_RETRIES,
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
};
#[derive(Serialize, Deserialize, Debug)]
@@ -250,16 +250,13 @@ async fn find_garbage_inner(
&target.tenant_root(&tenant_shard_id),
)
.await?;
if let Some(object) = tenant_objects.keys.first() {
if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
continue;
} else {
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key);
}
let object = tenant_objects.keys.first().unwrap();
if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
continue;
} else {
tracing::info!("Tenant {tenant_shard_id} is missing in console appears to have been deleted while we ran");
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key);
}
} else {
// A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
@@ -409,17 +406,14 @@ pub async fn get_tenant_objects(
// TODO: apply extra validation based on object modification time. Don't purge
// tenants where any timeline's index_part.json has been touched recently.
let cancel = CancellationToken::new();
let list = backoff::retry(
|| s3_client.list(Some(&tenant_root), ListingMode::NoDelimiter, None, &cancel),
|_| false,
3,
MAX_RETRIES as u32,
"get_tenant_objects",
&cancel,
)
.await
.expect("dummy cancellation token")?;
let list = s3_client
.list(
Some(&tenant_root),
ListingMode::NoDelimiter,
None,
&CancellationToken::new(),
)
.await?;
Ok(list.keys)
}
@@ -430,25 +424,14 @@ pub async fn get_timeline_objects(
tracing::debug!("Listing objects in timeline {ttid}");
let timeline_root = super::remote_timeline_path_id(&ttid);
let cancel = CancellationToken::new();
let list = backoff::retry(
|| {
s3_client.list(
Some(&timeline_root),
ListingMode::NoDelimiter,
None,
&cancel,
)
},
|_| false,
3,
MAX_RETRIES as u32,
"get_timeline_objects",
&cancel,
)
.await
.expect("dummy cancellation token")?;
let list = s3_client
.list(
Some(&timeline_root),
ListingMode::NoDelimiter,
None,
&CancellationToken::new(),
)
.await?;
Ok(list.keys)
}

View File

@@ -6,7 +6,7 @@ Prerequisites:
- Correctly configured Python, see [`/docs/sourcetree.md`](/docs/sourcetree.md#using-python)
- Neon and Postgres binaries
- See the root [README.md](/README.md) for build directions
To run tests you need to add `--features testing` to Rust code build commands.
If you want to test tests with test-only APIs, you would need to add `--features testing` to Rust code build commands.
For convenience, repository cargo config contains `build_testing` alias, that serves as a subcommand, adding the required feature flags.
Usage example: `cargo build_testing --release` is equivalent to `cargo build --features testing --release`
- Tests can be run from the git tree; or see the environment variables

View File

@@ -303,10 +303,9 @@ def assert_prefix_empty(
remote_storage: Optional[RemoteStorage],
prefix: Optional[str] = None,
allowed_postfix: Optional[str] = None,
delimiter: str = "/",
) -> None:
assert remote_storage is not None
response = list_prefix(remote_storage, prefix, delimiter)
response = list_prefix(remote_storage, prefix)
keys = response["KeyCount"]
objects: list[ObjectTypeDef] = response.get("Contents", [])
common_prefixes = response.get("CommonPrefixes", [])
@@ -339,18 +338,16 @@ def assert_prefix_empty(
if not (allowed_postfix.endswith(key)):
filtered_count += 1
assert filtered_count == 0, f"remote prefix {prefix} is not empty: {objects}"
assert (
filtered_count == 0
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
# remote_storage must not be None, but that's easier for callers to make mypy happy
def assert_prefix_not_empty(
remote_storage: Optional[RemoteStorage],
prefix: Optional[str] = None,
delimiter: str = "/",
):
def assert_prefix_not_empty(remote_storage: Optional[RemoteStorage], prefix: Optional[str] = None):
assert remote_storage is not None
response = list_prefix(remote_storage, prefix)
assert response["KeyCount"] != 0, f"remote prefix {prefix} is empty: {response}"
assert response["KeyCount"] != 0, f"remote dir with prefix {prefix} is empty: {response}"
def list_prefix(

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AuxFileStore,
NeonEnvBuilder,
logical_replication_sync,
)
def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg):
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
client = env.pageserver.http_client()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
tenant_config = client.tenant_config(tenant_id).effective_config
tenant_config["switch_aux_file_policy"] = AuxFileStore.V2
client.set_tenant_config(tenant_id, tenant_config)
# aux file v2 is enabled on the write path, so for now, it should be unset (or null)
assert (
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)["last_aux_file_policy"]
is None
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("create table t(pk integer primary key, payload integer)")
cur.execute(
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));"
)
cur.execute("create publication pub1 for table t, replication_example")
# now start subscriber, aux files will be created at this point. TODO: find better ways of testing aux files (i.e., neon_test_utils)
# instead of going through the full logical replication process.
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(pk integer primary key, payload integer)")
vanilla_pg.safe_psql(
"CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120), testcolumn1 int, testcolumn2 int, testcolumn3 int);"
)
connstr = endpoint.connstr().replace("'", "''")
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
# Wait logical replication channel to be established
logical_replication_sync(vanilla_pg, endpoint)
vanilla_pg.stop()
endpoint.stop()
with env.pageserver.http_client() as client:
# aux file v2 flag should be enabled at this point
assert (
client.timeline_detail(tenant_id, timeline_id)["last_aux_file_policy"]
== AuxFileStore.V2
)
with env.pageserver.http_client() as client:
tenant_config = client.tenant_config(tenant_id).effective_config
tenant_config["switch_aux_file_policy"] = "V1"
client.set_tenant_config(tenant_id, tenant_config)
# the flag should still be enabled
assert (
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
"last_aux_file_policy"
]
== AuxFileStore.V2
)
env.pageserver.restart()
with env.pageserver.http_client() as client:
# aux file v2 flag should be persisted
assert (
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)[
"last_aux_file_policy"
]
== AuxFileStore.V2
)

View File

@@ -0,0 +1,38 @@
import threading
from fixtures.neon_fixtures import NeonEnv
def test_compute_restart(neon_simple_env: NeonEnv):
env = neon_simple_env
env.create_branch("publisher")
pub = env.endpoints.create("publisher")
pub.start()
sub_timeline_id = env.create_branch("subscriber")
sub = env.endpoints.create("subscriber")
sub.start()
n_records = 100000
n_restarts = 200
def insert_data(pub):
with pub.cursor() as pcur:
for i in range(0, n_records):
pcur.execute("INSERT into t values (%s,random()*100000)", (i,))
with pub.cursor() as pcur:
with sub.cursor() as scur:
pcur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
thread = threading.Thread(target=insert_data, args=(pub,), daemon=True)
thread.start()
for _ in range(n_restarts):
# restart subscriber
# time.sleep(2)
sub.stop("immediate", sks_wait_walreceiver_gone=(env.safekeepers, sub_timeline_id))
sub.start()
thread.join()

View File

@@ -20,7 +20,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.utils import run_pg_bench_small, wait_until
from fixtures.workload import Workload
from requests.exceptions import ReadTimeout
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -405,57 +404,3 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, make_httpserver, neon_env_builder
cloud_admin_api_token=cloud_admin_token,
)
assert healthy
def test_tenant_delete_stale_shards(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Deleting a tenant should also delete any stale (pre-split) shards from remote storage.
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
# Create an unsharded tenant.
tenant_id, timeline_id = env.create_tenant()
# Write some data.
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
workload.write_rows(256)
workload.validate()
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix="/".join(("tenants", str(tenant_id))),
)
# Upload a heatmap as well.
env.pageserver.http_client().tenant_heatmap_upload(tenant_id)
# Split off a few shards, in two rounds.
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
env.storage_controller.tenant_shard_split(tenant_id, shard_count=16)
# Delete the tenant. This should also delete data for the unsharded and count=4 parents.
env.storage_controller.pageserver_api().tenant_delete(tenant_id=tenant_id)
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,
prefix="/".join(("tenants", str(tenant_id))),
delimiter="", # match partial prefixes, i.e. all shards
)
dirs = list(env.pageserver.tenant_dir(None).glob(f"{tenant_id}*"))
assert dirs == [], f"found tenant directories: {dirs}"
# The initial tenant created by the test harness should still be there.
# Only the tenant we deleted should be removed.
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix="/".join(("tenants", str(env.initial_tenant))),
)
dirs = list(env.pageserver.tenant_dir(None).glob(f"{env.initial_tenant}*"))
assert dirs != [], "missing initial tenant directory"
env.stop()

View File

@@ -119,10 +119,6 @@ def test_timeline_archive(neon_env_builder: NeonEnvBuilder, shard_count: int):
@pytest.mark.parametrize("manual_offload", [False, True])
def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: bool):
if not manual_offload:
# (automatic) timeline offloading defaults to false for now
neon_env_builder.pageserver_config_override = "timeline_offloading = true"
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()