mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 05:00:38 +00:00
Compare commits
12 Commits
cross_regi
...
problame/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0bf9db2e5d | ||
|
|
7ef080c404 | ||
|
|
f557790969 | ||
|
|
9ebccbcdd5 | ||
|
|
b6cb362f11 | ||
|
|
88baa4fff7 | ||
|
|
d4a86a415b | ||
|
|
142eabe390 | ||
|
|
5e87cedb95 | ||
|
|
b460f617e9 | ||
|
|
cdce04d721 | ||
|
|
6bac770811 |
@@ -799,8 +799,12 @@ impl PageCache {
|
||||
// a different victim. But if the problem persists, the page cache
|
||||
// could fill up with dirty pages that we cannot evict, and we will
|
||||
// loop retrying the writebacks indefinitely.
|
||||
error!("writeback of buffer {:?} failed: {}", old_key, err);
|
||||
continue;
|
||||
if cfg!(test) {
|
||||
anyhow::bail!("writeback of buffer {:?} failed: {}", old_key, err);
|
||||
} else {
|
||||
error!("writeback of buffer {:?} failed: {}", old_key, err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1601,9 +1601,6 @@ pub fn create_test_timeline(
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
|
||||
@@ -86,6 +86,7 @@ pub mod block_io;
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
pub mod manifest;
|
||||
|
||||
pub mod metadata;
|
||||
mod par_fsync;
|
||||
@@ -488,6 +489,7 @@ impl std::fmt::Display for WaitToBecomeActiveError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ShutdownError {
|
||||
AlreadyStopping,
|
||||
}
|
||||
@@ -1267,6 +1269,18 @@ impl Tenant {
|
||||
/// This is used to create the initial 'main' timeline during bootstrapping,
|
||||
/// or when importing a new base backup. The caller is expected to load an
|
||||
/// initial image of the datadir to the new timeline after this.
|
||||
///
|
||||
/// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
|
||||
/// and the timeline will fail to load at a restart.
|
||||
///
|
||||
/// That's why we add an uninit mark file, and wrap it together witht the Timeline
|
||||
/// in-memory object into UninitializedTimeline.
|
||||
/// Once the caller is done setting up the timeline, they should call
|
||||
/// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
|
||||
///
|
||||
/// For tests, use `DatadirModification::init_empty` + `commit` to setup the
|
||||
/// minimum amount of keys required to get a working timeline.
|
||||
/// (Without it, `put` might fail due to `repartition` failing.)
|
||||
pub fn create_empty_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
@@ -1315,8 +1329,21 @@ impl Tenant {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
|
||||
|
||||
// Setup minimum keys required for the timeline to be usable.
|
||||
let mut modification = uninit_tl
|
||||
.raw_timeline()
|
||||
.expect("we just created it")
|
||||
.begin_modification(initdb_lsn);
|
||||
modification.init_empty().context("init_empty")?;
|
||||
modification
|
||||
.commit()
|
||||
.context("commit init_empty modification")?;
|
||||
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
|
||||
// load_layers=false because create_empty_timeline already did that what's necessary (set next_open_layer)
|
||||
// and modification.init_empty() already created layers.
|
||||
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, false)?;
|
||||
// The non-test code would call tl.activate() here.
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
@@ -1900,6 +1927,8 @@ impl Tenant {
|
||||
/// This will attempt to shutdown even if tenant is broken.
|
||||
pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
debug!("start");
|
||||
|
||||
// Set tenant (and its timlines) to Stoppping state.
|
||||
//
|
||||
// Since we can only transition into Stopping state after activation is complete,
|
||||
@@ -1946,6 +1975,7 @@ impl Tenant {
|
||||
// this will additionally shutdown and await all timeline tasks.
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
|
||||
|
||||
debug!("complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3422,6 +3452,7 @@ pub mod harness {
|
||||
pub conf: &'static PageServerConf,
|
||||
pub tenant_conf: TenantConf,
|
||||
pub tenant_id: TenantId,
|
||||
tenant: std::sync::Mutex<Option<Arc<Tenant>>>,
|
||||
|
||||
pub lock_guard: (
|
||||
Option<RwLockReadGuard<'a, ()>>,
|
||||
@@ -3481,6 +3512,7 @@ pub mod harness {
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
lock_guard,
|
||||
tenant: std::sync::Mutex::new(None),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3529,6 +3561,7 @@ pub mod harness {
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
timeline.set_state(TimelineState::Active);
|
||||
}
|
||||
*self.tenant.lock().unwrap() = Some(Arc::clone(&tenant));
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
@@ -3537,6 +3570,32 @@ pub mod harness {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for TenantHarness<'a> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(tenant) = self.tenant.lock().unwrap().take() {
|
||||
// Shutdown with freeze_and_flush so that we don't drop `EphemeralFile` objects in `InMemoryLayer`s.
|
||||
// Without this, we remove the ephemeral files on disk but they remain in pageserver's PageCache.
|
||||
// This causes write-back failures down the line.
|
||||
let tenant = Arc::clone(&tenant);
|
||||
std::thread::Builder::new()
|
||||
.name("TenantHarness::drop thread".to_owned())
|
||||
.spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
rt.block_on(tenant.shutdown(true).instrument(
|
||||
info_span!("tenant_harness_drop_shutdown", tenant_id=%tenant.tenant_id),
|
||||
))
|
||||
.unwrap()
|
||||
})
|
||||
.unwrap()
|
||||
.join()
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mock WAL redo manager that doesn't do much
|
||||
pub struct TestRedoManager;
|
||||
|
||||
@@ -3585,8 +3644,10 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("test_basic")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
@@ -3616,12 +3677,12 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_duplicate_timelines() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
|
||||
.load()
|
||||
.await;
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("no_duplicate_timelines")?;
|
||||
let harness = harness;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
|
||||
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
Err(e) => assert_eq!(
|
||||
e.to_string(),
|
||||
@@ -3648,9 +3709,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_branch() -> anyhow::Result<()> {
|
||||
use std::str::from_utf8;
|
||||
|
||||
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("test_branch")?;
|
||||
let harness = harness;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let writer = tline.writer();
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
@@ -3743,11 +3806,12 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) =
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness =
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?;
|
||||
let harness = harness;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
@@ -3779,10 +3843,9 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) =
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
|
||||
.load()
|
||||
.await;
|
||||
let harness = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?;
|
||||
let harness = harness;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
|
||||
@@ -3830,11 +3893,10 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) =
|
||||
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3878,11 +3940,11 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) =
|
||||
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness =
|
||||
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3901,11 +3963,10 @@ mod tests {
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) =
|
||||
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("test_parent_keeps_data_forever_after_branching")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3938,7 +3999,7 @@ mod tests {
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
|
||||
}
|
||||
|
||||
@@ -3958,7 +4019,7 @@ mod tests {
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
@@ -3995,7 +4056,8 @@ mod tests {
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
drop(tline);
|
||||
drop(tenant);
|
||||
|
||||
@@ -4032,8 +4094,10 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_images() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("test_images")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
@@ -4097,8 +4161,10 @@ mod tests {
|
||||
//
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("test_bulk_insert")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let mut lsn = Lsn(0x10);
|
||||
|
||||
@@ -4139,8 +4205,10 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_random_updates() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let harness = TenantHarness::create("test_random_updates")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -4152,7 +4220,7 @@ mod tests {
|
||||
// a read sees the latest page version.
|
||||
let mut updated = [Lsn(0); NUM_KEYS];
|
||||
|
||||
let mut lsn = Lsn(0);
|
||||
let mut lsn = Lsn(0x10);
|
||||
#[allow(clippy::needless_range_loop)]
|
||||
for blknum in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
@@ -4210,11 +4278,10 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_traverse_branches() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
|
||||
.load()
|
||||
.await;
|
||||
let harness = TenantHarness::create("test_traverse_branches")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -4226,7 +4293,7 @@ mod tests {
|
||||
// a read sees the latest page version.
|
||||
let mut updated = [Lsn(0); NUM_KEYS];
|
||||
|
||||
let mut lsn = Lsn(0);
|
||||
let mut lsn = Lsn(0x10);
|
||||
#[allow(clippy::needless_range_loop)]
|
||||
for blknum in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
@@ -4293,11 +4360,10 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_traverse_ancestors() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
|
||||
.load()
|
||||
.await;
|
||||
let harness = TenantHarness::create("test_traverse_ancestors")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
const NUM_KEYS: usize = 100;
|
||||
const NUM_TLINES: usize = 50;
|
||||
@@ -4306,7 +4372,7 @@ mod tests {
|
||||
// Track page mutation lsns across different timelines.
|
||||
let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
|
||||
|
||||
let mut lsn = Lsn(0);
|
||||
let mut lsn = Lsn(0x10);
|
||||
|
||||
#[allow(clippy::needless_range_loop)]
|
||||
for idx in 0..NUM_TLINES {
|
||||
@@ -4352,6 +4418,28 @@ mod tests {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty_test_timeline_is_usable() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_empty_test_timeline_is_usable")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x20), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
// Make sure the timeline has the minimum set of required keys for operation.
|
||||
// The only operation you can do on an empty timeline is to write new data.
|
||||
// Repartition is the only code on the write path that requires other keys to be present.
|
||||
// Make sure it works.
|
||||
{
|
||||
let cache = tline.partitioning.lock().unwrap();
|
||||
assert_eq!(cache.1, Lsn(0), "must not have repartitioned yet, otherwise the repartition call below might just use the cache");
|
||||
}
|
||||
tline
|
||||
.repartition(Lsn(0x20), tline.get_compaction_target_size(), &ctx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
|
||||
@@ -39,7 +39,7 @@ pub struct EphemeralFile {
|
||||
file_id: u64,
|
||||
_tenant_id: TenantId,
|
||||
_timeline_id: TimelineId,
|
||||
file: Arc<VirtualFile>,
|
||||
file: Option<Arc<VirtualFile>>,
|
||||
|
||||
pub size: u64,
|
||||
}
|
||||
@@ -52,7 +52,10 @@ impl EphemeralFile {
|
||||
) -> Result<EphemeralFile, io::Error> {
|
||||
let mut l = EPHEMERAL_FILES.write().unwrap();
|
||||
let file_id = l.next_file_id;
|
||||
l.next_file_id += 1;
|
||||
l.next_file_id = l
|
||||
.next_file_id
|
||||
.checked_add(1)
|
||||
.expect("next_file_id is u64, expecting it to not overflow");
|
||||
|
||||
let filename = conf
|
||||
.timeline_path(&timeline_id, &tenant_id)
|
||||
@@ -60,16 +63,30 @@ impl EphemeralFile {
|
||||
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
OpenOptions::new().read(true).write(true).create(true),
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
// The next_file_id doesn't overlfow, so technically, `create_new` is not needed.
|
||||
// But it's cheap, so why not.
|
||||
.create_new(true),
|
||||
)?;
|
||||
let file_rc = Arc::new(file);
|
||||
l.files.insert(file_id, file_rc.clone());
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
debug!(
|
||||
"created ephemeral file {}\n{}",
|
||||
filename.display(),
|
||||
std::backtrace::Backtrace::force_capture()
|
||||
);
|
||||
#[cfg(not(debug_assertions))]
|
||||
debug!("created ephemeral file {}", filename.display());
|
||||
|
||||
Ok(EphemeralFile {
|
||||
file_id,
|
||||
_tenant_id: tenant_id,
|
||||
_timeline_id: timeline_id,
|
||||
file: file_rc,
|
||||
file: Some(file_rc),
|
||||
size: 0,
|
||||
})
|
||||
}
|
||||
@@ -79,6 +96,8 @@ impl EphemeralFile {
|
||||
while off < PAGE_SZ {
|
||||
let n = self
|
||||
.file
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;
|
||||
|
||||
if n == 0 {
|
||||
@@ -261,17 +280,43 @@ impl Drop for EphemeralFile {
|
||||
cache.drop_buffers_for_ephemeral(self.file_id);
|
||||
|
||||
// remove entry from the hash map
|
||||
EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
|
||||
let virtual_file = EPHEMERAL_FILES
|
||||
.write()
|
||||
.unwrap()
|
||||
.files
|
||||
.remove(&self.file_id)
|
||||
.unwrap();
|
||||
|
||||
// remove file from self
|
||||
let self_file = self.file.take().unwrap();
|
||||
|
||||
assert_eq!(
|
||||
Arc::as_ptr(&virtual_file) as *const (),
|
||||
Arc::as_ptr(&self_file) as *const ()
|
||||
);
|
||||
drop(self_file);
|
||||
|
||||
// XXX once we upgrade to Rust 1.70, use Arc::into_inner.
|
||||
// It does the following checks atomically.
|
||||
assert_eq!(Arc::weak_count(&virtual_file), 0);
|
||||
let virtual_file = Arc::try_unwrap(virtual_file).expect(
|
||||
"we are being dropped and EPHEMERAL_FILES is the only other place where we put the Arc",
|
||||
);
|
||||
|
||||
// unlink the file
|
||||
let res = std::fs::remove_file(&self.file.path);
|
||||
if let Err(e) = res {
|
||||
warn!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.file.path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
// TODO: we should be able to unwrap here, but, timeline delete and tenant detach do
|
||||
// std::fs::remove_dir_all without dropping all InMemoryLayer => EphemeralFile
|
||||
// of the tenant => need to fix that first.
|
||||
match virtual_file.remove() {
|
||||
Ok(()) => (),
|
||||
Err((virtual_file, e)) => {
|
||||
warn!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
virtual_file.path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
325
pageserver/src/tenant/manifest.rs
Normal file
325
pageserver/src/tenant/manifest.rs
Normal file
@@ -0,0 +1,325 @@
|
||||
//! This module contains the encoding and decoding of the local manifest file.
|
||||
//!
|
||||
//! MANIFEST is a write-ahead log which is stored locally to each timeline. It
|
||||
//! records the state of the storage engine. It contains a snapshot of the
|
||||
//! state and all operations proceeding that snapshot. The file begins with a
|
||||
//! header recording MANIFEST version number. After that, it contains a snapshot.
|
||||
//! The snapshot is followed by a list of operations. Each operation is a list
|
||||
//! of records. Each record is either an addition or a removal of a layer.
|
||||
//!
|
||||
//! With MANIFEST, we can:
|
||||
//!
|
||||
//! 1. recover state quickly by reading the file, potentially boosting the
|
||||
//! startup speed.
|
||||
//! 2. ensure all operations are atomic and avoid corruption, solving issues
|
||||
//! like redundant image layer and preparing us for future compaction
|
||||
//! strategies.
|
||||
//!
|
||||
//! There is also a format for storing all layer files on S3, called
|
||||
//! `index_part.json`. Compared with index_part, MANIFEST is an WAL which
|
||||
//! records all operations as logs, and therefore we can easily replay the
|
||||
//! operations when recovering from crash, while ensuring those operations
|
||||
//! are atomic upon restart.
|
||||
//!
|
||||
//! Currently, this is not used in the system. Future refactors will ensure
|
||||
//! the storage state will be recorded in this file, and the system can be
|
||||
//! recovered from this file. This is tracked in
|
||||
//! https://github.com/neondatabase/neon/issues/4418
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use anyhow::Result;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use crc32c::crc32c;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::log::warn;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::storage_layer::PersistentLayerDesc;
|
||||
|
||||
pub struct Manifest {
|
||||
file: VirtualFile,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
|
||||
pub struct Snapshot {
|
||||
pub layers: Vec<PersistentLayerDesc>,
|
||||
}
|
||||
|
||||
/// serde by default encode this in tagged enum, and therefore it will be something
|
||||
/// like `{ "AddLayer": { ... } }`.
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
|
||||
pub enum Record {
|
||||
AddLayer(PersistentLayerDesc),
|
||||
RemoveLayer(PersistentLayerDesc),
|
||||
}
|
||||
|
||||
/// `echo neon.manifest | sha1sum` and take the leading 8 bytes.
|
||||
const MANIFEST_MAGIC_NUMBER: u64 = 0xf5c44592b806109c;
|
||||
const MANIFEST_VERSION: u64 = 1;
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
|
||||
pub struct ManifestHeader {
|
||||
magic_number: u64,
|
||||
version: u64,
|
||||
}
|
||||
|
||||
const MANIFEST_HEADER_LEN: usize = 16;
|
||||
|
||||
impl ManifestHeader {
|
||||
fn encode(&self) -> BytesMut {
|
||||
let mut buf = BytesMut::with_capacity(MANIFEST_HEADER_LEN);
|
||||
buf.put_u64(self.magic_number);
|
||||
buf.put_u64(self.version);
|
||||
buf
|
||||
}
|
||||
|
||||
fn decode(mut buf: &[u8]) -> Self {
|
||||
assert!(buf.len() == MANIFEST_HEADER_LEN, "invalid header");
|
||||
Self {
|
||||
magic_number: buf.get_u64(),
|
||||
version: buf.get_u64(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
|
||||
pub enum Operation {
|
||||
/// A snapshot of the current state.
|
||||
///
|
||||
/// Lsn field represents the LSN that is persisted to disk for this snapshot.
|
||||
Snapshot(Snapshot, Lsn),
|
||||
/// An atomic operation that changes the state.
|
||||
///
|
||||
/// Lsn field represents the LSN that is persisted to disk after the operation is done.
|
||||
/// This will only change when new L0 is flushed to the disk.
|
||||
Operation(Vec<Record>, Lsn),
|
||||
}
|
||||
|
||||
struct RecordHeader {
|
||||
size: u32,
|
||||
checksum: u32,
|
||||
}
|
||||
|
||||
const RECORD_HEADER_LEN: usize = 8;
|
||||
|
||||
impl RecordHeader {
|
||||
fn encode(&self) -> BytesMut {
|
||||
let mut buf = BytesMut::with_capacity(RECORD_HEADER_LEN);
|
||||
buf.put_u32(self.size);
|
||||
buf.put_u32(self.checksum);
|
||||
buf
|
||||
}
|
||||
|
||||
fn decode(mut buf: &[u8]) -> Self {
|
||||
assert!(buf.len() == RECORD_HEADER_LEN, "invalid header");
|
||||
Self {
|
||||
size: buf.get_u32(),
|
||||
checksum: buf.get_u32(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ManifestLoadError {
|
||||
#[error("manifest header is corrupted")]
|
||||
CorruptedManifestHeader,
|
||||
#[error("unsupported manifest version: got {0}, expected {1}")]
|
||||
UnsupportedVersion(u64, u64),
|
||||
#[error("error when decoding record: {0}")]
|
||||
DecodeRecord(serde_json::Error),
|
||||
#[error("I/O error: {0}")]
|
||||
Io(io::Error),
|
||||
}
|
||||
|
||||
#[must_use = "Should check if the manifest is partially corrupted"]
|
||||
pub struct ManifestPartiallyCorrupted(bool);
|
||||
|
||||
impl Manifest {
|
||||
/// Create a new manifest by writing the manifest header and a snapshot record to the given file.
|
||||
pub fn init(file: VirtualFile, snapshot: Snapshot, lsn: Lsn) -> Result<Self> {
|
||||
let mut manifest = Self { file };
|
||||
manifest.append_manifest_header(ManifestHeader {
|
||||
magic_number: MANIFEST_MAGIC_NUMBER,
|
||||
version: MANIFEST_VERSION,
|
||||
})?;
|
||||
manifest.append_operation(Operation::Snapshot(snapshot, lsn))?;
|
||||
Ok(manifest)
|
||||
}
|
||||
|
||||
/// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
|
||||
/// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
|
||||
/// backup the current one.
|
||||
pub fn load(
|
||||
mut file: VirtualFile,
|
||||
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
|
||||
let mut buf = vec![];
|
||||
file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;
|
||||
|
||||
// Read manifest header
|
||||
let mut buf = Bytes::from(buf);
|
||||
if buf.remaining() < MANIFEST_HEADER_LEN {
|
||||
return Err(ManifestLoadError::CorruptedManifestHeader);
|
||||
}
|
||||
let header = ManifestHeader::decode(&buf[..MANIFEST_HEADER_LEN]);
|
||||
buf.advance(MANIFEST_HEADER_LEN);
|
||||
if header.version != MANIFEST_VERSION {
|
||||
return Err(ManifestLoadError::UnsupportedVersion(
|
||||
header.version,
|
||||
MANIFEST_VERSION,
|
||||
));
|
||||
}
|
||||
|
||||
// Read operations
|
||||
let mut operations = Vec::new();
|
||||
let corrupted = loop {
|
||||
if buf.remaining() == 0 {
|
||||
break false;
|
||||
}
|
||||
if buf.remaining() < RECORD_HEADER_LEN {
|
||||
warn!("incomplete header when decoding manifest, could be corrupted");
|
||||
break true;
|
||||
}
|
||||
let RecordHeader { size, checksum } = RecordHeader::decode(&buf[..RECORD_HEADER_LEN]);
|
||||
let size = size as usize;
|
||||
buf.advance(RECORD_HEADER_LEN);
|
||||
if buf.remaining() < size {
|
||||
warn!("incomplete data when decoding manifest, could be corrupted");
|
||||
break true;
|
||||
}
|
||||
let data = &buf[..size];
|
||||
if crc32c(data) != checksum {
|
||||
warn!("checksum mismatch when decoding manifest, could be corrupted");
|
||||
break true;
|
||||
}
|
||||
// if the following decode fails, we cannot use the manifest or safely ignore any record.
|
||||
operations.push(serde_json::from_slice(data).map_err(ManifestLoadError::DecodeRecord)?);
|
||||
buf.advance(size);
|
||||
};
|
||||
Ok((
|
||||
Self { file },
|
||||
operations,
|
||||
ManifestPartiallyCorrupted(corrupted),
|
||||
))
|
||||
}
|
||||
|
||||
fn append_data(&mut self, data: &[u8]) -> Result<()> {
|
||||
if data.len() >= u32::MAX as usize {
|
||||
panic!("data too large");
|
||||
}
|
||||
let header = RecordHeader {
|
||||
size: data.len() as u32,
|
||||
checksum: crc32c(data),
|
||||
};
|
||||
let header = header.encode();
|
||||
self.file.write_all(&header)?;
|
||||
self.file.write_all(data)?;
|
||||
self.file.sync_all()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn append_manifest_header(&mut self, header: ManifestHeader) -> Result<()> {
|
||||
let encoded = header.encode();
|
||||
self.file.write_all(&encoded)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add an operation to the manifest. The operation will be appended to the end of the file,
|
||||
/// and the file will fsync.
|
||||
pub fn append_operation(&mut self, operation: Operation) -> Result<()> {
|
||||
let encoded = Vec::from(serde_json::to_string(&operation)?);
|
||||
self.append_data(&encoded)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
use crate::repository::Key;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_read_manifest() {
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
|
||||
std::fs::create_dir_all(&testdir).unwrap();
|
||||
let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
|
||||
let layer1 = PersistentLayerDesc::new_test(Key::from_i128(0)..Key::from_i128(233));
|
||||
let layer2 = PersistentLayerDesc::new_test(Key::from_i128(233)..Key::from_i128(2333));
|
||||
let layer3 = PersistentLayerDesc::new_test(Key::from_i128(2333)..Key::from_i128(23333));
|
||||
let layer4 = PersistentLayerDesc::new_test(Key::from_i128(23333)..Key::from_i128(233333));
|
||||
|
||||
// Write a manifest with a snapshot and some operations
|
||||
let snapshot = Snapshot {
|
||||
layers: vec![layer1, layer2],
|
||||
};
|
||||
let mut manifest = Manifest::init(file, snapshot.clone(), Lsn::from(0)).unwrap();
|
||||
manifest
|
||||
.append_operation(Operation::Operation(
|
||||
vec![Record::AddLayer(layer3.clone())],
|
||||
Lsn::from(1),
|
||||
))
|
||||
.unwrap();
|
||||
drop(manifest);
|
||||
|
||||
// Open the second time and write
|
||||
let file = VirtualFile::open_with_options(
|
||||
&testdir.join("MANIFEST"),
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create_new(false)
|
||||
.truncate(false),
|
||||
)
|
||||
.unwrap();
|
||||
let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
|
||||
assert!(!corrupted.0);
|
||||
assert_eq!(operations.len(), 2);
|
||||
assert_eq!(
|
||||
&operations[0],
|
||||
&Operation::Snapshot(snapshot.clone(), Lsn::from(0))
|
||||
);
|
||||
assert_eq!(
|
||||
&operations[1],
|
||||
&Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1))
|
||||
);
|
||||
manifest
|
||||
.append_operation(Operation::Operation(
|
||||
vec![
|
||||
Record::RemoveLayer(layer3.clone()),
|
||||
Record::AddLayer(layer4.clone()),
|
||||
],
|
||||
Lsn::from(2),
|
||||
))
|
||||
.unwrap();
|
||||
drop(manifest);
|
||||
|
||||
// Open the third time and verify
|
||||
let file = VirtualFile::open_with_options(
|
||||
&testdir.join("MANIFEST"),
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create_new(false)
|
||||
.truncate(false),
|
||||
)
|
||||
.unwrap();
|
||||
let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
|
||||
assert!(!corrupted.0);
|
||||
assert_eq!(operations.len(), 3);
|
||||
assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));
|
||||
assert_eq!(
|
||||
&operations[1],
|
||||
&Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1))
|
||||
);
|
||||
assert_eq!(
|
||||
&operations[2],
|
||||
&Operation::Operation(
|
||||
vec![Record::RemoveLayer(layer3), Record::AddLayer(layer4)],
|
||||
Lsn::from(2)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1392,7 +1392,7 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -917,7 +917,7 @@ impl Drop for DeltaLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
match inner.blob_writer.into_inner().into_inner() {
|
||||
Ok(vfile) => vfile.remove(),
|
||||
Ok(vfile) => vfile.remove().unwrap(),
|
||||
Err(err) => warn!(
|
||||
"error while flushing buffer of image layer temporary file: {}",
|
||||
err
|
||||
|
||||
@@ -709,7 +709,7 @@ impl ImageLayerWriter {
|
||||
impl Drop for ImageLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
inner.blob_writer.into_inner().remove();
|
||||
inner.blob_writer.into_inner().remove().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,10 +9,12 @@ use crate::{context::RequestContext, repository::Key};
|
||||
|
||||
use super::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
|
||||
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
|
||||
/// a unified way to generate layer information like file name.
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct PersistentLayerDesc {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
@@ -50,6 +52,19 @@ impl PersistentLayerDesc {
|
||||
self.filename().file_name()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn new_test(key_range: Range<Key>) -> Self {
|
||||
Self {
|
||||
tenant_id: TenantId::generate(),
|
||||
timeline_id: TimelineId::generate(),
|
||||
key_range,
|
||||
lsn_range: Lsn(0)..Lsn(1),
|
||||
is_delta: false,
|
||||
is_incremental: false,
|
||||
file_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_img(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
|
||||
@@ -216,7 +216,7 @@ pub struct Timeline {
|
||||
pub initdb_lsn: Lsn,
|
||||
|
||||
/// When did we last calculate the partitioning?
|
||||
partitioning: Mutex<(KeyPartitioning, Lsn)>,
|
||||
pub(super) partitioning: Mutex<(KeyPartitioning, Lsn)>,
|
||||
|
||||
/// Configuration: how often should the partitioning be recalculated.
|
||||
repartition_threshold: u64,
|
||||
@@ -684,8 +684,11 @@ impl Timeline {
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
|
||||
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
debug!("start");
|
||||
self.freeze_inmem_layer(false);
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
let ret = self.flush_frozen_layers_and_wait().await;
|
||||
debug!(is_err = ret.is_err(), "complete");
|
||||
ret
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers.
|
||||
@@ -1306,7 +1309,7 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
|
||||
}
|
||||
|
||||
fn get_compaction_target_size(&self) -> u64 {
|
||||
pub(super) fn get_compaction_target_size(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.compaction_target_size
|
||||
@@ -2386,8 +2389,11 @@ impl Timeline {
|
||||
ValueReconstructResult::Missing => {
|
||||
return Err(layer_traversal_error(
|
||||
format!(
|
||||
"could not find data for key {} at LSN {}, for request at LSN {}",
|
||||
key, cont_lsn, request_lsn
|
||||
"could not find data for key {} at LSN {}, for request at LSN {}\n{}",
|
||||
key,
|
||||
cont_lsn,
|
||||
request_lsn,
|
||||
std::backtrace::Backtrace::force_capture(),
|
||||
),
|
||||
traversal_path,
|
||||
));
|
||||
@@ -2859,14 +2865,21 @@ impl Timeline {
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
let l = layers.frozen_layers.pop_front().unwrap();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
// timeline). If two threads tried to flush the same frozen
|
||||
// layer to disk at the same time, that would not work.
|
||||
assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer));
|
||||
|
||||
// release lock on 'layers'
|
||||
assert!(LayerMap::compare_arced_layers(&l, &frozen_layer));
|
||||
drop(frozen_layer);
|
||||
// XXX once we upgrade to Rust 1.70, use Arc::into_inner.
|
||||
// It does the following checks atomically.
|
||||
assert_eq!(Arc::weak_count(&l), 0);
|
||||
let l =
|
||||
Arc::try_unwrap(l).expect("no-one except us holds references to this layer");
|
||||
drop(layers); // don't hold layer map lock when doing disk IO
|
||||
info!("dropping frozen layer, this should remove the ephemeral file on disk");
|
||||
drop(l);
|
||||
}
|
||||
|
||||
fail_point!("checkpoint-after-sync");
|
||||
@@ -3000,7 +3013,7 @@ impl Timeline {
|
||||
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
|
||||
}
|
||||
|
||||
async fn repartition(
|
||||
pub(super) async fn repartition(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
partition_size: u64,
|
||||
|
||||
@@ -1324,7 +1324,7 @@ mod tests {
|
||||
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
|
||||
ConnectionManagerState {
|
||||
|
||||
@@ -324,16 +324,8 @@ impl VirtualFile {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
let path = self.path.clone();
|
||||
drop(self);
|
||||
std::fs::remove_file(path).expect("failed to remove the virtual file");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
/// Idempotently close the file descriptor we might have or have not open for this VirtualFile.
|
||||
pub fn close(&mut self) {
|
||||
let handle = self.handle.get_mut().unwrap();
|
||||
|
||||
// We could check with a read-lock first, to avoid waiting on an
|
||||
@@ -351,6 +343,26 @@ impl Drop for VirtualFile {
|
||||
.observe_closure_duration(|| slot_guard.file.take());
|
||||
}
|
||||
}
|
||||
|
||||
/// Caller can retry if we return an `Err`.
|
||||
#[allow(clippy::result_large_err)]
|
||||
pub fn remove(mut self) -> Result<(), (Self, std::io::Error)> {
|
||||
// close our fd before unlink system call, so that the unlink actually performs the removal
|
||||
self.close();
|
||||
// Try to remove file on disk.
|
||||
// If it fails, we idempotently closed the fd, but the caller can choose to retry.
|
||||
match std::fs::remove_file(&self.path) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => Err((self, e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
/// If a VirtualFile is dropped, close the underlying file if it was open.
|
||||
fn drop(&mut self) {
|
||||
self.close();
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for VirtualFile {
|
||||
|
||||
@@ -1208,7 +1208,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_relsize() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
|
||||
let harness = TenantHarness::create("test_relsize")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
@@ -1427,7 +1428,8 @@ mod tests {
|
||||
// and then created it again within the same layer.
|
||||
#[tokio::test]
|
||||
async fn test_drop_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
|
||||
let harness = TenantHarness::create("test_drop_extend")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
@@ -1496,7 +1498,8 @@ mod tests {
|
||||
// and then extended it again within the same layer.
|
||||
#[tokio::test]
|
||||
async fn test_truncate_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
|
||||
let harness = TenantHarness::create("test_truncate_extend")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
@@ -1636,7 +1639,8 @@ mod tests {
|
||||
/// split into multiple 1 GB segments in Postgres.
|
||||
#[tokio::test]
|
||||
async fn test_large_rel() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
|
||||
let harness = TenantHarness::create("test_large_rel")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
|
||||
@@ -1,10 +1,63 @@
|
||||
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
import requests
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
# Just start and measure duration.
|
||||
#
|
||||
# This test runs pretty quickly and can be informative when used in combination
|
||||
# with emulated network delay. Some useful delay commands:
|
||||
#
|
||||
# 1. Add 2msec delay to all localhost traffic
|
||||
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
|
||||
#
|
||||
# 2. Test that it works (you should see 4ms ping)
|
||||
# `ping localhost`
|
||||
#
|
||||
# 3. Revert back to normal
|
||||
# `sudo tc qdisc del dev lo root netem`
|
||||
#
|
||||
# NOTE this test might not represent the real startup time because the basebackup
|
||||
# for a large database might be larger if there's a lof of transaction metadata,
|
||||
# or safekeepers might need more syncing, or there might be more operations to
|
||||
# apply during config step, like more users, databases, or extensions. By default
|
||||
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
|
||||
# test we only load neon.
|
||||
def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_startup")
|
||||
|
||||
# We do two iterations so we can see if the second startup is faster. It should
|
||||
# be because the compute node should already be configured with roles, databases,
|
||||
# extensions, etc from the first run.
|
||||
for i in range(2):
|
||||
# Start
|
||||
with zenbenchmark.record_duration(f"{i}_start_and_select"):
|
||||
endpoint = env.endpoints.create_start("test_startup")
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Get metrics
|
||||
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
|
||||
durations = {
|
||||
"wait_for_spec_ms": f"{i}_wait_for_spec",
|
||||
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
|
||||
"basebackup_ms": f"{i}_basebackup",
|
||||
"config_ms": f"{i}_config",
|
||||
"total_startup_ms": f"{i}_total_startup",
|
||||
}
|
||||
for key, name in durations.items():
|
||||
value = metrics[key]
|
||||
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
|
||||
# This test sometimes runs for longer than the global 5 minute timeout.
|
||||
@pytest.mark.timeout(600)
|
||||
def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
|
||||
Reference in New Issue
Block a user