Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-21 20:32:56 +00:00

Compare commits: erik/respo...remove_rem (166 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 396e6a4b62 | |
| | 3295d6245a | |
| | a4bfdd8013 | |
| | cd869e5737 | |
| | 544136f13f | |
| | 1cbb7fccaa | |
| | 9ab35cc5d2 | |
| | bb03c74217 | |
| | 774f34d778 | |
| | f5a171076b | |
| | 881fdbc04c | |
| | 8a27e58894 | |
| | b4c81f7dff | |
| | 9151b71b19 | |
| | 8348bc9b1a | |
| | 6e47438f43 | |
| | 579e85e92d | |
| | 53d2b48ea2 | |
| | ac6604b6ed | |
| | 91b64427ed | |
| | ffe0f90083 | |
| | 59b5a55dbf | |
| | b9290c7005 | |
| | 302a58e8ea | |
| | def51361ae | |
| | d67d4b3eee | |
| | cd1b548a8f | |
| | 282372aa5a | |
| | 1f0cd3b50e | |
| | 4973419a38 | |
| | 44ef584842 | |
| | 55c42da91b | |
| | 5c343af807 | |
| | 87ecb2e6ca | |
| | c659d0f218 | |
| | 9f7688b1d2 | |
| | 3edff352b5 | |
| | 08680f6591 | |
| | 55105ad1c3 | |
| | df328758f0 | |
| | d5ac61d566 | |
| | 355ea43ac7 | |
| | 6f0ab326b4 | |
| | c06a4fb511 | |
| | 9a714ac6b8 | |
| | 8c21edc9c5 | |
| | 6ff324a12d | |
| | 090f9a5a80 | |
| | 74aefa0b07 | |
| | ce1abef0bd | |
| | 53eacacb6b | |
| | d40b9a515a | |
| | 7c2f687bd6 | |
| | effc151244 | |
| | bdfc895642 | |
| | 96161c8cfd | |
| | da99399d16 | |
| | 7eb74d3720 | |
| | 7b39681caf | |
| | 5ff5c580ad | |
| | 6ccc6cbc69 | |
| | 2ecf6727c5 | |
| | f957616f1c | |
| | b154a5e908 | |
| | c66e859bcc | |
| | 1559ef36af | |
| | ef1c3d3914 | |
| | 83e28083b0 | |
| | 0950f8c752 | |
| | 41d36b65e2 | |
| | d8cb81118a | |
| | ecf34bb3e4 | |
| | fb4d404553 | |
| | 450f79b3f5 | |
| | a47b7d1d4c | |
| | b01022d8df | |
| | 0155ff95e7 | |
| | 46b6a1a5e8 | |
| | 290f121b59 | |
| | 786ddeff62 | |
| | 732e155b8e | |
| | e10c5b0a9b | |
| | b608eaa410 | |
| | 7b2ae073f0 | |
| | d4a7bdad55 | |
| | 96c9fd330c | |
| | 39b85cc6fd | |
| | eccb868a50 | |
| | de93c70f2f | |
| | 72430eb539 | |
| | e2443a0147 | |
| | e5d00b6c2a | |
| | dc97970215 | |
| | bc5a643c19 | |
| | b1134f6857 | |
| | 04ab9b78fe | |
| | fa0b881c4c | |
| | 106bda1ef9 | |
| | c2de71e1fe | |
| | 3eba531c3d | |
| | 088bea8680 | |
| | a9b0ac92bc | |
| | 3045956ddd | |
| | 599069b612 | |
| | 54873844c2 | |
| | dfdd41a771 | |
| | 12763ca312 | |
| | 4c80c8c1ab | |
| | acd2e7f222 | |
| | a6b6dd2f36 | |
| | 0fd14ad74b | |
| | 52eaa52573 | |
| | 1e33692c1c | |
| | f82ba477a4 | |
| | 761644af25 | |
| | cd12d97ba7 | |
| | 1e4ded860c | |
| | a0f29853b3 | |
| | c4cdf747f8 | |
| | e658f16810 | |
| | a4b4305422 | |
| | d8807eb651 | |
| | 1500f711f3 | |
| | dafa42eb71 | |
| | b5e5ead2ee | |
| | 24251b8d17 | |
| | e6378197a7 | |
| | 4bb0cc2fe4 | |
| | 9304f42ea5 | |
| | 18f4eb2622 | |
| | 2caa8bcc23 | |
| | bb222abde1 | |
| | dd2b4ad26f | |
| | b65cb8ea05 | |
| | 3a7efc10a0 | |
| | a682de1dba | |
| | 45a542c335 | |
| | bdb98b288e | |
| | a06c8e9add | |
| | 18eefd61eb | |
| | a2de0574b5 | |
| | aa8e954197 | |
| | 30847e59b9 | |
| | d930a581f8 | |
| | 931c22545b | |
| | 366f3c8ff8 | |
| | 26c39d7b4c | |
| | 6ffa5138ce | |
| | 975e1558cc | |
| | 82a955ebfe | |
| | 82596f8807 | |
| | e3e57579a1 | |
| | 56d551e6a3 | |
| | b4a0f8baf7 | |
| | 2e686ed6ea | |
| | c46f72d411 | |
| | ed46713e5c | |
| | c228cb7b3f | |
| | c88e4a0974 | |
| | b71a2f4cb2 | |
| | 98a7b090de | |
| | 235a8cbd28 | |
| | 6afeb3c6f7 | |
| | 8c42aeac9f | |
| | f36658ac10 | |
| | f8227da9da | |
@@ -68,6 +68,8 @@ pub mod completion;
 /// Reporting utilities
 pub mod error;
 
+pub mod sync;
+
 /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
 ///
 /// we have several cases:
libs/utils/src/sync.rs (new file, 1 line)

@@ -0,0 +1 @@
+pub mod heavier_once_cell;
libs/utils/src/sync/heavier_once_cell.rs (new file, 306 lines)

@@ -0,0 +1,306 @@
+use std::sync::{Arc, Mutex, MutexGuard};
+use tokio::sync::Semaphore;
+
+/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
+/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
+/// for the duration of initialization.
+///
+/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].
+///
+/// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit
+pub struct OnceCell<T> {
+    inner: Mutex<Inner<T>>,
+}
+
+impl<T> Default for OnceCell<T> {
+    /// Create a new uninitialized [`OnceCell`].
+    fn default() -> Self {
+        Self {
+            inner: Default::default(),
+        }
+    }
+}
+
+/// The semaphore is the current state:
+/// - an open semaphore means the value is `None`, not yet initialized
+/// - a closed semaphore means the value has been initialized
+#[derive(Debug)]
+struct Inner<T> {
+    init_semaphore: Arc<Semaphore>,
+    value: Option<T>,
+}
+
+impl<T> Default for Inner<T> {
+    fn default() -> Self {
+        Self {
+            init_semaphore: Arc::new(Semaphore::new(1)),
+            value: None,
+        }
+    }
+}
+
+impl<T> OnceCell<T> {
+    /// Creates an already initialized `OnceCell` with the given value.
+    pub fn new(value: T) -> Self {
+        let sem = Semaphore::new(1);
+        sem.close();
+        Self {
+            inner: Mutex::new(Inner {
+                init_semaphore: Arc::new(sem),
+                value: Some(value),
+            }),
+        }
+    }
+
+    /// Returns a guard to an existing initialized value, or uniquely initializes the value before
+    /// returning the guard.
+    ///
+    /// Initializing might wait on any existing [`Guard::take_and_deinit`] deinitialization.
+    ///
+    /// Initialization is panic-safe and cancellation-safe.
+    pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
+    where
+        F: FnOnce() -> Fut,
+        Fut: std::future::Future<Output = Result<T, E>>,
+    {
+        let sem = {
+            let guard = self.inner.lock().unwrap();
+            if guard.value.is_some() {
+                return Ok(Guard(guard));
+            }
+            guard.init_semaphore.clone()
+        };
+
+        let permit = sem.acquire_owned().await;
+        if permit.is_err() {
+            let guard = self.inner.lock().unwrap();
+            assert!(
+                guard.value.is_some(),
+                "semaphore got closed, must be initialized"
+            );
+            return Ok(Guard(guard));
+        } else {
+            // we won the permit and get to run the factory
+            let value = factory().await?;
+
+            let mut guard = self.inner.lock().unwrap();
+            assert!(
+                guard.value.is_none(),
+                "we won permit, must not be initialized"
+            );
+            guard.value = Some(value);
+            guard.init_semaphore.close();
+            Ok(Guard(guard))
+        }
+    }
+
+    /// Returns a guard to an existing initialized value, if any.
+    pub fn get(&self) -> Option<Guard<'_, T>> {
+        let guard = self.inner.lock().unwrap();
+        if guard.value.is_some() {
+            Some(Guard(guard))
+        } else {
+            None
+        }
+    }
+}
+
+/// Uninteresting guard object to allow short-lived access to inspect or clone the held,
+/// initialized value.
+#[derive(Debug)]
+pub struct Guard<'a, T>(MutexGuard<'a, Inner<T>>);
+
+impl<T> std::ops::Deref for Guard<'_, T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        self.0
+            .value
+            .as_ref()
+            .expect("guard is not created unless value has been initialized")
+    }
+}
+
+impl<T> std::ops::DerefMut for Guard<'_, T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.0
+            .value
+            .as_mut()
+            .expect("guard is not created unless value has been initialized")
+    }
+}
+
+impl<'a, T> Guard<'a, T> {
+    /// Take the current value, and a new permit for its deinitialization.
+    ///
+    /// The permit will be on a semaphore part of the new internal value, and any following
+    /// [`OnceCell::get_or_init`] will wait on it to complete.
+    pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) {
+        let mut swapped = Inner::default();
+        let permit = swapped
+            .init_semaphore
+            .clone()
+            .try_acquire_owned()
+            .expect("we just created this");
+        std::mem::swap(&mut *self.0, &mut swapped);
+        swapped
+            .value
+            .map(|v| (v, permit))
+            .expect("guard is not created unless value has been initialized")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::{
+        convert::Infallible,
+        sync::atomic::{AtomicUsize, Ordering},
+        time::Duration,
+    };
+
+    #[tokio::test]
+    async fn many_initializers() {
+        #[derive(Default, Debug)]
+        struct Counters {
+            factory_got_to_run: AtomicUsize,
+            future_polled: AtomicUsize,
+            winners: AtomicUsize,
+        }
+
+        let initializers = 100;
+
+        let cell = Arc::new(OnceCell::default());
+        let counters = Arc::new(Counters::default());
+        let barrier = Arc::new(tokio::sync::Barrier::new(initializers + 1));
+
+        let mut js = tokio::task::JoinSet::new();
+        for i in 0..initializers {
+            js.spawn({
+                let cell = cell.clone();
+                let counters = counters.clone();
+                let barrier = barrier.clone();
+
+                async move {
+                    barrier.wait().await;
+                    let won = {
+                        let g = cell
+                            .get_or_init(|| {
+                                counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
+                                async {
+                                    counters.future_polled.fetch_add(1, Ordering::Relaxed);
+                                    Ok::<_, Infallible>(i)
+                                }
+                            })
+                            .await
+                            .unwrap();
+
+                        *g == i
+                    };
+
+                    if won {
+                        counters.winners.fetch_add(1, Ordering::Relaxed);
+                    }
+                }
+            });
+        }
+
+        barrier.wait().await;
+
+        while let Some(next) = js.join_next().await {
+            next.expect("no panics expected");
+        }
+
+        let mut counters = Arc::try_unwrap(counters).unwrap();
+
+        assert_eq!(*counters.factory_got_to_run.get_mut(), 1);
+        assert_eq!(*counters.future_polled.get_mut(), 1);
+        assert_eq!(*counters.winners.get_mut(), 1);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn reinit_waits_for_deinit() {
+        // with tokio::time paused, we will "sleep" for 1s while holding the reinitialization permit
+        let sleep_for = Duration::from_secs(1);
+        let initial = 42;
+        let reinit = 1;
+        let cell = Arc::new(OnceCell::new(initial));
+
+        let deinitialization_started = Arc::new(tokio::sync::Barrier::new(2));
+
+        let jh = tokio::spawn({
+            let cell = cell.clone();
+            let deinitialization_started = deinitialization_started.clone();
+            async move {
+                let (answer, _permit) = cell.get().expect("initialized to value").take_and_deinit();
+                assert_eq!(answer, initial);
+
+                deinitialization_started.wait().await;
+                tokio::time::sleep(sleep_for).await;
+            }
+        });
+
+        deinitialization_started.wait().await;
+
+        let started_at = tokio::time::Instant::now();
+        cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) })
+            .await
+            .unwrap();
+
+        let elapsed = started_at.elapsed();
+        assert!(
+            elapsed >= sleep_for,
+            "initialization should have taken at least the time slept with the permit held"
+        );
+
+        jh.await.unwrap();
+
+        assert_eq!(*cell.get().unwrap(), reinit);
+    }
+
+    #[tokio::test]
+    async fn initialization_attemptable_until_ok() {
+        let cell = OnceCell::default();
+
+        for _ in 0..10 {
+            cell.get_or_init(|| async { Err("whatever error") })
+                .await
+                .unwrap_err();
+        }
+
+        let g = cell
+            .get_or_init(|| async { Ok::<_, Infallible>("finally success") })
+            .await
+            .unwrap();
+        assert_eq!(*g, "finally success");
+    }
+
+    #[tokio::test]
+    async fn initialization_is_cancellation_safe() {
+        let cell = OnceCell::default();
+
+        let barrier = tokio::sync::Barrier::new(2);
+
+        let initializer = cell.get_or_init(|| async {
+            barrier.wait().await;
+            futures::future::pending::<()>().await;
+
+            Ok::<_, Infallible>("never reached")
+        });
+
+        tokio::select! {
+            _ = initializer => { unreachable!("cannot complete; stuck in pending().await") },
+            _ = barrier.wait() => {}
+        };
+
+        // now initializer is dropped
+
+        assert!(cell.get().is_none());
+
+        let g = cell
+            .get_or_init(|| async { Ok::<_, Infallible>("now initialized") })
+            .await
+            .unwrap();
+        assert_eq!(*g, "now initialized");
+    }
+}
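The new `heavier_once_cell::OnceCell` is driven through `get_or_init` for initialization and `Guard::take_and_deinit` for deinitialization. The following is a minimal usage sketch, not code from this diff; it assumes the repository's `utils` crate and a tokio runtime as dependencies, and the values are illustrative:

```rust
use std::convert::Infallible;
use utils::sync::heavier_once_cell::OnceCell;

#[tokio::main]
async fn main() {
    let cell: OnceCell<String> = OnceCell::default();

    // the winner of the internal semaphore permit runs the factory;
    // concurrent callers wait, then observe the initialized value
    let guard = cell
        .get_or_init(|| async { Ok::<_, Infallible>("initialized".to_owned()) })
        .await
        .unwrap();
    assert_eq!(&*guard, "initialized");
    drop(guard);

    // deinitialize: take the value out while holding a permit; any concurrent
    // get_or_init waits on that permit before re-running its factory
    let (value, permit) = cell.get().expect("was initialized").take_and_deinit();
    assert_eq!(value, "initialized");
    drop(permit); // releases waiters; the cell is uninitialized again

    assert!(cell.get().is_none());
}
```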
@@ -60,7 +60,11 @@ use utils::serde_percent::Percent;
 use crate::{
     config::PageServerConf,
     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
-    tenant::{self, storage_layer::PersistentLayer, timeline::EvictionError, Timeline},
+    tenant::{
+        self,
+        storage_layer::{AsLayerDesc, EvictionError, Layer},
+        Timeline,
+    },
 };
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -108,7 +112,7 @@ pub fn launch_disk_usage_global_eviction_task(
             _ = background_jobs_barrier.wait() => { }
         };
 
-        disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel)
+        disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel)
             .await;
         Ok(())
     },
@@ -121,7 +125,7 @@ pub fn launch_disk_usage_global_eviction_task(
 async fn disk_usage_eviction_task(
     state: &State,
     task_config: &DiskUsageEvictionTaskConfig,
-    storage: GenericRemoteStorage,
+    _storage: &GenericRemoteStorage,
     tenants_dir: &Path,
     cancel: CancellationToken,
 ) {
@@ -145,14 +149,8 @@ async fn disk_usage_eviction_task(
         let start = Instant::now();
 
         async {
-            let res = disk_usage_eviction_task_iteration(
-                state,
-                task_config,
-                &storage,
-                tenants_dir,
-                &cancel,
-            )
-            .await;
+            let res =
+                disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await;
 
             match res {
                 Ok(()) => {}
@@ -183,13 +181,12 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
 async fn disk_usage_eviction_task_iteration(
     state: &State,
     task_config: &DiskUsageEvictionTaskConfig,
-    storage: &GenericRemoteStorage,
     tenants_dir: &Path,
     cancel: &CancellationToken,
 ) -> anyhow::Result<()> {
     let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
         .context("get filesystem-level disk usage before evictions")?;
-    let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
+    let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await;
     match res {
         Ok(outcome) => {
             debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -273,7 +270,6 @@ struct LayerCount {
 
 pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
     state: &State,
-    storage: &GenericRemoteStorage,
     usage_pre: U,
     cancel: &CancellationToken,
 ) -> anyhow::Result<IterationOutcome<U>> {
@@ -330,9 +326,10 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
     // If we get far enough in the list that we start to evict layers that are below
     // the tenant's min-resident-size threshold, print a warning, and memorize the disk
    // usage at that point, in 'usage_planned_min_resident_size_respecting'.
-    let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
+    let mut batched: HashMap<_, Vec<_>> = HashMap::new();
     let mut warned = None;
     let mut usage_planned = usage_pre;
+    let mut max_batch_size = 0;
     for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
         if !usage_planned.has_pressure() {
             debug!(
@@ -349,10 +346,15 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
 
         usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
 
-        batched
-            .entry(TimelineKey(candidate.timeline))
-            .or_default()
-            .push(candidate.layer);
+        let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();
+
+        // the semaphore will later be used to limit eviction concurrency, and we can express at
+        // most a u32 number of permits. it is unlikely we would have u32::MAX layers to evict,
+        // but fail gracefully by not making batches larger.
+        if batch.len() < u32::MAX as usize {
+            batch.push(candidate.layer);
+            max_batch_size = max_batch_size.max(batch.len());
+        }
     }
 
     let usage_planned = match warned {
@@ -369,64 +371,101 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
 
     // phase2: evict victims batched by timeline
 
-    // After the loop, `usage_assumed` is the post-eviction usage,
-    // according to internal accounting.
-    let mut usage_assumed = usage_pre;
-    let mut evictions_failed = LayerCount::default();
+    let mut js = tokio::task::JoinSet::new();
+
+    // ratelimit to 1k files or any higher max batch size
+    let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size)));
+
     for (timeline, batch) in batched {
         let tenant_id = timeline.tenant_id;
         let timeline_id = timeline.timeline_id;
-        let batch_size = batch.len();
+        let batch_size =
+            u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning");
+
+        // I dislike the naming of `available_permits`, but it means the current total amount of
+        // permits, because permits can be added
+        assert!(batch_size as usize <= limit.available_permits());
 
         debug!(%timeline_id, "evicting batch for timeline");
 
-        async {
-            let results = timeline.evict_layers(storage, &batch, cancel.clone()).await;
+        let evict = {
+            let limit = limit.clone();
+            let cancel = cancel.clone();
+            async move {
+                let mut evicted_bytes = 0;
+                let mut evictions_failed = LayerCount::default();
 
-            match results {
-                Err(e) => {
-                    warn!("failed to evict batch: {:#}", e);
-                }
-                Ok(results) => {
-                    assert_eq!(results.len(), batch.len());
-                    for (result, layer) in results.into_iter().zip(batch.iter()) {
-                        let file_size = layer.layer_desc().file_size;
-                        match result {
-                            Some(Ok(())) => {
-                                usage_assumed.add_available_bytes(file_size);
-                            }
-                            Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
-                                unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
-                            }
-                            Some(Err(EvictionError::FileNotFound)) => {
-                                evictions_failed.file_sizes += file_size;
-                                evictions_failed.count += 1;
-                            }
-                            Some(Err(
-                                e @ EvictionError::LayerNotFound(_)
-                                | e @ EvictionError::StatFailed(_),
-                            )) => {
-                                let e = utils::error::report_compact_sources(&e);
-                                warn!(%layer, "failed to evict layer: {e}");
-                                evictions_failed.file_sizes += file_size;
-                                evictions_failed.count += 1;
-                            }
-                            None => {
-                                assert!(cancel.is_cancelled());
-                                return;
+                let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
+                    // semaphore closing means cancelled
+                    return (evicted_bytes, evictions_failed);
+                };
+
+                let results = timeline.evict_layers(&batch, &cancel).await;
+
+                match results {
+                    Ok(results) => {
+                        assert_eq!(results.len(), batch.len());
+                        for (result, layer) in results.into_iter().zip(batch.iter()) {
+                            let file_size = layer.layer_desc().file_size;
+                            match result {
+                                Some(Ok(())) => {
+                                    evicted_bytes += file_size;
+                                }
+                                Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
+                                    evictions_failed.file_sizes += file_size;
+                                    evictions_failed.count += 1;
+                                }
+                                None => {
+                                    assert!(cancel.is_cancelled());
+                                }
                             }
                         }
                     }
+                    Err(e) => {
+                        warn!("failed to evict batch: {:#}", e);
+                    }
                 }
+                (evicted_bytes, evictions_failed)
             }
         }
-        .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size))
-        .await;
+        .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size));
 
-        if cancel.is_cancelled() {
-            return Ok(IterationOutcome::Cancelled);
-        }
+        js.spawn(evict);
+
+        // spawning multiple thousands of these is essentially blocking, so give the already
+        // spawned a chance of making progress
+        tokio::task::yield_now().await;
     }
 
+    let join_all = async move {
+        // After the evictions, `usage_assumed` is the post-eviction usage,
+        // according to internal accounting.
+        let mut usage_assumed = usage_pre;
+        let mut evictions_failed = LayerCount::default();
+
+        while let Some(res) = js.join_next().await {
+            match res {
+                Ok((evicted_bytes, failed)) => {
+                    usage_assumed.add_available_bytes(evicted_bytes);
+                    evictions_failed.file_sizes += failed.file_sizes;
+                    evictions_failed.count += failed.count;
+                }
+                Err(je) if je.is_cancelled() => unreachable!("not used"),
+                Err(je) if je.is_panic() => { /* already logged */ }
+                Err(je) => tracing::error!("unknown JoinError: {je:?}"),
+            }
+        }
+        (usage_assumed, evictions_failed)
+    };
+
+    let (usage_assumed, evictions_failed) = tokio::select! {
+        tuple = join_all => { tuple },
+        _ = cancel.cancelled() => {
+            // close the semaphore to stop any pending acquires
+            limit.close();
+            return Ok(IterationOutcome::Cancelled);
+        }
+    };
 
     Ok(IterationOutcome::Finished(IterationOutcomeFinished {
         before: usage_pre,
@@ -441,7 +480,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
 #[derive(Clone)]
 struct EvictionCandidate {
     timeline: Arc<Timeline>,
-    layer: Arc<dyn PersistentLayer>,
+    layer: Layer,
     last_activity_ts: SystemTime,
 }
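The refactor above replaces sequentially awaited eviction batches with spawned tasks coordinated by a shared semaphore, a join loop, and a cancellable `select!`. Below is a stripped-down sketch of that pattern only; the names, batch sizes, and "bytes evicted" values are placeholders, not pageserver code, and it assumes the `tokio` (full features) and `tokio-util` crates:

```rust
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // rate limit shared by all batches, like the 1000.max(max_batch_size) above
    let limit = Arc::new(tokio::sync::Semaphore::new(1000));
    let cancel = CancellationToken::new();
    let mut js = tokio::task::JoinSet::new();

    for batch_size in [3u32, 7, 11] {
        let limit = limit.clone();
        js.spawn(async move {
            // a closed semaphore means we were cancelled: bail out with zero work done
            let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
                return 0u64;
            };
            u64::from(batch_size) // stand-in for "bytes evicted by this batch"
        });
    }

    // fold per-batch results into a single total, like `join_all` above
    let join_all = async {
        let mut evicted = 0u64;
        while let Some(res) = js.join_next().await {
            evicted += res.expect("no panics expected");
        }
        evicted
    };

    let evicted = tokio::select! {
        evicted = join_all => evicted,
        _ = cancel.cancelled() => {
            limit.close(); // makes pending acquires fail, so tasks exit promptly
            return;
        }
    };

    println!("evicted {evicted} bytes (simulated)");
}
```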
@@ -1028,7 +1028,7 @@ async fn timeline_compact_handler(
         timeline
             .compact(&cancel, &ctx)
             .await
-            .map_err(ApiError::InternalServerError)?;
+            .map_err(|e| ApiError::InternalServerError(e.into()))?;
         json_response(StatusCode::OK, ())
     }
     .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
@@ -1053,7 +1053,7 @@ async fn timeline_checkpoint_handler(
         timeline
             .compact(&cancel, &ctx)
             .await
-            .map_err(ApiError::InternalServerError)?;
+            .map_err(|e| ApiError::InternalServerError(e.into()))?;
 
         json_response(StatusCode::OK, ())
     }
@@ -1160,11 +1160,11 @@ async fn disk_usage_eviction_run(
 
     let state = get_state(&r);
 
-    let Some(storage) = state.remote_storage.clone() else {
+    if state.remote_storage.as_ref().is_none() {
         return Err(ApiError::InternalServerError(anyhow::anyhow!(
             "remote storage not configured, cannot run eviction iteration"
         )));
-    };
+    }
 
     let state = state.disk_usage_eviction_state.clone();
 
@@ -1182,7 +1182,6 @@ async fn disk_usage_eviction_run(
     async move {
         let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
            &state,
-            &storage,
            usage,
            &child_cancel,
        )
@@ -133,9 +133,7 @@ pub(crate) mod timeline;
 pub mod size;
 
 pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
-pub use timeline::{
-    LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
-};
+pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
 
 // re-export for use in remote_timeline_client.rs
 pub use crate::tenant::metadata::save_metadata;
@@ -4041,6 +4039,7 @@ mod tests {
 
     #[tokio::test]
     async fn delta_layer_dumping() -> anyhow::Result<()> {
+        use storage_layer::AsLayerDesc;
         let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
         let tline = tenant
             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -4048,16 +4047,18 @@ mod tests {
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         let layer_map = tline.layers.read().await;
-        let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
+        let level0_deltas = layer_map
+            .layer_map()
+            .get_level0_deltas()?
+            .into_iter()
+            .map(|desc| layer_map.get_from_desc(&desc))
+            .collect::<Vec<_>>();
 
         assert!(!level0_deltas.is_empty());
 
         for delta in level0_deltas {
-            let delta = layer_map.get_from_desc(&delta);
             // Ensure we are dumping a delta layer here
-            let delta = delta.downcast_delta_layer().unwrap();
-
             delta.dump(false, &ctx).await.unwrap();
+            assert!(delta.layer_desc().is_delta);
             delta.dump(true, &ctx).await.unwrap();
         }
 
@@ -639,147 +639,10 @@ impl LayerMap {
         }
 
         println!("historic_layers:");
-        for layer in self.iter_historic_layers() {
-            layer.dump(verbose, ctx)?;
+        for desc in self.iter_historic_layers() {
+            desc.dump();
         }
         println!("End dump LayerMap");
         Ok(())
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::LayerMap;
-    use crate::tenant::storage_layer::LayerFileName;
-    use std::str::FromStr;
-    use std::sync::Arc;
-
-    mod l0_delta_layers_updated {
-
-        use crate::tenant::{
-            storage_layer::{AsLayerDesc, PersistentLayerDesc},
-            timeline::layer_manager::LayerFileManager,
-        };
-
-        use super::*;
-
-        struct LayerObject(PersistentLayerDesc);
-
-        impl AsLayerDesc for LayerObject {
-            fn layer_desc(&self) -> &PersistentLayerDesc {
-                &self.0
-            }
-        }
-
-        impl LayerObject {
-            fn new(desc: PersistentLayerDesc) -> Self {
-                LayerObject(desc)
-            }
-        }
-
-        type TestLayerFileManager = LayerFileManager<LayerObject>;
-
-        #[test]
-        fn for_full_range_delta() {
-            // l0_delta_layers are used by compaction, and should observe all buffered updates
-            l0_delta_layers_updated_scenario(
-                "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
-                true
-            )
-        }
-
-        #[test]
-        fn for_non_full_range_delta() {
-            // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
-            l0_delta_layers_updated_scenario(
-                "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
-                // because not full range
-                false
-            )
-        }
-
-        #[test]
-        fn for_image() {
-            l0_delta_layers_updated_scenario(
-                "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
-                // code only checks if it is a full range layer, doesn't care about images, which must
-                // mean we should in practice never have full range images
-                false
-            )
-        }
-
-        #[test]
-        fn replacing_missing_l0_is_notfound() {
-            // original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
-            // however only happen for precondition failures.
-
-            let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
-            let layer = LayerFileName::from_str(layer).unwrap();
-            let layer = PersistentLayerDesc::from(layer);
-
-            // same skeleton construction; see scenario below
-            let not_found = Arc::new(LayerObject::new(layer.clone()));
-            let new_version = Arc::new(LayerObject::new(layer));
-
-            // after the immutable storage state refactor, the replace operation
-            // will not use layer map any more. We keep it here for consistency in test cases
-            // and can remove it in the future.
-            let _map = LayerMap::default();
-
-            let mut mapping = TestLayerFileManager::new();
-
-            mapping
-                .replace_and_verify(not_found, new_version)
-                .unwrap_err();
-        }
-
-        fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
-            let name = LayerFileName::from_str(layer_name).unwrap();
-            let skeleton = PersistentLayerDesc::from(name);
-
-            let remote = Arc::new(LayerObject::new(skeleton.clone()));
-            let downloaded = Arc::new(LayerObject::new(skeleton));
-
-            let mut map = LayerMap::default();
-            let mut mapping = LayerFileManager::new();
-
-            // two disjoint Arcs in different lifecycle phases. even if it seems they must be the
-            // same layer, we use LayerMap::compare_arced_layers as the identity of layers.
-            assert_eq!(remote.layer_desc(), downloaded.layer_desc());
-
-            let expected_in_counts = (1, usize::from(expected_l0));
-
-            map.batch_update()
-                .insert_historic(remote.layer_desc().clone());
-            mapping.insert(remote.clone());
-            assert_eq!(
-                count_layer_in(&map, remote.layer_desc()),
-                expected_in_counts
-            );
-
-            mapping
-                .replace_and_verify(remote, downloaded.clone())
-                .expect("name derived attributes are the same");
-            assert_eq!(
-                count_layer_in(&map, downloaded.layer_desc()),
-                expected_in_counts
-            );
-
-            map.batch_update().remove_historic(downloaded.layer_desc());
-            assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
-        }
-
-        fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
-            let historic = map
-                .iter_historic_layers()
-                .filter(|x| x.key() == layer.key())
-                .count();
-            let l0s = map
-                .get_level0_deltas()
-                .expect("why does this return a result");
-            let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
-
-            (historic, l0)
-        }
-    }
-}
@@ -163,8 +163,6 @@
 //!  - download their remote [`IndexPart`]s
 //!  - create `Timeline` struct and a `RemoteTimelineClient`
 //!  - initialize the client's upload queue with its `IndexPart`
-//!  - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
-//!    for layers that are referenced by `IndexPart` but not present locally
 //!  - schedule uploads for layers that are only present locally.
 //!  - if the remote `IndexPart`'s metadata was newer than the metadata in
 //!    the local filesystem, write the remote metadata to the local filesystem
@@ -233,7 +231,8 @@ use crate::metrics::{
 };
 use crate::task_mgr::shutdown_token;
 use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
-use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
+pub(crate) use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
+use crate::tenant::storage_layer::AsLayerDesc;
 use crate::tenant::upload_queue::Delete;
 use crate::{
     config::PageServerConf,
@@ -250,7 +249,7 @@ use utils::id::{TenantId, TimelineId};
 
 use self::index::IndexPart;
 
-use super::storage_layer::LayerFileName;
+use super::storage_layer::{LayerFileName, ResidentLayer};
 use super::upload_queue::SetDeletedFlagProgress;
 
 // Occasional network issues and such can cause remote operations to fail, and
@@ -598,25 +597,25 @@ impl RemoteTimelineClient {
     ///
     /// Launch an upload operation in the background.
     ///
-    pub fn schedule_layer_file_upload(
+    pub(crate) fn schedule_layer_file_upload(
         self: &Arc<Self>,
-        layer_file_name: &LayerFileName,
-        layer_metadata: &LayerFileMetadata,
+        layer: ResidentLayer,
     ) -> anyhow::Result<()> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
 
+        let metadata = LayerFileMetadata::new(layer.layer_desc().file_size);
+
         upload_queue
             .latest_files
-            .insert(layer_file_name.clone(), layer_metadata.clone());
+            .insert(layer.layer_desc().filename(), metadata.clone());
         upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
 
-        let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
+        info!("scheduled layer file upload {layer}");
+        let op = UploadOp::UploadLayer(layer, metadata);
         self.calls_unfinished_metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
 
-        info!("scheduled layer file upload {layer_file_name}");
-
         // Launch the task immediately, if possible
         self.launch_queued_tasks(upload_queue);
         Ok(())
@@ -1054,11 +1053,8 @@ impl RemoteTimelineClient {
         }
 
         let upload_result: anyhow::Result<()> = match &task.op {
-            UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
-                let path = &self
-                    .conf
-                    .timeline_path(&self.tenant_id, &self.timeline_id)
-                    .join(layer_file_name.file_name());
+            UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
+                let path = layer.local_path();
 
                 upload::upload_timeline_layer(
                     self.conf,
                     &self.storage_impl,
@@ -1367,6 +1363,7 @@ mod tests {
     context::RequestContext,
     tenant::{
         harness::{TenantHarness, TIMELINE_ID},
+        storage_layer::Layer,
         Tenant, Timeline,
     },
     DEFAULT_PG_VERSION,
@@ -1507,7 +1504,7 @@ mod tests {
     let TestSetup {
         harness,
         tenant: _tenant,
-        timeline: _timeline,
+        timeline,
        tenant_ctx: _tenant_ctx,
        remote_fs_dir,
        client,
@@ -1527,32 +1524,29 @@ mod tests {
         .unwrap();
 
     // Create a couple of dummy files, schedule upload for them
-    let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
-    let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
-    let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
-    let content_1 = dummy_contents("foo");
-    let content_2 = dummy_contents("bar");
-    let content_3 = dummy_contents("baz");
-
-    for (filename, content) in [
-        (&layer_file_name_1, &content_1),
-        (&layer_file_name_2, &content_2),
-        (&layer_file_name_3, &content_3),
-    ] {
-        std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
-    }
+    let layers = [
+        ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
+        ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
+        ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
+    ]
+    .into_iter()
+    .map(|(name, contents): (LayerFileName, Vec<u8>)| {
+        std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
+
+        Layer::for_resident(
+            harness.conf,
+            &timeline,
+            name,
+            LayerFileMetadata::new(contents.len() as u64),
+        )
+    }).collect::<Vec<_>>();
 
     client
-        .schedule_layer_file_upload(
-            &layer_file_name_1,
-            &LayerFileMetadata::new(content_1.len() as u64),
-        )
+        .schedule_layer_file_upload(layers[0].clone())
         .unwrap();
     client
-        .schedule_layer_file_upload(
-            &layer_file_name_2,
-            &LayerFileMetadata::new(content_2.len() as u64),
-        )
+        .schedule_layer_file_upload(layers[1].clone())
         .unwrap();
 
     // Check that they are started immediately, not queued
@@ -1605,21 +1599,18 @@ mod tests {
             .map(|f| f.to_owned())
             .collect(),
         &[
-            &layer_file_name_1.file_name(),
-            &layer_file_name_2.file_name(),
+            &layers[0].layer_desc().filename().file_name(),
+            &layers[1].layer_desc().filename().file_name(),
         ],
     );
     assert_eq!(index_part.metadata, metadata);
 
     // Schedule upload and then a deletion. Check that the deletion is queued
     client
-        .schedule_layer_file_upload(
-            &layer_file_name_3,
-            &LayerFileMetadata::new(content_3.len() as u64),
-        )
+        .schedule_layer_file_upload(layers[2].clone())
        .unwrap();
     client
-        .schedule_layer_file_deletion(&[layer_file_name_1.clone()])
+        .schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
        .unwrap();
    {
        let mut guard = client.upload_queue.lock().unwrap();
@@ -1634,8 +1625,8 @@ mod tests {
    }
    assert_remote_files(
        &[
-            &layer_file_name_1.file_name(),
-            &layer_file_name_2.file_name(),
+            &layers[0].layer_desc().filename().file_name(),
+            &layers[1].layer_desc().filename().file_name(),
            "index_part.json",
        ],
        &remote_timeline_dir,
@@ -1646,8 +1637,8 @@ mod tests {
 
    assert_remote_files(
        &[
-            &layer_file_name_2.file_name(),
-            &layer_file_name_3.file_name(),
+            &layers[1].layer_desc().filename().file_name(),
+            &layers[2].layer_desc().filename().file_name(),
            "index_part.json",
        ],
        &remote_timeline_dir,
@@ -1661,7 +1652,7 @@ mod tests {
    let TestSetup {
        harness,
        tenant: _tenant,
-        timeline: _timeline,
+        timeline,
        client,
        ..
    } = TestSetup::new("metrics").await.unwrap();
@@ -1681,6 +1672,13 @@ mod tests {
    )
    .unwrap();
 
+    let layer_file_1 = Layer::for_resident(
+        harness.conf,
+        &timeline,
+        layer_file_name_1.clone(),
+        LayerFileMetadata::new(content_1.len() as u64),
+    );
+
    #[derive(Debug, PartialEq)]
    struct BytesStartedFinished {
        started: Option<usize>,
@@ -1706,10 +1704,7 @@ mod tests {
    let init = get_bytes_started_stopped();
 
    client
-        .schedule_layer_file_upload(
-            &layer_file_name_1,
-            &LayerFileMetadata::new(content_1.len() as u64),
-        )
+        .schedule_layer_file_upload(layer_file_1.clone())
        .unwrap();
 
    let pre = get_bytes_started_stopped();
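In the hunks above, `schedule_layer_file_upload` stops taking a caller-supplied `(LayerFileName, LayerFileMetadata)` pair and instead accepts a `ResidentLayer`, deriving the metadata internally so name and size can never disagree. A toy model of that shape follows; every type here is a simplified stand-in for illustration, not the pageserver's actual definitions:

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Clone, Debug)]
struct LayerFileMetadata {
    file_size: u64,
}

#[derive(Clone, Debug)]
struct ResidentLayer {
    filename: String,
    file_size: u64,
}

#[derive(Default)]
struct UploadQueue {
    latest_files: HashMap<String, LayerFileMetadata>,
    queued_operations: VecDeque<(ResidentLayer, LayerFileMetadata)>,
}

impl UploadQueue {
    fn schedule_layer_file_upload(&mut self, layer: ResidentLayer) {
        // metadata is derived from the layer itself, so it cannot drift
        // from the file the queue will actually upload
        let metadata = LayerFileMetadata { file_size: layer.file_size };
        self.latest_files.insert(layer.filename.clone(), metadata.clone());
        self.queued_operations.push_back((layer, metadata));
    }
}

fn main() {
    let mut queue = UploadQueue::default();
    queue.schedule_layer_file_upload(ResidentLayer {
        filename: "layer_0001".to_string(),
        file_size: 4096,
    });
    assert_eq!(queue.queued_operations.len(), 1);
}
```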
@@ -67,6 +67,8 @@ pub(super) async fn upload_timeline_layer<'a>(
         // upload. However, a nonexistent file can also be indicative of
         // something worse, like when a file is scheduled for upload before
         // it has been written to disk yet.
+        //
+        // This is tested against `test_compaction_delete_before_upload`
         info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
         return Ok(());
     }
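The hunk above documents the tolerance for a scheduled upload whose source file has since been deleted: it is logged and treated as a no-op success. A minimal sketch of that pattern, using a hypothetical helper name rather than the pageserver's upload code:

```rust
use std::path::Path;

// hypothetical helper, not from the diff: skip the upload when the source
// file vanished between scheduling and execution (e.g. deleted by compaction)
fn upload_if_present(source_path: &Path) -> std::io::Result<()> {
    if !source_path.exists() {
        eprintln!(
            "file to upload doesn't exist, skipping: {}",
            source_path.display()
        );
        return Ok(());
    }
    // ... perform the actual upload here ...
    Ok(())
}

fn main() -> std::io::Result<()> {
    upload_if_present(Path::new("/tmp/definitely-missing-layer-file"))
}
```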
@@ -4,26 +4,21 @@ pub mod delta_layer;
 mod filename;
 mod image_layer;
 mod inmemory_layer;
+mod layer;
 mod layer_desc;
-mod remote_layer;
 
 use crate::config::PageServerConf;
 use crate::context::{AccessStatsBehavior, RequestContext};
 use crate::repository::Key;
-use crate::task_mgr::TaskKind;
 use crate::walrecord::NeonWalRecord;
-use anyhow::Result;
 use bytes::Bytes;
 use enum_map::EnumMap;
 use enumset::EnumSet;
 use once_cell::sync::Lazy;
-use pageserver_api::models::LayerAccessKind;
 use pageserver_api::models::{
-    HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
+    LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
 };
 use std::ops::Range;
 use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::Mutex;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tracing::warn;
 use utils::history_buffer::HistoryBufferWithDropCounter;
@@ -39,7 +34,8 @@ pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
 pub use image_layer::{ImageLayer, ImageLayerWriter};
 pub use inmemory_layer::InMemoryLayer;
 pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
-pub use remote_layer::RemoteLayer;
+
+pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
 
 pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
 where
@@ -74,7 +70,7 @@ pub struct ValueReconstructState {
     pub img: Option<(Lsn, Bytes)>,
 }
 
-/// Return value from Layer::get_page_reconstruct_data
+/// Return value from [`Layer::get_value_reconstruct_data`]
 #[derive(Clone, Copy, Debug)]
 pub enum ValueReconstructResult {
     /// Got all the data needed to reconstruct the requested page
@@ -179,26 +175,6 @@ impl LayerAccessStats {
         new
     }
 
-    /// Creates a clone of `self` and records `new_status` in the clone.
-    ///
-    /// The `new_status` is not recorded in `self`.
-    ///
-    /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
-    ///
-    /// [`record_residence_event`]: Self::record_residence_event
-    pub(crate) fn clone_for_residence_change(
-        &self,
-        new_status: LayerResidenceStatus,
-    ) -> LayerAccessStats {
-        let clone = {
-            let inner = self.0.lock().unwrap();
-            inner.clone()
-        };
-        let new = LayerAccessStats(Mutex::new(clone));
-        new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
-        new
-    }
-
     /// Record a change in layer residency.
     ///
     /// Recording the event must happen while holding the layer map lock to
@@ -321,95 +297,12 @@ impl LayerAccessStats {
     }
 }
 
-/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
-/// required by [`LayerMap`](super::layer_map::LayerMap).
-///
-/// All layers should implement a minimal `std::fmt::Debug` without tenant or
-/// timeline names, because those are known in the context of which the layers
-/// are used in (timeline).
-#[async_trait::async_trait]
-pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
-    ///
-    /// Return data needed to reconstruct given page at LSN.
-    ///
-    /// It is up to the caller to collect more data from previous layer and
-    /// perform WAL redo, if necessary.
-    ///
-    /// See PageReconstructResult for possible return values. The collected data
-    /// is appended to reconstruct_data; the caller should pass an empty struct
-    /// on first call, or a struct with a cached older image of the page if one
-    /// is available. If this returns ValueReconstructResult::Continue, look up
-    /// the predecessor layer and call again with the same 'reconstruct_data' to
-    /// collect more data.
-    async fn get_value_reconstruct_data(
-        &self,
-        key: Key,
-        lsn_range: Range<Lsn>,
-        reconstruct_data: &mut ValueReconstructState,
-        ctx: &RequestContext,
-    ) -> Result<ValueReconstructResult>;
-}
-
 /// Get a layer descriptor from a layer.
 pub trait AsLayerDesc {
     /// Get the layer descriptor.
     fn layer_desc(&self) -> &PersistentLayerDesc;
 }
 
-/// A Layer contains all data in a "rectangle" consisting of a range of keys and
-/// range of LSNs.
-///
-/// There are two kinds of layers, in-memory and on-disk layers. In-memory
-/// layers are used to ingest incoming WAL, and provide fast access to the
-/// recent page versions. On-disk layers are stored as files on disk, and are
-/// immutable. This trait presents the common functionality of in-memory and
-/// on-disk layers.
-///
-/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
-/// A delta layer contains all modifications within a range of LSNs and keys.
-/// An image layer is a snapshot of all the data in a key-range, at a single
-/// LSN.
-pub trait PersistentLayer: Layer + AsLayerDesc {
-    /// File name used for this layer, both in the pageserver's local filesystem
-    /// state as well as in the remote storage.
-    fn filename(&self) -> LayerFileName {
-        self.layer_desc().filename()
-    }
-
-    // Path to the layer file in the local filesystem.
-    // `None` for `RemoteLayer`.
-    fn local_path(&self) -> Option<PathBuf>;
-
-    /// Permanently remove this layer from disk.
-    fn delete_resident_layer_file(&self) -> Result<()>;
-
-    fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
-        None
-    }
-
-    fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
-        None
-    }
-
-    fn is_remote_layer(&self) -> bool {
-        false
-    }
-
-    fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
-
-    fn access_stats(&self) -> &LayerAccessStats;
-}
-
-pub fn downcast_remote_layer(
-    layer: &Arc<dyn PersistentLayer>,
-) -> Option<std::sync::Arc<RemoteLayer>> {
-    if layer.is_remote_layer() {
-        Arc::clone(layer).downcast_remote_layer()
-    } else {
-        None
-    }
-}
-
 pub mod tests {
     use super::*;
 
@@ -447,19 +340,6 @@ pub mod tests {
     }
 }
 
-/// Helper enum to hold a PageServerConf, or a path
-///
-/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
-/// global config, and paths to layer files are constructed using the tenant/timeline
-/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
-/// struct for a file on disk, without having a page server running, so that we have no
-/// config. In that case, we use the Path variant to hold the full path to the file on
-/// disk.
-enum PathOrConf {
-    Path(PathBuf),
-    Conf(&'static PageServerConf),
-}
-
 /// Range wrapping newtype, which uses display to render Debug.
 ///
 /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
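After this refactor, `AsLayerDesc` is the one hook generic code needs from any layer-like type: everything name-derived (filename, key/LSN ranges, size) comes from the descriptor. The deleted layer-map tests' `LayerObject` wrapper used exactly this pattern. Here is a self-contained sketch of it, with `PersistentLayerDesc` reduced to two fields purely for illustration:

```rust
// simplified stand-in for the real descriptor type
#[derive(Debug)]
struct PersistentLayerDesc {
    file_size: u64,
    is_delta: bool,
}

// the trait kept by the diff: one accessor, everything else derived from it
trait AsLayerDesc {
    fn layer_desc(&self) -> &PersistentLayerDesc;
}

// a thin wrapper, like the removed tests' LayerObject
struct LayerObject(PersistentLayerDesc);

impl AsLayerDesc for LayerObject {
    fn layer_desc(&self) -> &PersistentLayerDesc {
        &self.0
    }
}

fn main() {
    let layer = LayerObject(PersistentLayerDesc {
        file_size: 8192,
        is_delta: true,
    });
    // generic code needs only the descriptor, not the concrete layer type
    assert_eq!(layer.layer_desc().file_size, 8192);
    println!("is_delta = {}", layer.layer_desc().is_delta);
}
```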
@@ -34,17 +34,16 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{
|
||||
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::{self, File};
|
||||
use std::fs::File;
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::io::{Seek, SeekFrom};
|
||||
use std::ops::Range;
|
||||
@@ -60,10 +59,7 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::{
|
||||
AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf,
|
||||
PersistentLayerDesc,
|
||||
};
|
||||
use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -183,20 +179,12 @@ impl DeltaKey {
|
||||
}
|
||||
}
|
||||
|
||||
/// DeltaLayer is the in-memory data structure associated with an on-disk delta
|
||||
/// file.
|
||||
///
|
||||
/// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
|
||||
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
|
||||
/// Otherwise the struct is just a placeholder for a file that exists on disk,
|
||||
/// and it needs to be loaded before using it in queries.
|
||||
/// This is used only from `pagectl`. Within pageserver, all layers are
|
||||
/// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
|
||||
pub struct DeltaLayer {
|
||||
path_or_conf: PathOrConf,
|
||||
|
||||
path: PathBuf,
|
||||
pub desc: PersistentLayerDesc,
|
||||
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
inner: OnceCell<Arc<DeltaLayerInner>>,
|
||||
}
|
||||
|
||||
@@ -213,6 +201,8 @@ impl std::fmt::Debug for DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
|
||||
/// file.
|
||||
pub struct DeltaLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
@@ -222,12 +212,6 @@ pub struct DeltaLayerInner {
|
||||
file: FileBlockReader<VirtualFile>,
|
||||
}
|
||||
|
||||
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
|
||||
fn as_ref(&self) -> &DeltaLayerInner {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DeltaLayerInner")
|
||||
@@ -237,19 +221,6 @@ impl std::fmt::Debug for DeltaLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Layer for DeltaLayer {
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
impl std::fmt::Display for DeltaLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
@@ -263,40 +234,9 @@ impl AsLayerDesc for DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for DeltaLayer {
|
||||
fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
|
||||
Some(self)
|
||||
}
|
||||
|
||||
fn local_path(&self) -> Option<PathBuf> {
|
||||
self.local_path()
|
||||
}
|
||||
|
||||
fn delete_resident_layer_file(&self) -> Result<()> {
|
||||
self.delete_resident_layer_file()
|
||||
}
|
||||
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
self.info(reset)
|
||||
}
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats {
|
||||
self.access_stats()
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
|
||||
self.desc.tenant_id,
|
||||
self.desc.timeline_id,
|
||||
self.desc.key_range.start,
|
||||
self.desc.key_range.end,
|
||||
self.desc.lsn_range.start,
|
||||
self.desc.lsn_range.end,
|
||||
self.desc.file_size,
|
||||
);
|
||||
self.desc.dump();
|
||||
|
||||
if !verbose {
|
||||
return Ok(());
|
||||
@@ -304,119 +244,7 @@ impl DeltaLayer {
|
||||
|
||||
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
|
||||
|
||||
println!(
|
||||
"index_start_blk: {}, root {}",
|
||||
inner.index_start_blk, inner.index_root_blk
|
||||
);
|
||||
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
file,
|
||||
);
|
||||
|
||||
tree_reader.dump().await?;
|
||||
|
||||
let keys = DeltaLayerInner::load_keys(&inner).await?;
|
||||
|
||||
// A subroutine to dump a single blob
|
||||
async fn dump_blob(val: ValueRef<'_>) -> Result<String> {
|
||||
let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
|
||||
let val = Value::des(&buf)?;
|
||||
let desc = match val {
|
||||
Value::Image(img) => {
|
||||
format!(" img {} bytes", img.len())
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let wal_desc = walrecord::describe_wal_record(&rec)?;
|
||||
format!(
|
||||
" rec {} bytes will_init: {} {}",
|
||||
buf.len(),
|
||||
rec.will_init(),
|
||||
wal_desc
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(desc)
|
||||
}
|
||||
|
||||
for entry in keys {
|
||||
let DeltaEntry { key, lsn, val, .. } = entry;
|
||||
let desc = match dump_blob(val).await {
|
||||
Ok(desc) => desc,
|
||||
Err(err) => {
|
||||
let err: anyhow::Error = err;
|
||||
format!("ERROR: {err}")
|
||||
}
|
||||
};
|
||||
println!(" key {key} at {lsn}: {desc}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ensure!(lsn_range.start >= self.desc.lsn_range.start);
|
||||
|
||||
ensure!(self.desc.key_range.contains(&key));
|
||||
|
||||
let inner = self
|
||||
.load(LayerAccessKind::GetValueReconstructData, ctx)
|
||||
.await?;
|
||||
inner
|
||||
.get_value_reconstruct_data(key, lsn_range, reconstruct_state)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> Option<PathBuf> {
|
||||
Some(self.path())
|
||||
}
|
||||
|
||||
pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
|
||||
// delete underlying file
|
||||
fs::remove_file(self.path())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
let layer_file_name = self.layer_desc().filename().file_name();
|
||||
let lsn_range = self.layer_desc().lsn_range.clone();
|
||||
|
||||
let access_stats = self.access_stats.as_api_model(reset);
|
||||
|
||||
HistoricLayerInfo::Delta {
|
||||
layer_file_name,
|
||||
layer_file_size: self.desc.file_size,
|
||||
lsn_start: lsn_range.start,
|
||||
lsn_end: lsn_range.end,
|
||||
remote: false,
|
||||
access_stats,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
|
||||
&self.access_stats
|
||||
}
|
||||
|
||||
fn path_for(
|
||||
path_or_conf: &PathOrConf,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
fname: &DeltaFileName,
|
||||
) -> PathBuf {
|
||||
match path_or_conf {
|
||||
PathOrConf::Path(path) => path.clone(),
|
||||
PathOrConf::Conf(conf) => conf
|
||||
.timeline_path(tenant_id, timeline_id)
|
||||
.join(fname.to_string()),
|
||||
}
|
||||
inner.dump().await
|
||||
}
|
||||
|
||||
fn temp_path_for(
|
||||
@@ -462,52 +290,22 @@ impl DeltaLayer {
    async fn load_inner(&self) -> Result<Arc<DeltaLayerInner>> {
        let path = self.path();

        let summary = match &self.path_or_conf {
            PathOrConf::Conf(_) => Some(Summary::from(self)),
            PathOrConf::Path(_) => None,
        };
        let loaded = DeltaLayerInner::load(&path, None)?;

        let loaded = DeltaLayerInner::load(&path, summary)?;
        // not production code

        if let PathOrConf::Path(ref path) = self.path_or_conf {
            // not production code
            let actual_filename = self.path.file_name().unwrap().to_str().unwrap().to_owned();
            let expected_filename = self.layer_desc().filename().file_name();

            let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
            let expected_filename = self.filename().file_name();

            if actual_filename != expected_filename {
                println!("warning: filename does not match what is expected from in-file summary");
                println!("actual: {:?}", actual_filename);
                println!("expected: {:?}", expected_filename);
            }
            if actual_filename != expected_filename {
                println!("warning: filename does not match what is expected from in-file summary");
                println!("actual: {:?}", actual_filename);
                println!("expected: {:?}", expected_filename);
            }

        Ok(Arc::new(loaded))
    }
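
The inner: OnceCell::new() plus load_inner pairing above is the lazy-initialization pattern: the first caller runs the fallible load, later callers reuse the cached value. A minimal sketch, assuming tokio and anyhow as dependencies (the type and field names are illustrative, not the pageserver API):

    use tokio::sync::OnceCell;

    struct Lazy {
        inner: OnceCell<String>,
    }

    impl Lazy {
        async fn load(&self) -> anyhow::Result<&String> {
            self.inner
                .get_or_try_init(|| async {
                    // one-time fallible initialization, e.g. reading a summary from disk
                    Ok::<_, anyhow::Error>("loaded".to_string())
                })
                .await
        }
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let l = Lazy { inner: OnceCell::new() };
        println!("{}", l.load().await?); // first call initializes
        println!("{}", l.load().await?); // subsequent calls reuse the cached value
        Ok(())
    }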

    /// Create a DeltaLayer struct representing an existing file on disk.
    pub fn new(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_id: TenantId,
        filename: &DeltaFileName,
        file_size: u64,
        access_stats: LayerAccessStats,
    ) -> DeltaLayer {
        DeltaLayer {
            path_or_conf: PathOrConf::Conf(conf),
            desc: PersistentLayerDesc::new_delta(
                tenant_id,
                timeline_id,
                filename.key_range.clone(),
                filename.lsn_range.clone(),
                file_size,
            ),
            access_stats,
            inner: OnceCell::new(),
        }
    }

    /// Create a DeltaLayer struct representing an existing file on disk.
    ///
    /// This variant is only used for debugging purposes, by the 'pagectl' binary.
@@ -522,7 +320,7 @@ impl DeltaLayer {
            .context("get file metadata to determine size")?;

        Ok(DeltaLayer {
            path_or_conf: PathOrConf::Path(path.to_path_buf()),
            path: path.to_path_buf(),
            desc: PersistentLayerDesc::new_delta(
                summary.tenant_id,
                summary.timeline_id,
@@ -535,29 +333,9 @@ impl DeltaLayer {
        })
    }

    fn layer_name(&self) -> DeltaFileName {
        self.desc.delta_file_name()
    }
    /// Path to the layer file in pageserver workdir.
    pub fn path(&self) -> PathBuf {
        Self::path_for(
            &self.path_or_conf,
            &self.desc.tenant_id,
            &self.desc.timeline_id,
            &self.layer_name(),
        )
    }
    /// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
    ///
    /// The value can be obtained via the [`ValueRef::load`] function.
    pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
        let inner = self
            .load(LayerAccessKind::KeyIter, ctx)
            .await
            .context("load delta layer keys")?;
        DeltaLayerInner::load_keys(inner)
            .await
            .context("Layer index is corrupted")
    /// Path to the layer file
    fn path(&self) -> PathBuf {
        self.path.clone()
    }
}

@@ -662,7 +440,7 @@ impl DeltaLayerWriterInner {
    ///
    /// Finish writing the delta layer.
    ///
    fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
    fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
        let index_start_blk =
            ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;

@@ -708,37 +486,21 @@ impl DeltaLayerWriterInner {
        // Note: Because we opened the file in write-only mode, we cannot
        // reuse the same VirtualFile for reading later. That's why we don't
        // set inner.file here. The first read will have to re-open it.
        let layer = DeltaLayer {
            path_or_conf: PathOrConf::Conf(self.conf),
            desc: PersistentLayerDesc::new_delta(
                self.tenant_id,
                self.timeline_id,
                self.key_start..key_end,
                self.lsn_range.clone(),
                metadata.len(),
            ),
            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
            inner: OnceCell::new(),
        };

        let desc = PersistentLayerDesc::new_delta(
            self.tenant_id,
            self.timeline_id,
            self.key_start..key_end,
            self.lsn_range.clone(),
            metadata.len(),
        );

        // fsync the file
        file.sync_all()?;
        // Rename the file to its final name
        //
        // Note: This overwrites any existing file. There shouldn't be any.
        // FIXME: throw an error instead?
        let final_path = DeltaLayer::path_for(
            &PathOrConf::Conf(self.conf),
            &self.tenant_id,
            &self.timeline_id,
            &DeltaFileName {
                key_range: self.key_start..key_end,
                lsn_range: self.lsn_range,
            },
        );
        std::fs::rename(self.path, &final_path)?;

        trace!("created delta layer {}", final_path.display());
        let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;

        trace!("created delta layer {}", layer.local_path().display());

        Ok(layer)
    }
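
The removed sequence above, write to a temporary path, fsync, then rename into the final name, is the standard durable-publish idiom (which the new Layer::finish_creating call presumably takes over). A minimal sketch of the idiom in plain std; the directory fsync at the end is an addition for completeness, not something shown in the diff:

    use std::fs::{self, File, OpenOptions};
    use std::io::Write;
    use std::path::Path;

    fn durable_publish(tmp: &Path, final_path: &Path, bytes: &[u8]) -> std::io::Result<()> {
        let mut f = OpenOptions::new().create(true).write(true).open(tmp)?;
        f.write_all(bytes)?;
        f.sync_all()?; // flush file contents to stable storage first
        fs::rename(tmp, final_path)?; // atomically publish under the final name
        // fsync the parent directory so the rename itself survives a crash
        File::open(final_path.parent().expect("final path has a parent"))?.sync_all()?;
        Ok(())
    }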

@@ -821,8 +583,12 @@ impl DeltaLayerWriter {
    ///
    /// Finish writing the delta layer.
    ///
    pub fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
        self.inner.take().unwrap().finish(key_end)
    pub(crate) fn finish(
        mut self,
        key_end: Key,
        timeline: &Arc<Timeline>,
    ) -> anyhow::Result<ResidentLayer> {
        self.inner.take().unwrap().finish(key_end, timeline)
    }
}

@@ -949,14 +715,14 @@ impl DeltaLayerInner {
        }
    }

    pub(super) async fn load_keys<T: AsRef<DeltaLayerInner> + Clone>(
        this: &T,
    ) -> Result<Vec<DeltaEntry<'_>>> {
        let dl = this.as_ref();
        let file = &dl.file;
    pub(super) async fn load_keys(&self) -> Result<Vec<DeltaEntry<'_>>> {
        let file = &self.file;

        let tree_reader =
            DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            self.index_start_blk,
            self.index_root_blk,
            file,
        );

        let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();

@@ -969,7 +735,7 @@ impl DeltaLayerInner {
                let val_ref = ValueRef {
                    blob_ref: BlobRef(value),
                    reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
                        Adapter(dl),
                        Adapter(self),
                    )),
                };
                let pos = BlobRef(value).pos();
@@ -993,10 +759,61 @@ impl DeltaLayerInner {
        if let Some(last) = all_keys.last_mut() {
            // Last key occupies all space till end of value storage,
            // which corresponds to beginning of the index
            last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
            last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
        }
        Ok(all_keys)
    }

    pub(super) async fn dump(&self) -> anyhow::Result<()> {
        println!(
            "index_start_blk: {}, root {}",
            self.index_start_blk, self.index_root_blk
        );

        let file = &self.file;
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            self.index_start_blk,
            self.index_root_blk,
            file,
        );

        tree_reader.dump().await?;

        let keys = self.load_keys().await?;

        async fn dump_blob(val: ValueRef<'_>) -> anyhow::Result<String> {
            let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
            let val = Value::des(&buf)?;
            let desc = match val {
                Value::Image(img) => {
                    format!(" img {} bytes", img.len())
                }
                Value::WalRecord(rec) => {
                    let wal_desc = walrecord::describe_wal_record(&rec)?;
                    format!(
                        " rec {} bytes will_init: {} {}",
                        buf.len(),
                        rec.will_init(),
                        wal_desc
                    )
                }
            };
            Ok(desc)
        }

        for entry in keys {
            let DeltaEntry { key, lsn, val, .. } = entry;
            let desc = match dump_blob(val).await {
                Ok(desc) => desc,
                Err(err) => {
                    format!("ERROR: {err}")
                }
            };
            println!("  key {key} at {lsn}: {desc}");
        }

        Ok(())
    }
}

/// A set of data associated with a delta layer key and its value
@@ -1032,3 +849,9 @@ impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
        self.0.as_ref().file.read_blk(blknum)
    }
}

impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    fn as_ref(&self) -> &DeltaLayerInner {
        self
    }
}
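
The AsRef<DeltaLayerInner> impl for DeltaLayerInner itself is what lets a single generic Adapter serve both borrowed and Arc-wrapped inners, since &T and Arc<T> both forward AsRef. A self-contained sketch of the shape (Inner and Adapter are illustrative names):

    use std::sync::Arc;

    struct Inner(u32);

    // The self-impl: now Inner, &Inner and Arc<Inner> all satisfy AsRef<Inner>.
    impl AsRef<Inner> for Inner {
        fn as_ref(&self) -> &Inner {
            self
        }
    }

    struct Adapter<T: AsRef<Inner>>(T);

    impl<T: AsRef<Inner>> Adapter<T> {
        fn read(&self) -> u32 {
            self.0.as_ref().0
        }
    }

    fn main() {
        let owned = Inner(1);
        assert_eq!(Adapter(&owned).read(), 1); // adapter over a borrow
        assert_eq!(Adapter(Arc::new(Inner(2))).read(), 2); // adapter over an Arc
    }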

@@ -31,22 +31,24 @@ use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
    LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
    LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use hex;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use pageserver_api::models::LayerAccessKind;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::fs::File;
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::OnceCell;
use tracing::*;

@@ -57,7 +59,7 @@ use utils::{
};

use super::filename::ImageFileName;
use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLayerDesc};
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};

///
/// Header stored in the beginning of the file
@@ -115,22 +117,14 @@ impl Summary {
    }
}

/// ImageLayer is the in-memory data structure associated with an on-disk image
/// file.
///
/// We keep an ImageLayer in memory for each file, in the LayerMap. If a layer
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
/// Otherwise the struct is just a placeholder for a file that exists on disk,
/// and it needs to be loaded before using it in queries.
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
    path_or_conf: PathOrConf,

    path: PathBuf,
    pub desc: PersistentLayerDesc,
    // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
    pub lsn: Lsn,

    access_stats: LayerAccessStats,

    inner: OnceCell<ImageLayerInner>,
}

@@ -147,6 +141,8 @@ impl std::fmt::Debug for ImageLayer {
    }
}

/// ImageLayer is the in-memory data structure associated with an on-disk image
/// file.
pub struct ImageLayerInner {
    // values copied from summary
    index_start_blk: u32,
@@ -167,18 +163,22 @@ impl std::fmt::Debug for ImageLayerInner {
    }
}

#[async_trait::async_trait]
impl Layer for ImageLayer {
    /// Look up given page in the file
    async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_state: &mut ValueReconstructState,
        ctx: &RequestContext,
    ) -> anyhow::Result<ValueReconstructResult> {
        self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
            .await
impl ImageLayerInner {
    pub(super) async fn dump(&self) -> anyhow::Result<()> {
        let file = &self.file;
        let tree_reader =
            DiskBtreeReader::<_, KEY_SIZE>::new(self.index_start_blk, self.index_root_blk, file);

        tree_reader.dump().await?;

        tree_reader
            .visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
                println!("key: {} offset {}", hex::encode(key), value);
                true
            })
            .await?;

        Ok(())
    }
}
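
The visit call above walks index entries in key order and stops as soon as the callback returns false. The same callback contract over an ordinary BTreeMap, as a runnable sketch (the on-disk DiskBtreeReader is only mimicked here):

    use std::collections::BTreeMap;

    fn visit<F: FnMut(&[u8], u64) -> bool>(index: &BTreeMap<Vec<u8>, u64>, start: &[u8], mut f: F) {
        for (key, offset) in index.range(start.to_vec()..) {
            if !f(key, *offset) {
                break; // callback asked to stop the scan early
            }
        }
    }

    fn main() {
        let mut index = BTreeMap::new();
        index.insert(vec![0x00, 0x01], 10u64);
        index.insert(vec![0x00, 0x02], 20u64);
        visit(&index, &[0x00], |key, offset| {
            println!("key: {:02x?} offset {}", key, offset);
            true // keep visiting forwards
        });
    }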

@@ -195,120 +195,21 @@ impl AsLayerDesc for ImageLayer {
    }
}

impl PersistentLayer for ImageLayer {
    fn local_path(&self) -> Option<PathBuf> {
        self.local_path()
    }

    fn delete_resident_layer_file(&self) -> Result<()> {
        self.delete_resident_layer_file()
    }

    fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
        self.info(reset)
    }

    fn access_stats(&self) -> &LayerAccessStats {
        self.access_stats()
    }
}

impl ImageLayer {
    pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
        println!(
            "----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
            self.desc.tenant_id,
            self.desc.timeline_id,
            self.desc.key_range.start,
            self.desc.key_range.end,
            self.lsn,
            self.desc.is_incremental(),
            self.desc.file_size
        );
        self.desc.dump();

        if !verbose {
            return Ok(());
        }

        let inner = self.load(LayerAccessKind::Dump, ctx).await?;
        let file = &inner.file;
        let tree_reader =
            DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);

        tree_reader.dump().await?;

        tree_reader
            .visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
                println!("key: {} offset {}", hex::encode(key), value);
                true
            })
            .await?;
        inner.dump().await?;

        Ok(())
    }

    pub(crate) async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_state: &mut ValueReconstructState,
        ctx: &RequestContext,
    ) -> anyhow::Result<ValueReconstructResult> {
        assert!(self.desc.key_range.contains(&key));
        assert!(lsn_range.start >= self.lsn);
        assert!(lsn_range.end >= self.lsn);

        let inner = self
            .load(LayerAccessKind::GetValueReconstructData, ctx)
            .await?;
        inner
            .get_value_reconstruct_data(key, reconstruct_state)
            .await
            // FIXME: makes no sense to dump paths
            .with_context(|| format!("read {}", self.path().display()))
    }

    pub(crate) fn local_path(&self) -> Option<PathBuf> {
        Some(self.path())
    }

    pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
        // delete underlying file
        fs::remove_file(self.path())?;
        Ok(())
    }

    pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
        let layer_file_name = self.layer_desc().filename().file_name();
        let lsn_start = self.layer_desc().image_layer_lsn();

        HistoricLayerInfo::Image {
            layer_file_name,
            layer_file_size: self.desc.file_size,
            lsn_start,
            remote: false,
            access_stats: self.access_stats.as_api_model(reset),
        }
    }

    pub(crate) fn access_stats(&self) -> &LayerAccessStats {
        &self.access_stats
    }

    fn path_for(
        path_or_conf: &PathOrConf,
        timeline_id: TimelineId,
        tenant_id: TenantId,
        fname: &ImageFileName,
    ) -> PathBuf {
        match path_or_conf {
            PathOrConf::Path(path) => path.to_path_buf(),
            PathOrConf::Conf(conf) => conf
                .timeline_path(&tenant_id, &timeline_id)
                .join(fname.to_string()),
        }
    }

    fn temp_path_for(
        conf: &PageServerConf,
        timeline_id: TimelineId,
@@ -344,52 +245,21 @@ impl ImageLayer {
    async fn load_inner(&self) -> Result<ImageLayerInner> {
        let path = self.path();

        let expected_summary = match &self.path_or_conf {
            PathOrConf::Conf(_) => Some(Summary::from(self)),
            PathOrConf::Path(_) => None,
        };
        let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None)?;

        let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
        // not production code
        let actual_filename = self.path.file_name().unwrap().to_str().unwrap().to_owned();
        let expected_filename = self.layer_desc().filename().file_name();

        if let PathOrConf::Path(ref path) = self.path_or_conf {
            // not production code
            let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
            let expected_filename = self.filename().file_name();

            if actual_filename != expected_filename {
                println!("warning: filename does not match what is expected from in-file summary");
                println!("actual: {:?}", actual_filename);
                println!("expected: {:?}", expected_filename);
            }
            if actual_filename != expected_filename {
                println!("warning: filename does not match what is expected from in-file summary");
                println!("actual: {:?}", actual_filename);
                println!("expected: {:?}", expected_filename);
            }

        Ok(loaded)
    }

    /// Create an ImageLayer struct representing an existing file on disk
    pub fn new(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
        tenant_id: TenantId,
        filename: &ImageFileName,
        file_size: u64,
        access_stats: LayerAccessStats,
    ) -> ImageLayer {
        ImageLayer {
            path_or_conf: PathOrConf::Conf(conf),
            desc: PersistentLayerDesc::new_img(
                tenant_id,
                timeline_id,
                filename.key_range.clone(),
                filename.lsn,
                file_size,
            ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
            lsn: filename.lsn,
            access_stats,
            inner: OnceCell::new(),
        }
    }

    /// Create an ImageLayer struct representing an existing file on disk.
    ///
    /// This variant is only used for debugging purposes, by the 'pagectl' binary.
@@ -402,7 +272,7 @@ impl ImageLayer {
            .metadata()
            .context("get file metadata to determine size")?;
        Ok(ImageLayer {
            path_or_conf: PathOrConf::Path(path.to_path_buf()),
            path: path.to_path_buf(),
            desc: PersistentLayerDesc::new_img(
                summary.tenant_id,
                summary.timeline_id,
@@ -416,18 +286,9 @@ impl ImageLayer {
        })
    }

    fn layer_name(&self) -> ImageFileName {
        self.desc.image_file_name()
    }

    /// Path to the layer file in pageserver workdir.
    pub fn path(&self) -> PathBuf {
        Self::path_for(
            &self.path_or_conf,
            self.desc.timeline_id,
            self.desc.tenant_id,
            &self.layer_name(),
        )
    fn path(&self) -> PathBuf {
        self.path.clone()
    }
}

@@ -582,7 +443,7 @@ impl ImageLayerWriterInner {
    ///
    /// Finish writing the image layer.
    ///
    fn finish(self) -> anyhow::Result<ImageLayer> {
    fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
        let index_start_blk =
            ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;

@@ -624,33 +485,13 @@ impl ImageLayerWriterInner {
        // Note: Because we open the file in write-only mode, we cannot
        // reuse the same VirtualFile for reading later. That's why we don't
        // set inner.file here. The first read will have to re-open it.
        let layer = ImageLayer {
            path_or_conf: PathOrConf::Conf(self.conf),
            desc,
            lsn: self.lsn,
            access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
            inner: OnceCell::new(),
        };

        // fsync the file
        file.sync_all()?;

        // Rename the file to its final name
        //
        // Note: This overwrites any existing file. There shouldn't be any.
        // FIXME: throw an error instead?
        let final_path = ImageLayer::path_for(
            &PathOrConf::Conf(self.conf),
            self.timeline_id,
            self.tenant_id,
            &ImageFileName {
                key_range: self.key_range.clone(),
                lsn: self.lsn,
            },
        );
        std::fs::rename(self.path, final_path)?;
        let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;

        trace!("created image layer {}", layer.path().display());
        trace!("created image layer {}", layer.local_path().display());

        Ok(layer)
    }
@@ -716,8 +557,11 @@ impl ImageLayerWriter {
    ///
    /// Finish writing the image layer.
    ///
    pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
        self.inner.take().unwrap().finish()
    pub(crate) fn finish(
        mut self,
        timeline: &Arc<Timeline>,
    ) -> anyhow::Result<super::ResidentLayer> {
        self.inner.take().unwrap().finish(timeline)
    }
}

@@ -10,11 +10,12 @@ use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::walrecord;
use anyhow::{ensure, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::sync::{Arc, OnceLock};
use tracing::*;
use utils::{
    bin_ser::BeSer,
@@ -28,7 +29,7 @@ use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::RwLock;

use super::{DeltaLayer, DeltaLayerWriter, Layer};
use super::{DeltaLayerWriter, ResidentLayer};

pub struct InMemoryLayer {
    conf: &'static PageServerConf,
@@ -203,20 +204,6 @@ impl InMemoryLayer {
    }
}

#[async_trait::async_trait]
impl Layer for InMemoryLayer {
    async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_data: &mut ValueReconstructState,
        ctx: &RequestContext,
    ) -> Result<ValueReconstructResult> {
        self.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
            .await
    }
}

impl std::fmt::Display for InMemoryLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let end_lsn = self.end_lsn_or_max();
@@ -225,17 +212,13 @@ impl std::fmt::Display for InMemoryLayer {
}

impl InMemoryLayer {
    ///
    /// Get layer size.
    ///
    pub async fn size(&self) -> Result<u64> {
        let inner = self.inner.read().await;
        Ok(inner.file.len())
    }

    ///
    /// Create a new, empty, in-memory layer
    ///
    pub fn create(
        conf: &'static PageServerConf,
        timeline_id: TimelineId,
@@ -313,7 +296,7 @@ impl InMemoryLayer {
    /// Write this frozen in-memory layer to disk.
    ///
    /// Returns a new delta layer with all the same data as this in-memory layer
    pub(crate) async fn write_to_disk(&self) -> Result<DeltaLayer> {
    pub(crate) async fn write_to_disk(&self, timeline: &Arc<Timeline>) -> Result<ResidentLayer> {
        // Grab the lock in read-mode. We hold it over the I/O, but because this
        // layer is not writeable anymore, no one should be trying to acquire the
        // write lock on it, so we shouldn't block anyone. There's one exception
@@ -352,7 +335,7 @@ impl InMemoryLayer {
            }
        }

        let delta_layer = delta_layer_writer.finish(Key::MAX)?;
        let delta_layer = delta_layer_writer.finish(Key::MAX, timeline)?;
        Ok(delta_layer)
    }
}

pageserver/src/tenant/storage_layer/layer.rs (new file, 1205 lines): diff suppressed because it is too large.

@@ -1,4 +1,3 @@
use anyhow::Result;
use core::fmt::Display;
use std::ops::Range;
use utils::{
@@ -6,7 +5,7 @@ use utils::{
    lsn::Lsn,
};

use crate::{context::RequestContext, repository::Key};
use crate::repository::Key;

use super::{DeltaFileName, ImageFileName, LayerFileName};

@@ -100,6 +99,22 @@ impl PersistentLayerDesc {
        }
    }

    pub fn from_filename(
        tenant_id: TenantId,
        timeline_id: TimelineId,
        filename: LayerFileName,
        file_size: u64,
    ) -> Self {
        match filename {
            LayerFileName::Image(i) => {
                Self::new_img(tenant_id, timeline_id, i.key_range, i.lsn, file_size)
            }
            LayerFileName::Delta(d) => {
                Self::new_delta(tenant_id, timeline_id, d.key_range, d.lsn_range, file_size)
            }
        }
    }

    /// Get the LSN that the image layer covers.
    pub fn image_layer_lsn(&self) -> Lsn {
        assert!(!self.is_delta);
@@ -173,21 +188,31 @@ impl PersistentLayerDesc {
        self.is_delta
    }

    pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
        println!(
            "----- layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
            self.tenant_id,
            self.timeline_id,
            self.key_range.start,
            self.key_range.end,
            self.lsn_range.start,
            self.lsn_range.end,
            self.is_delta,
            self.is_incremental(),
            self.file_size,
        );

        Ok(())
    pub fn dump(&self) {
        if self.is_delta {
            println!(
                "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} is_incremental {} size {} ----",
                self.tenant_id,
                self.timeline_id,
                self.key_range.start,
                self.key_range.end,
                self.lsn_range.start,
                self.lsn_range.end,
                self.is_incremental(),
                self.file_size,
            );
        } else {
            println!(
                "----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
                self.tenant_id,
                self.timeline_id,
                self.key_range.start,
                self.key_range.end,
                self.image_layer_lsn(),
                self.is_incremental(),
                self.file_size
            );
        }
    }

    pub fn file_size(&self) -> u64 {

@@ -1,216 +0,0 @@
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::layer_manager::LayerManager;
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;

use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

use super::filename::{DeltaFileName, ImageFileName};
use super::{
    AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset,
    LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
};

/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`DeltaLayer`](super::DeltaLayer).
///
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed to download remote layers, and during which it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayer {
    pub desc: PersistentLayerDesc,

    pub layer_metadata: LayerFileMetadata,

    access_stats: LayerAccessStats,

    pub(crate) ongoing_download: Arc<tokio::sync::Semaphore>,

    /// Has `LayerMap::replace` failed for this (true) or not (false).
    ///
    /// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
    /// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
    /// unprocessable, because a LayerMap::replace failed.
    ///
    /// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
    /// a possible fast loop between `Timeline::get_reconstruct_data` and
    /// `Timeline::download_remote_layer`, which also logs.
    ///
    /// [`ongoing_download`]: Self::ongoing_download
    pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
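
The single-permit ongoing_download semaphore means at most one task performs the download while concurrent callers wait for the outcome. A minimal sketch of that single-flight shape, assuming tokio (the download body is a placeholder):

    use std::sync::Arc;
    use tokio::sync::{Mutex, Semaphore};

    async fn download_once(sem: Arc<Semaphore>, done: Arc<Mutex<bool>>) {
        // only one task holds the permit at a time
        let _permit = sem.acquire().await.expect("semaphore is never closed");
        let mut done = done.lock().await;
        if !*done {
            // ... fetch the layer file from remote storage here ...
            *done = true;
        } // later permit holders see done == true and skip the fetch
    }

    #[tokio::main]
    async fn main() {
        let sem = Arc::new(Semaphore::new(1));
        let done = Arc::new(Mutex::new(false));
        let tasks: Vec<_> = (0..4)
            .map(|_| tokio::spawn(download_once(sem.clone(), done.clone())))
            .collect();
        for t in tasks {
            t.await.unwrap();
        }
        assert!(*done.lock().await);
    }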

impl std::fmt::Debug for RemoteLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RemoteLayer")
            .field("file_name", &self.desc.filename())
            .field("layer_metadata", &self.layer_metadata)
            .field("is_incremental", &self.desc.is_incremental())
            .finish()
    }
}

#[async_trait::async_trait]
impl Layer for RemoteLayer {
    async fn get_value_reconstruct_data(
        &self,
        _key: Key,
        _lsn_range: Range<Lsn>,
        _reconstruct_state: &mut ValueReconstructState,
        _ctx: &RequestContext,
    ) -> Result<ValueReconstructResult> {
        bail!("layer {self} needs to be downloaded");
    }
}

/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
impl std::fmt::Display for RemoteLayer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.layer_desc().short_id())
    }
}

impl AsLayerDesc for RemoteLayer {
    fn layer_desc(&self) -> &PersistentLayerDesc {
        &self.desc
    }
}

impl PersistentLayer for RemoteLayer {
    fn local_path(&self) -> Option<PathBuf> {
        None
    }

    fn delete_resident_layer_file(&self) -> Result<()> {
        bail!("remote layer has no layer file");
    }

    fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
        Some(self)
    }

    fn is_remote_layer(&self) -> bool {
        true
    }

    fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
        let layer_file_name = self.layer_desc().filename().file_name();
        let lsn_range = self.layer_desc().lsn_range.clone();

        if self.desc.is_delta {
            HistoricLayerInfo::Delta {
                layer_file_name,
                layer_file_size: self.layer_metadata.file_size(),
                lsn_start: lsn_range.start,
                lsn_end: lsn_range.end,
                remote: true,
                access_stats: self.access_stats.as_api_model(reset),
            }
        } else {
            HistoricLayerInfo::Image {
                layer_file_name,
                layer_file_size: self.layer_metadata.file_size(),
                lsn_start: lsn_range.start,
                remote: true,
                access_stats: self.access_stats.as_api_model(reset),
            }
        }
    }

    fn access_stats(&self) -> &LayerAccessStats {
        &self.access_stats
    }
}

impl RemoteLayer {
    pub fn new_img(
        tenantid: TenantId,
        timelineid: TimelineId,
        fname: &ImageFileName,
        layer_metadata: &LayerFileMetadata,
        access_stats: LayerAccessStats,
    ) -> RemoteLayer {
        RemoteLayer {
            desc: PersistentLayerDesc::new_img(
                tenantid,
                timelineid,
                fname.key_range.clone(),
                fname.lsn,
                layer_metadata.file_size(),
            ),
            layer_metadata: layer_metadata.clone(),
            ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
            download_replacement_failure: std::sync::atomic::AtomicBool::default(),
            access_stats,
        }
    }

    pub fn new_delta(
        tenantid: TenantId,
        timelineid: TimelineId,
        fname: &DeltaFileName,
        layer_metadata: &LayerFileMetadata,
        access_stats: LayerAccessStats,
    ) -> RemoteLayer {
        RemoteLayer {
            desc: PersistentLayerDesc::new_delta(
                tenantid,
                timelineid,
                fname.key_range.clone(),
                fname.lsn_range.clone(),
                layer_metadata.file_size(),
            ),
            layer_metadata: layer_metadata.clone(),
            ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
            download_replacement_failure: std::sync::atomic::AtomicBool::default(),
            access_stats,
        }
    }

    /// Create a Layer struct representing this layer, after it has been downloaded.
    pub(crate) fn create_downloaded_layer(
        &self,
        _layer_map_lock_held_witness: &LayerManager,
        conf: &'static PageServerConf,
        file_size: u64,
    ) -> Arc<dyn PersistentLayer> {
        if self.desc.is_delta {
            let fname = self.desc.delta_file_name();
            Arc::new(DeltaLayer::new(
                conf,
                self.desc.timeline_id,
                self.desc.tenant_id,
                &fname,
                file_size,
                self.access_stats
                    .clone_for_residence_change(LayerResidenceStatus::Resident),
            ))
        } else {
            let fname = self.desc.image_file_name();
            Arc::new(ImageLayer::new(
                conf,
                self.desc.timeline_id,
                self.desc.tenant_id,
                &fname,
                file_size,
                self.access_stats
                    .clone_for_residence_change(LayerResidenceStatus::Resident),
            ))
        }
    }
}

(File diff suppressed because it is too large.)

@@ -29,7 +29,6 @@ use crate::{
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
        storage_layer::PersistentLayer,
        timeline::EvictionError,
        LogicalSizeCalculationCause, Tenant,
    },
@@ -194,15 +193,26 @@ impl Timeline {
        // NB: all the checks can be invalidated as soon as we release the layer map lock.
        // We don't want to hold the layer map lock during eviction.
        // So, we just need to deal with this.
        let candidates: Vec<Arc<dyn PersistentLayer>> = {
        let candidates: Vec<_> = {
            let guard = self.layers.read().await;
            let layers = guard.layer_map();
            let mut candidates = Vec::new();
            for hist_layer in layers.iter_historic_layers() {
                let hist_layer = guard.get_from_desc(&hist_layer);
                if hist_layer.is_remote_layer() {
                    continue;
                }

                // guard against eviction while we inspect it; it might be that eviction_task and
                // disk_usage_eviction_task both select the same layers to be evicted, and
                // seemingly free up double the space. both succeeding is of no consequence.
                let guard = match hist_layer.keep_resident().await {
                    Ok(Some(l)) => l,
                    Ok(None) => continue,
                    Err(e) => {
                        // these should not happen, but we cannot make them statically impossible right
                        // now.
                        tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
                        continue;
                    }
                };

                let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
                    // We only use this fallback if there's an implementation error.
@@ -233,7 +243,7 @@ impl Timeline {
                    }
                };
                if no_activity_for > p.threshold {
                    candidates.push(hist_layer)
                    candidates.push(guard.drop_eviction_guard())
                }
            }
            candidates
@@ -252,7 +262,7 @@ impl Timeline {
        };

        let results = match self
            .evict_layer_batch(remote_client, &candidates[..], cancel.clone())
            .evict_layer_batch(remote_client, &candidates, cancel)
            .await
        {
            Err(pre_err) => {
@@ -263,7 +273,7 @@ impl Timeline {
            Ok(results) => results,
        };
        assert_eq!(results.len(), candidates.len());
        for (l, result) in candidates.iter().zip(results) {
        for result in results {
            match result {
                None => {
                    stats.skipped_for_shutdown += 1;
@@ -271,20 +281,10 @@ impl Timeline {
                Some(Ok(())) => {
                    stats.evicted += 1;
                }
                Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
                    stats.not_evictable += 1;
                }
                Some(Err(EvictionError::FileNotFound)) => {
                Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
                    // compaction/gc removed the file while we were waiting on layer_removal_cs
                    stats.not_evictable += 1;
                }
                Some(Err(
                    e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
                )) => {
                    let e = utils::error::report_compact_sources(&e);
                    warn!(layer = %l, "failed to evict layer: {e}");
                    stats.not_evictable += 1;
                }
            }
        }
        if stats.candidates == stats.not_evictable {
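
The keep_resident() guard in the loop above pins a layer while it is being inspected, so a concurrent evictor cannot pull it out from under the check. A toy pin-count version of that idea (illustrative only; the real guard also handles downloads and races that this sketch ignores):

    use std::sync::atomic::{AtomicUsize, Ordering};

    struct Layer {
        pins: AtomicUsize,
    }

    struct ResidentGuard<'a>(&'a Layer);

    impl Layer {
        fn keep_resident(&self) -> ResidentGuard<'_> {
            self.pins.fetch_add(1, Ordering::SeqCst);
            ResidentGuard(self)
        }

        fn evictable(&self) -> bool {
            // refuse eviction while any inspector holds a guard
            self.pins.load(Ordering::SeqCst) == 0
        }
    }

    impl Drop for ResidentGuard<'_> {
        fn drop(&mut self) {
            self.0.pins.fetch_sub(1, Ordering::SeqCst);
        }
    }

    fn main() {
        let layer = Layer { pins: AtomicUsize::new(0) };
        let guard = layer.keep_resident();
        assert!(!layer.evictable()); // pinned: eviction must wait
        drop(guard);
        assert!(layer.evictable()); // unpinned: eviction may proceed
    }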

@@ -8,21 +8,19 @@ use utils::{

use crate::{
    config::PageServerConf,
    metrics::TimelineMetrics,
    tenant::{
        layer_map::{BatchedUpdates, LayerMap},
        storage_layer::{
            AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer,
            PersistentLayerDesc, PersistentLayerKey,
            AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
            ResidentLayer,
        },
        timeline::compare_arced_layers,
    },
};

/// Provides semantic APIs to manipulate the layer map.
pub(crate) struct LayerManager {
    layer_map: LayerMap,
    layer_fmgr: LayerFileManager,
    layer_fmgr: LayerFileManager<Layer>,
}

/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
@@ -43,7 +41,7 @@ impl LayerManager {
        }
    }

    pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
    pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
        self.layer_fmgr.get_from_desc(desc)
    }

@@ -55,21 +53,12 @@ impl LayerManager {
        &self.layer_map
    }

    /// Replace layers in the layer file manager, used in evictions and layer downloads.
    pub(crate) fn replace_and_verify(
        &mut self,
        expected: Arc<dyn PersistentLayer>,
        new: Arc<dyn PersistentLayer>,
    ) -> Result<()> {
        self.layer_fmgr.replace_and_verify(expected, new)
    }

    /// Called from `load_layer_map`. Initialize the layer manager with:
    /// 1. all on-disk layers
    /// 2. next open layer (with disk disk_consistent_lsn LSN)
    pub(crate) fn initialize_local_layers(
        &mut self,
        on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
        on_disk_layers: Vec<Layer>,
        next_open_layer_at: Lsn,
    ) {
        let mut updates = self.layer_map.batch_update();
@@ -164,10 +153,10 @@ impl LayerManager {
    }

    /// Add image layers to the layer map, called from `create_image_layers`.
    pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
    pub(crate) fn track_new_image_layers(&mut self, image_layers: &[ResidentLayer]) {
        let mut updates = self.layer_map.batch_update();
        for layer in image_layers {
            Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
            Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
        }
        updates.flush();
    }
@@ -175,46 +164,47 @@ impl LayerManager {
    /// Flush a frozen layer and add the written delta layer to the layer map.
    pub(crate) fn finish_flush_l0_layer(
        &mut self,
        delta_layer: Option<DeltaLayer>,
        delta_layer: Option<&ResidentLayer>,
        frozen_layer_for_check: &Arc<InMemoryLayer>,
    ) {
        let l = self.layer_map.frozen_layers.pop_front();
        let mut updates = self.layer_map.batch_update();
        let inmem = self
            .layer_map
            .frozen_layers
            .pop_front()
            .expect("there must be a inmem layer to flush");

        // Only one thread may call this function at a time (for this
        // timeline). If two threads tried to flush the same frozen
        // Only one task may call this function at a time (for this
        // timeline). If two tasks tried to flush the same frozen
        // layer to disk at the same time, that would not work.
        assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
        assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));

        if let Some(delta_layer) = delta_layer {
            Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
        if let Some(l) = delta_layer {
            let mut updates = self.layer_map.batch_update();
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
            updates.flush();
        }
        updates.flush();
    }
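
The new assertion compares Arc identities directly instead of going through the old compare_arced_layers helper: Arc::as_ptr distinguishes "the same allocation" from "an equal value". A short demonstration:

    use std::sync::Arc;

    fn main() {
        let a = Arc::new(42);
        let b = Arc::clone(&a);
        let c = Arc::new(42);
        assert_eq!(Arc::as_ptr(&a), Arc::as_ptr(&b)); // same allocation
        assert_ne!(Arc::as_ptr(&a), Arc::as_ptr(&c)); // equal value, different allocation
    }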

    /// Called when compaction is completed.
    pub(crate) fn finish_compact_l0(
        &mut self,
        layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
        compact_from: Vec<Arc<dyn PersistentLayer>>,
        compact_to: Vec<Arc<dyn PersistentLayer>>,
        metrics: &TimelineMetrics,
        layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
        compact_from: Vec<Layer>,
        compact_to: &[ResidentLayer],
        duplicates: &[(ResidentLayer, ResidentLayer)],
    ) -> Result<()> {
        let mut updates = self.layer_map.batch_update();
        for l in compact_to {
            Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
            Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
        }
        for l in compact_from {
            // NB: the layer file identified by descriptor `l` is guaranteed to be present
            // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
            // time, even though we dropped `Timeline::layers` inbetween.
            Self::delete_historic_layer(
                layer_removal_cs.clone(),
                l,
                &mut updates,
                metrics,
                &mut self.layer_fmgr,
            )?;
            Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr)?;
        }
        for (old, new) in duplicates {
            self.layer_fmgr.replace(old.as_ref(), new.as_ref().clone());
        }
        updates.flush();
        Ok(())
@@ -223,28 +213,26 @@ impl LayerManager {
    /// Called when garbage collecting the timeline. Returns a guard that will apply the updates to the layer map.
    pub(crate) fn finish_gc_timeline(
        &mut self,
        layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
        gc_layers: Vec<Arc<dyn PersistentLayer>>,
        metrics: &TimelineMetrics,
        layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
        gc_layers: Vec<Layer>,
    ) -> Result<ApplyGcResultGuard> {
        let mut updates = self.layer_map.batch_update();
        for doomed_layer in gc_layers {
            Self::delete_historic_layer(
                layer_removal_cs.clone(),
                layer_removal_cs,
                doomed_layer,
                &mut updates,
                metrics,
                &mut self.layer_fmgr,
            )?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
            )?;
        }
        Ok(ApplyGcResultGuard(updates))
    }

    /// Helper function to insert a layer into the layer map and file manager.
    fn insert_historic_layer(
        layer: Arc<dyn PersistentLayer>,
        layer: Layer,
        updates: &mut BatchedUpdates<'_>,
        mapping: &mut LayerFileManager,
        mapping: &mut LayerFileManager<Layer>,
    ) {
        updates.insert_historic(layer.layer_desc().clone());
        mapping.insert(layer);
@@ -254,17 +242,12 @@ impl LayerManager {
    /// Remote storage is not affected by this operation.
    fn delete_historic_layer(
        // we cannot remove layers otherwise, since gc and compaction will race
        _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
        layer: Arc<dyn PersistentLayer>,
        _layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
        layer: Layer,
        updates: &mut BatchedUpdates<'_>,
        metrics: &TimelineMetrics,
        mapping: &mut LayerFileManager,
        mapping: &mut LayerFileManager<Layer>,
    ) -> anyhow::Result<()> {
        let desc = layer.layer_desc();
        if !layer.is_remote_layer() {
            layer.delete_resident_layer_file()?;
            metrics.resident_physical_size_gauge.sub(desc.file_size);
        }

        // TODO Removing from the bottom of the layer map is expensive.
        // Maybe instead discard all layer map historic versions that
@@ -272,22 +255,21 @@ impl LayerManager {
        // and mark what we can't delete yet as deleted from the layer
        // map index without actually rebuilding the index.
        updates.remove_historic(desc);
        mapping.remove(layer);
        mapping.remove(&layer);
        layer.garbage_collect_on_drop();

        Ok(())
    }

    pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
    pub(crate) fn contains(&self, layer: &Layer) -> bool {
        self.layer_fmgr.contains(layer)
    }
}

pub(crate) struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
    HashMap<PersistentLayerKey, Arc<T>>,
);
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);

impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
    fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
impl<T: AsLayerDesc + Clone + PartialEq + std::fmt::Debug> LayerFileManager<T> {
    fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
        // The assumption for the `expect()` is that all code maintains the following invariant:
        // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
        self.0
@@ -297,14 +279,14 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
            .clone()
    }

    pub(crate) fn insert(&mut self, layer: Arc<T>) {
    pub(crate) fn insert(&mut self, layer: T) {
        let present = self.0.insert(layer.layer_desc().key(), layer.clone());
        if present.is_some() && cfg!(debug_assertions) {
            panic!("overwriting a layer: {:?}", layer.layer_desc())
        }
    }

    pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
    pub(crate) fn contains(&self, layer: &T) -> bool {
        self.0.contains_key(&layer.layer_desc().key())
    }

@@ -312,7 +294,7 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
        Self(HashMap::new())
    }

    pub(crate) fn remove(&mut self, layer: Arc<T>) {
    pub(crate) fn remove(&mut self, layer: &T) {
        let present = self.0.remove(&layer.layer_desc().key());
        if present.is_none() && cfg!(debug_assertions) {
            panic!(
@@ -322,38 +304,13 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
        }
    }

    pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
        let key = expected.layer_desc().key();
        let other = new.layer_desc().key();
    pub(crate) fn replace(&mut self, old: &T, new: T) {
        let key = old.layer_desc().key();
        assert_eq!(key, new.layer_desc().key());

        let expected_l0 = LayerMap::is_l0(expected.layer_desc());
        let new_l0 = LayerMap::is_l0(new.layer_desc());

        fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
            "layermap-replace-notfound"
        ));

        anyhow::ensure!(
            key == other,
            "expected and new layer have different keys: {key:?} != {other:?}"
        );

        anyhow::ensure!(
            expected_l0 == new_l0,
            "one layer is l0 while the other is not: {expected_l0} != {new_l0}"
        );

        if let Some(layer) = self.0.get_mut(&key) {
            anyhow::ensure!(
                compare_arced_layers(&expected, layer),
                "another layer was found instead of expected, expected={expected:?}, new={new:?}",
                expected = Arc::as_ptr(&expected),
                new = Arc::as_ptr(layer),
            );
            *layer = new;
            Ok(())
        } else {
            anyhow::bail!("layer was not found");
        if let Some(existing) = self.0.get_mut(&key) {
            assert_eq!(existing, old);
            *existing = new;
        }
    }
}
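
After this refactor, LayerFileManager<T> is essentially a keyed map whose replace() asserts on the expected occupant rather than returning errors. A reduced sketch of that shape, with String keys standing in for PersistentLayerKey:

    use std::collections::HashMap;

    struct Manager<T: Clone + PartialEq + std::fmt::Debug>(HashMap<String, T>);

    impl<T: Clone + PartialEq + std::fmt::Debug> Manager<T> {
        fn insert(&mut self, key: String, layer: T) {
            let present = self.0.insert(key, layer);
            debug_assert!(present.is_none(), "overwriting a layer");
        }

        fn replace(&mut self, key: &str, old: &T, new: T) {
            if let Some(existing) = self.0.get_mut(key) {
                assert_eq!(existing, old); // the occupant must be the expected one
                *existing = new;
            }
        }
    }

    fn main() {
        let mut m = Manager(HashMap::new());
        m.insert("layer-1".to_string(), "old".to_string());
        m.replace("layer-1", &"old".to_string(), "new".to_string());
        assert_eq!(m.0["layer-1"], "new");
    }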

@@ -1,6 +1,7 @@
use crate::metrics::RemoteOpFileKind;

use super::storage_layer::LayerFileName;
use super::storage_layer::ResidentLayer;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
@@ -210,7 +211,7 @@ pub(crate) struct Delete {
#[derive(Debug)]
pub(crate) enum UploadOp {
    /// Upload a layer file
    UploadLayer(LayerFileName, LayerFileMetadata),
    UploadLayer(ResidentLayer, LayerFileMetadata),

    /// Upload the metadata file
    UploadMetadata(IndexPart, Lsn),
@@ -225,13 +226,8 @@ pub(crate) enum UploadOp {
impl std::fmt::Display for UploadOp {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            UploadOp::UploadLayer(path, metadata) => {
                write!(
                    f,
                    "UploadLayer({}, size={:?})",
                    path.file_name(),
                    metadata.file_size()
                )
            UploadOp::UploadLayer(layer, metadata) => {
                write!(f, "UploadLayer({}, size={:?})", layer, metadata.file_size())
            }
            UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
            UploadOp::Delete(delete) => write!(

@@ -1524,7 +1524,7 @@ class NeonPageserver(PgProtocol):
        ".*wait for layer upload ops to complete.*",  # .*Caused by:.*wait_completion aborted because upload queue was stopped
        ".*gc_loop.*Gc failed, retrying in.*timeline is Stopping",  # When gc checks timeline state after acquiring layer_removal_cs
        ".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant",  # Tenant::gc precondition
        ".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping",  # When compaction checks timeline state after acquiring layer_removal_cs
        ".*compaction_loop.*Compaction failed, retrying in.*timeline or pageserver is shutting down",  # When compaction checks timeline state after acquiring layer_removal_cs
        ".*query handler for 'pagestream.*failed: Timeline .* was not found",  # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
        ".*query handler for 'pagestream.*failed: Timeline .* is not active",  # timeline delete in progress
        ".*task iteration took longer than the configured period.*",

@@ -22,7 +22,7 @@ def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:

    # eviction might be the first one after an attach to access the layers
    env.pageserver.allowed_errors.append(
        ".*unexpectedly on-demand downloading remote layer remote.* for task kind Eviction"
        ".*unexpectedly on-demand downloading remote layer .* for task kind Eviction"
    )
    assert isinstance(env.remote_storage, LocalFsStorage)
    return env

@@ -15,7 +15,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):

    env.pageserver.allowed_errors.extend(
        [
            ".*Failed to load delta layer.*",
            ".*layer loading failed:.*",
            ".*could not find data for key.*",
            ".*is not active. Current state: Broken.*",
            ".*will not become active. Current state: Broken.*",
@@ -99,7 +99,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
    # Third timeline will also fail during basebackup, because the layer file is corrupt.
    # It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
    # (We don't check layer file contents on startup, when loading the timeline)
    with pytest.raises(Exception, match="Failed to load delta layer") as err:
    with pytest.raises(Exception, match="layer loading failed:") as err:
        pg3.start()
    log.info(
        f"As expected, compute startup failed for timeline {tenant3}/{timeline3} with corrupt layers: {err}"

@@ -2,35 +2,137 @@ import time

import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from requests.exceptions import ConnectionError


# Test duplicate layer detection
#
# This test sets a fail point at the end of the first compaction phase:
# after flushing new L1 layers but before deletion of L0 layers,
# it should cause generation of a duplicate L1 layer by compaction after restart.
@pytest.mark.timeout(600)
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()
def test_compaction_duplicates_all(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    """
    Makes compact_level0_phase1 return input layers as the output layers with a
    failpoint, as if those L0 inputs had all been recreated when L1s were
    supposed to be created.
    """
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=RemoteStorageKind.LOCAL_FS,
        test_name="test_compaction_duplicates_all",
    )

    # Use aggressive compaction and checkpoint settings
    tenant_id, _ = env.neon_cli.create_tenant(
        conf={
    env = neon_env_builder.init_start(
        initial_tenant_conf={
            "checkpoint_distance": f"{1024 ** 2}",
            "compaction_target_size": f"{1024 ** 2}",
            "compaction_period": "5 s",
            "compaction_period": "0 s",
            "compaction_threshold": "3",
        }
    )
    pageserver_http = env.pageserver.http_client()

    tenant_id, timeline_id = env.initial_tenant, env.initial_timeline

    pageserver_http.configure_failpoints(("compact-level0-phase1-return-same", "return"))
    # pageserver_http.configure_failpoints(("after-timeline-compacted-first-L1", "exit"))

    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
    connstr = endpoint.connstr(options="-csynchronous_commit=off")
    pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])

    time.sleep(10)  # let compaction be performed
    pageserver_http.timeline_compact(tenant_id, timeline_id)
    assert env.pageserver.log_contains("compact-level0-phase1-return-same")

    pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T200", "-Mprepared", connstr])


def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    """
    This test sets a fail point at the end of the first compaction phase:
    after flushing new L1 layers but before deletion of L0 layers,
    it should cause generation of a duplicate L1 layer by compaction after restart.
    """
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
|
||||
test_name="test_duplicate_layers",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"compaction_target_size": f"{1024 ** 2}",
|
||||
"compaction_period": "0 s",
|
||||
"compaction_threshold": "3",
|
||||
}
|
||||
)
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
|
||||
|
||||
pageserver_http.configure_failpoints(("after-timeline-compacted-first-L1", "exit"))
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
connstr = endpoint.connstr(options="-csynchronous_commit=off")
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])
|
||||
|
||||
with pytest.raises(ConnectionError, match="Remote end closed connection without response"):
|
||||
pageserver_http.timeline_compact(tenant_id, timeline_id)
|
||||
|
||||
# pageserver has already exited at this point
|
||||
env.pageserver.stop()
|
||||
|
||||
# now the duplicate L1 has been created, but is not yet uploaded
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
|
||||
# path = env.remote_storage.timeline_path(tenant_id, timeline_id)
|
||||
l1_found = None
|
||||
for path in env.timeline_dir(tenant_id, timeline_id).iterdir():
|
||||
if path.name == "metadata" or path.name.startswith("ephemeral-"):
|
||||
continue
|
||||
|
||||
if len(path.suffixes) > 0:
|
||||
# temp files
|
||||
continue
|
||||
|
||||
[key_range, lsn_range] = path.name.split("__", maxsplit=1)
|
||||
|
||||
if "-" not in lsn_range:
|
||||
# image layer
|
||||
continue
|
||||
|
||||
[key_start, key_end] = key_range.split("-", maxsplit=1)
|
||||
|
||||
if key_start == "0" * 36 and key_end == "F" * 36:
|
||||
# L0
|
||||
continue
|
||||
|
||||
assert l1_found is None, f"found multiple L1: {l1_found.name} and {path.name}"
|
||||
l1_found = path
|
||||
|
||||
assert l1_found is not None, "failed to find L1 locally"
|
||||
original_created_at = l1_found.stat()[8]
|
||||
|
||||
uploaded = env.remote_storage.timeline_path(tenant_id, timeline_id) / l1_found.name
|
||||
assert not uploaded.exists(), "to-be-overwritten should not yet be uploaded"
|
||||
|
||||
# give room for fs timestamps
|
||||
time.sleep(1)
|
||||
|
||||
env.pageserver.start()
|
||||
warning = f".*duplicated L1 layer layer={l1_found.name}"
|
||||
env.pageserver.allowed_errors.append(warning)
|
||||
|
||||
pageserver_http.timeline_compact(tenant_id, timeline_id)
|
||||
# give time for log flush
|
||||
time.sleep(1)
|
||||
|
||||
env.pageserver.log_contains(warning)
|
||||
|
||||
overwritten_at = l1_found.stat()[8]
|
||||
assert original_created_at < overwritten_at, "expected the L1 to be overwritten"
|
||||
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
|
||||
|
||||
uploaded_at = uploaded.stat()[8]
|
||||
assert overwritten_at <= uploaded_at, "expected the L1 to finally be uploaded"
|
||||
|
||||
# why does compaction not wait for uploads? probably so that we can compact
|
||||
# faster than we can upload in some cases.
|
||||
#
|
||||
# timeline_compact should wait for uploads as well
|
||||
|
||||
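Two details of the new test_duplicate_layers body are worth spelling out. First, stat()[8] is the positional form of os.stat_result.st_mtime, which is why the test sleeps for a second before restarting the pageserver: the overwrite and the later upload are detected purely through filesystem modification times. Second, the directory scan tells layer kinds apart by file name alone. Below is a sketch of that classification, assuming the naming visible above (<key_start>-<key_end>__<lsn_start>-<lsn_end> for delta layers, a single LSN after "__" for image layers, and the full 36-hex-digit key range for L0 deltas); classify_layer and the sample names are illustrative, not part of the fixtures:

    def classify_layer(name: str) -> str:
        # split "<key range>__<lsn range>"; image layers carry one LSN, deltas two
        key_range, lsn_range = name.split("__", maxsplit=1)
        if "-" not in lsn_range:
            return "image"
        key_start, key_end = key_range.split("-", maxsplit=1)
        # L0 deltas cover the whole key space, L1 deltas only a slice of it
        if key_start == "0" * 36 and key_end == "F" * 36:
            return "l0-delta"
        return "l1-delta"


    l0 = "0" * 36 + "-" + "F" * 36 + "__00000000016B59D8-00000000016B9988"
    l1 = "0" * 36 + "-" + "010000000000000000000000000000000002" + "__00000000016B59D8-00000000016B9988"
    image = "0" * 36 + "-" + "010000000000000000000000000000000002" + "__00000000016B59D8"

    assert classify_layer(l0) == "l0-delta"
    assert classify_layer(l1) == "l1-delta"
    assert classify_layer(image) == "image"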
@@ -256,34 +256,34 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
     ps_http.evict_all_layers(tenant_id, timeline_id)
 
     def ensure_resident_and_remote_size_metrics():
-        log.info("ensure that all the layers are gone")
         resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
         # we have disabled all background loops, so, this should hold
-        assert len(resident_layers) == 0
+        assert len(resident_layers) == 0, "ensure that all the layers are gone"
 
         info = ps_http.layer_map_info(tenant_id, timeline_id)
         log.info("layer map dump: %s", info)
 
-        log.info("ensure that resident_physical_size metric is zero")
         resident_physical_size_metric = ps_http.get_timeline_metric(
             tenant_id, timeline_id, "pageserver_resident_physical_size"
         )
-        assert resident_physical_size_metric == 0
-        log.info("ensure that resident_physical_size metric corresponds to layer map dump")
+        assert (
+            resident_physical_size_metric == 0
+        ), "ensure that resident_physical_size metric is zero"
         assert resident_physical_size_metric == sum(
-            [layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote]
-        )
+            layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote
+        ), "ensure that resident_physical_size metric corresponds to layer map dump"
 
-        log.info("ensure that remote_physical_size metric matches layer map")
         remote_physical_size_metric = ps_http.get_timeline_metric(
             tenant_id, timeline_id, "pageserver_remote_physical_size"
         )
-        log.info("ensure that remote_physical_size metric corresponds to layer map dump")
         assert remote_physical_size_metric == sum(
             layer.layer_file_size or 0 for layer in info.historic_layers if layer.remote
-        )
+        ), "ensure that remote_physical_size metric corresponds to layer map dump"
 
-    log.info("before runnning GC, ensure that remote_physical size is zero")
+    # leaving index_part.json upload from successful compaction out will show
+    # up here as a mismatch between remote_physical_size and summed up layermap
+    # size
     ensure_resident_and_remote_size_metrics()
 
     log.info("run GC")
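Folding the log messages into assertion messages keeps the failure output self-describing without the extra log lines. The cross-check itself is just a filtered sum over the layer map dump; here is a minimal sketch with a hypothetical plain-dict layer map standing in for the fixtures' LayerMapInfo type:

    layer_map = {
        "historic_layers": [
            {"layer_file_size": 1024, "remote": True},
            {"layer_file_size": 2048, "remote": False},
            {"layer_file_size": None, "remote": True},  # unknown size counts as 0
        ]
    }

    # resident = layers still on local disk; remote = layers in remote storage
    expected_resident = sum(
        layer["layer_file_size"] or 0
        for layer in layer_map["historic_layers"]
        if not layer["remote"]
    )
    expected_remote = sum(
        layer["layer_file_size"] or 0
        for layer in layer_map["historic_layers"]
        if layer["remote"]
    )

    assert expected_resident == 2048
    assert expected_remote == 1024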
@@ -13,13 +13,12 @@ from fixtures.neon_fixtures import (
     last_flush_lsn_upload,
     wait_for_last_flush_lsn,
 )
-from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
+from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import (
     assert_tenant_state,
     wait_for_last_record_lsn,
     wait_for_upload,
     wait_for_upload_queue_empty,
-    wait_until_tenant_state,
 )
 from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
 from fixtures.types import Lsn
@@ -391,7 +390,7 @@ def test_download_remote_layers_api(
     env.pageserver.start(extra_env_vars={"FAILPOINTS": "remote-storage-download-pre-rename=return"})
     env.pageserver.allowed_errors.extend(
         [
             f".*download_all_remote_layers.*{tenant_id}.*{timeline_id}.*layer download failed.*remote-storage-download-pre-rename failpoint",
-            ".*download failed: downloading evicted layer file failed.*",
+            f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size",
         ]
     )
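For context on the entries above: they are regular expressions, and the point of allowed_errors is to exempt matching ERROR lines from the post-test log scan. A sketch of that idea, assuming the patterns are matched per log line and only unmatched ERROR lines fail the test (both log lines below are made up):

    import re

    allowed_errors = [
        ".*layer download failed.*remote-storage-download-pre-rename failpoint",
    ]

    log_lines = [
        "2023-01-01 ERROR layer download failed due to remote-storage-download-pre-rename failpoint",
        "2023-01-01 ERROR something unexpected",
    ]

    unexpected = [
        line
        for line in log_lines
        if "ERROR" in line and not any(re.match(p, line) for p in allowed_errors)
    ]
    assert unexpected == ["2023-01-01 ERROR something unexpected"]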
@@ -658,62 +657,5 @@ def test_compaction_downloads_on_demand_with_image_creation(
     assert dict(kinds_after) == {"Delta": 4, "Image": 1}
 
 
-@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
-def test_ondemand_download_failure_to_replace(
-    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
-):
-    """
-    Make sure that we fail on being unable to replace a RemoteLayer instead of, for example, livelocking.
-
-    See: https://github.com/neondatabase/neon/issues/3533
-    """
-
-    neon_env_builder.enable_remote_storage(
-        remote_storage_kind=remote_storage_kind,
-        test_name="test_ondemand_download_failure_to_replace",
-    )
-
-    # disable gc and compaction via the default tenant config, because the config is lost while detaching,
-    # so that compaction will not be the one to download the layer but the http handler is
-    neon_env_builder.pageserver_config_override = (
-        """tenant_config={gc_period = "0s", compaction_period = "0s"}"""
-    )
-
-    env = neon_env_builder.init_start()
-
-    tenant_id = env.initial_tenant
-    timeline_id = env.initial_timeline
-    assert timeline_id is not None
-
-    pageserver_http = env.pageserver.http_client()
-
-    # remove layers so that they will be redownloaded
-    pageserver_http.tenant_detach(tenant_id)
-    pageserver_http.tenant_attach(tenant_id)
-
-    wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
-    pageserver_http.configure_failpoints(("layermap-replace-notfound", "return"))
-
-    # requesting details with non-incremental size should trigger a download of the only layer
-    # this will need to be adjusted if an index for logical sizes is ever implemented
-    with pytest.raises(PageserverApiException):
-        # PageserverApiException is expected because of the failpoint (timeline_detail building does something)
-        # ReadTimeout can happen on our busy CI, but it should not, because there is no more busylooping
-        # but should it be added back, we would wait for 15s here.
-        pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=15)
-
-    actual_message = ".* ERROR .*layermap-replace-notfound"
-    assert env.pageserver.log_contains(actual_message) is not None
-    env.pageserver.allowed_errors.append(actual_message)
-
-    env.pageserver.allowed_errors.append(
-        ".* ERROR .*Error processing HTTP request: InternalServerError\\(get local timeline info"
-    )
-    # this might get to run and attempt on-demand, but not always
-    env.pageserver.allowed_errors.append(".* ERROR .*Task 'initial size calculation'")
-
-    # if the above returned, then we didn't have a livelock, and all is well
-
-
 def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
     return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
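Unrelated to the deletion: the surviving stringify helper spells a dict transformation as map plus lambda. A dict comprehension is the equivalent, more idiomatic form; a sketch, in case this helper is ever touched again:

    from typing import Any, Dict


    def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
        # same behavior as dict(map(lambda x: (x[0], str(x[1])), conf.items()))
        return {key: str(value) for key, value in conf.items()}


    assert stringify({"gc_period": "0s", "compaction_threshold": 3}) == {
        "gc_period": "0s",
        "compaction_threshold": "3",
    }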
@@ -603,7 +603,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
     log.info("sending delete request")
     checkpoint_allowed_to_fail.set()
     env.pageserver.allowed_errors.append(
-        ".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
+        ".* ERROR .*Error processing HTTP request: InternalServerError\\(The timeline or pageserver is shutting down"
     )
 
     # Generous timeout, because currently deletions can get blocked waiting for compaction
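Because allowed-error entries are regexes, the literal parenthesis in InternalServerError( must stay escaped in the new string; in a non-raw Python literal that is written as \\(. A quick check of the new pattern against a made-up log line:

    import re

    pattern = (
        ".* ERROR .*Error processing HTTP request: "
        "InternalServerError\\(The timeline or pageserver is shutting down"
    )
    line = (
        "2023-01-01 ERROR request: Error processing HTTP request: "
        "InternalServerError(The timeline or pageserver is shutting down)"
    )
    assert re.match(pattern, line)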
@@ -861,10 +861,8 @@ def test_compaction_delete_before_upload(
     # Ensure that this actually terminates
     wait_upload_queue_empty(client, tenant_id, timeline_id)
 
-    # For now we are hitting this message.
-    # Maybe in the future the underlying race condition will be fixed,
-    # but until then, ensure that this message is hit instead.
-    assert env.pageserver.log_contains(
+    # fixed in #4938
+    assert not env.pageserver.log_contains(
         "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
     )
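The flipped assertion works because log_contains returns the matching line when one is found and None otherwise (see the "is not None" usage earlier in this diff), so assert not ... asserts the message never appeared. A tiny model of that contract, using a hypothetical list-backed log_contains:

    import re
    from typing import List, Optional


    def log_contains(lines: List[str], pattern: str) -> Optional[str]:
        # return the first line matching the pattern, or None if there is none
        for line in lines:
            if re.search(pattern, line):
                return line
        return None


    lines = ["INFO scheduled layer upload", "INFO upload complete"]
    assert log_contains(lines, "upload complete") == "INFO upload complete"
    assert not log_contains(lines, "File to upload doesn't exist")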
@@ -688,7 +688,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
     # temporarily detached produces these errors in the pageserver log.
     env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
     env.pageserver.allowed_errors.append(
-        f".*Tenant {tenant_id} will not become active\\. Current state: Broken.*"
+        f".*Tenant {tenant_id} will not become active\\. Current state: (Stopping|Broken).*"
     )
 
     # ignore the tenant and remove its metadata
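The widened pattern uses regex alternation, so the allowed error now tolerates the tenant being reported in either state. A quick sanity check with a made-up tenant id:

    import re

    tenant_id = "cafebabe"  # hypothetical id, only for illustration
    pattern = f".*Tenant {tenant_id} will not become active\\. Current state: (Stopping|Broken).*"

    for state in ("Stopping", "Broken"):
        line = f"WARN Tenant {tenant_id} will not become active. Current state: {state}, retrying"
        assert re.match(pattern, line)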
@@ -239,9 +239,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
 
     assert isinstance(env.remote_storage, LocalFsStorage)
 
-    env.pageserver.allowed_errors.append(
-        ".*removing local file .* because it has unexpected length.*"
-    )
+    env.pageserver.allowed_errors.append(".*removing local file .* because .*")
 
     # FIXME: Are these expected?
     env.pageserver.allowed_errors.append(