Compare commits

...

5 Commits

Author SHA1 Message Date
Aleksandr Sarantsev
553e39c277 Change mark_invisible 2025-06-13 23:01:55 +04:00
Aleksandr Sarantsev
945ebc8411 Rename tenant shard request param 2025-06-13 21:42:22 +04:00
Alexander Sarantcev
d63815fa40 Fix ChaosInjector shard eligibility bug (#12231)
## Problem

ChaosInjector is intended to skip non-active scheduling policies, but
the current logic skips active shards instead.

## Summary of changes

- Fixed shard eligibility condition to correctly allow chaos injection
for shards with an Active scheduling policy.
2025-06-13 13:34:29 +00:00
Dmitrii Kovalkov
385324ee8a pageserver: fix post-merge PR comments on basebackup cache (#12216)
## Problem
This PR addresses all but the direct IO post-merge comments on
basebackup cache implementation.

- Follow up on
https://github.com/neondatabase/neon/pull/11989#pullrequestreview-2867966119
- Part of https://github.com/neondatabase/cloud/issues/29353

## Summary of changes
- Clean up the tmp directory by recreating it.
- Recreate the tmp directory on startup.
- Add comments why it's safe to not fsync the inode after renaming.
2025-06-13 08:49:31 +00:00
Alex Chi Z.
8a68d463f6 feat(pagectl): no max key limit if time travel recover locally (#12222)
## Problem

We would easily hit this limit for a tenant running for enough long
time.

## Summary of changes

Remove the max key limit for time-travel recovery if the command is
running locally.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-13 08:41:10 +00:00
11 changed files with 71 additions and 44 deletions

View File

@@ -824,6 +824,7 @@ impl RemoteStorage for AzureBlobStorage {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected "
.to_string()

View File

@@ -440,6 +440,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError>;
}
@@ -651,22 +652,23 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
match self {
Self::LocalFs(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::AwsS3(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::AzureBlob(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::Unreliable(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
}

View File

@@ -610,6 +610,7 @@ impl RemoteStorage for LocalFs {
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}

View File

@@ -981,22 +981,16 @@ impl RemoteStorage for S3Bucket {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
.list_versions_with_permit(&permit, prefix, mode, complexity_limit, cancel)
.await
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),

View File

@@ -240,11 +240,12 @@ impl RemoteStorage for UnreliableWrapper {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
.map_err(TimeTravelError::Other)?;
self.inner
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
}

View File

@@ -157,7 +157,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// No changes after recovery to t2 (no-op)
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t2, t_final, &cancel)
.time_travel_recover(None, t2, t_final, &cancel, None)
.await?;
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
@@ -173,7 +173,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t1, t_final, &cancel)
.time_travel_recover(None, t1, t_final, &cancel, None)
.await?;
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
@@ -189,7 +189,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t0: everything is gone except for path1
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t0, t_final, &cancel)
.time_travel_recover(None, t0, t_final, &cancel, None)
.await?;
let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t0: {t0_files_recovered:?}");

View File

@@ -176,9 +176,11 @@ async fn main() -> anyhow::Result<()> {
let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
let cancel = CancellationToken::new();
// Complexity limit: as we are running this command locally, we should have a lot of memory available, and we do not
// need to limit the number of versions we are going to delete.
storage
.unwrap()
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel)
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel, None)
.await?;
}
Commands::Key(dkc) => dkc.execute(),

View File

@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
@@ -167,14 +168,17 @@ impl BasebackupCache {
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn tmp_dir(&self) -> Utf8PathBuf {
self.data_dir.join("tmp")
}
fn entry_tmp_path(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Utf8PathBuf {
self.data_dir
.join("tmp")
self.tmp_dir()
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
@@ -194,15 +198,18 @@ impl BasebackupCache {
Some((tenant_id, timeline_id, lsn))
}
async fn cleanup(&self) -> anyhow::Result<()> {
// Cleanup tmp directory.
let tmp_dir = self.data_dir.join("tmp");
let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
while let Some(dir_entry) = tmp_dir.next_entry().await? {
if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
}
// Recreate the tmp directory to clear all files in it.
async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
let tmp_dir = self.tmp_dir();
if tmp_dir.exists() {
tokio::fs::remove_dir_all(&tmp_dir).await?;
}
tokio::fs::create_dir_all(&tmp_dir).await?;
Ok(())
}
async fn cleanup(&self) -> anyhow::Result<()> {
self.clean_tmp_dir().await?;
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
@@ -241,16 +248,14 @@ impl BasebackupCache {
}
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir and tmp directory if they do not exist.
tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
// Create data_dir if it does not exist.
tokio::fs::create_dir_all(&self.data_dir)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create basebackup cache data_dir {:?}: {:?}",
self.data_dir,
e
)
})?;
.context("Failed to create basebackup cache data directory")?;
self.clean_tmp_dir()
.await
.context("Failed to clean tmp directory")?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::new();
@@ -451,6 +456,11 @@ impl BasebackupCache {
}
// Move the tmp file to the final location atomically.
// The tmp file is fsynced, so it's guaranteed that we will not have a partial file
// in the main directory.
// It's not necessary to fsync the inode after renaming, because the worst case is that
// the rename operation will be rolled back on the disk failure, the entry will disappear
// from the main directory, and the entry access will cause a cache miss.
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
@@ -468,16 +478,17 @@ impl BasebackupCache {
}
/// Prepares a basebackup in a temporary file.
/// Guarantees that the tmp file is fsynced before returning.
async fn prepare_basebackup_tmp(
&self,
emptry_tmp_path: &Utf8Path,
entry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<()> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
let file = tokio::fs::File::create(emptry_tmp_path).await?;
let file = tokio::fs::File::create(entry_tmp_path).await?;
let mut writer = BufWriter::new(file);
let mut encoder = GzipEncoder::with_quality(

View File

@@ -1,6 +1,7 @@
//! Helper functions to upload files to remote storage with a RemoteStorage
use std::io::{ErrorKind, SeekFrom};
use std::num::NonZeroU32;
use std::time::SystemTime;
use anyhow::{Context, bail};
@@ -228,11 +229,25 @@ pub(crate) async fn time_travel_recover_tenant(
let timelines_path = super::remote_timelines_path(tenant_shard_id);
prefixes.push(timelines_path);
}
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
for prefix in &prefixes {
backoff::retry(
|| async {
storage
.time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
.time_travel_recover(
Some(prefix),
timestamp,
done_if_after,
cancel,
COMPLEXITY_LIMIT,
)
.await
},
|e| !matches!(e, TimeTravelError::Other(_)),

View File

@@ -676,7 +676,7 @@ async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;
@@ -2470,7 +2470,7 @@ pub fn make_router(
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
.get("/v1/tenant/:tenant_shard_id", |r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
@@ -2480,7 +2480,7 @@ pub fn make_router(
// The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
// are implicitly exposed here. This must be last in the list to avoid
// taking precedence over other GET methods we might implement by hand.
.get("/v1/tenant/:tenant_id/*", |r| {
.get("/v1/tenant/:tenant_shard_id/*", |r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
@@ -2489,7 +2489,7 @@ pub fn make_router(
})
// Tenant timeline mark_invisible passthrough to shard zero
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
|r| {
tenant_service_handler(
r,

View File

@@ -107,7 +107,7 @@ impl ChaosInjector {
// - Skip shards doing a graceful migration already, so that we allow these to run to
// completion rather than only exercising the first part and then cancelling with
// some other chaos.
!matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
&& shard.get_preferred_node().is_none()
}