Mirror of https://github.com/neondatabase/neon.git

Compare commits: release-pr...problame/e (10 commits)
Commits:

- 0bf9db2e5d
- 7ef080c404
- f557790969
- 9ebccbcdd5
- b6cb362f11
- 88baa4fff7
- d4a86a415b
- 142eabe390
- 5e87cedb95
- b460f617e9
@@ -799,8 +799,12 @@ impl PageCache {
                     // a different victim. But if the problem persists, the page cache
                     // could fill up with dirty pages that we cannot evict, and we will
                     // loop retrying the writebacks indefinitely.
-                    error!("writeback of buffer {:?} failed: {}", old_key, err);
-                    continue;
+                    if cfg!(test) {
+                        anyhow::bail!("writeback of buffer {:?} failed: {}", old_key, err);
+                    } else {
+                        error!("writeback of buffer {:?} failed: {}", old_key, err);
+                        continue;
+                    }
                 }
             }
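The hunk above makes writeback failures fatal in unit tests while production keeps logging and retrying. Note that `cfg!(test)`, unlike `#[cfg(test)]`, is an ordinary boolean expression, so both branches must type-check in every build. A minimal standalone sketch of the pattern; the function name and error type here are hypothetical stand-ins, not pageserver API:

```rust
// Sketch of the cfg!(test) gating pattern used in the hunk above.
fn handle_writeback_failure(err: &std::io::Error) -> anyhow::Result<()> {
    if cfg!(test) {
        // In tests, surface the failure immediately so the test fails loudly.
        anyhow::bail!("writeback failed: {err}");
    } else {
        // In production, log and keep going; the caller picks a different victim.
        eprintln!("writeback of buffer failed: {err}");
        Ok(())
    }
}
```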
@@ -1601,9 +1601,6 @@ pub fn create_test_timeline(
     ctx: &RequestContext,
 ) -> anyhow::Result<std::sync::Arc<Timeline>> {
     let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
-    let mut m = tline.begin_modification(Lsn(8));
-    m.init_empty()?;
-    m.commit()?;
     Ok(tline)
 }
@@ -489,6 +489,7 @@ impl std::fmt::Display for WaitToBecomeActiveError {
     }
 }
 
+#[derive(Debug)]
 pub(crate) enum ShutdownError {
     AlreadyStopping,
 }
@@ -1268,6 +1269,18 @@ impl Tenant {
     /// This is used to create the initial 'main' timeline during bootstrapping,
     /// or when importing a new base backup. The caller is expected to load an
     /// initial image of the datadir to the new timeline after this.
+    ///
+    /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
+    /// and the timeline will fail to load at a restart.
+    ///
+    /// That's why we add an uninit mark file, and wrap it together with the Timeline
+    /// in-memory object into UninitializedTimeline.
+    /// Once the caller is done setting up the timeline, they should call
+    /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
+    ///
+    /// For tests, use `DatadirModification::init_empty` + `commit` to set up the
+    /// minimum amount of keys required to get a working timeline.
+    /// (Without it, `put` might fail due to `repartition` failing.)
     pub fn create_empty_timeline(
         &self,
         new_timeline_id: TimelineId,
@@ -1316,8 +1329,21 @@ impl Tenant {
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Timeline>> {
         let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
 
+        // Set up the minimum keys required for the timeline to be usable.
+        let mut modification = uninit_tl
+            .raw_timeline()
+            .expect("we just created it")
+            .begin_modification(initdb_lsn);
+        modification.init_empty().context("init_empty")?;
+        modification
+            .commit()
+            .context("commit init_empty modification")?;
+
         let mut timelines = self.timelines.lock().unwrap();
-        let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
+        // load_layers=false because create_empty_timeline already did what's necessary (set next_open_layer)
+        // and modification.init_empty() already created layers.
+        let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, false)?;
+        // The non-test code would call tl.activate() here.
         tl.set_state(TimelineState::Active);
         Ok(tl)
@@ -1901,6 +1927,8 @@ impl Tenant {
     /// This will attempt to shut down even if the tenant is broken.
     pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
         debug_assert_current_span_has_tenant_id();
+        debug!("start");
+
         // Set tenant (and its timelines) to Stopping state.
         //
         // Since we can only transition into Stopping state after activation is complete,
@@ -1947,6 +1975,7 @@ impl Tenant {
         // this will additionally shut down and await all timeline tasks.
         task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
 
+        debug!("complete");
         Ok(())
     }
@@ -3423,6 +3452,7 @@ pub mod harness {
     pub conf: &'static PageServerConf,
     pub tenant_conf: TenantConf,
     pub tenant_id: TenantId,
+    tenant: std::sync::Mutex<Option<Arc<Tenant>>>,
 
     pub lock_guard: (
         Option<RwLockReadGuard<'a, ()>>,
@@ -3482,6 +3512,7 @@ pub mod harness {
             tenant_conf,
             tenant_id,
             lock_guard,
+            tenant: std::sync::Mutex::new(None),
         })
     }
@@ -3530,6 +3561,7 @@ pub mod harness {
             for timeline in tenant.timelines.lock().unwrap().values() {
                 timeline.set_state(TimelineState::Active);
             }
+            *self.tenant.lock().unwrap() = Some(Arc::clone(&tenant));
             Ok(tenant)
         }
@@ -3538,6 +3570,32 @@ pub mod harness {
         }
     }
 
+    impl<'a> Drop for TenantHarness<'a> {
+        fn drop(&mut self) {
+            if let Some(tenant) = self.tenant.lock().unwrap().take() {
+                // Shutdown with freeze_and_flush so that we don't drop `EphemeralFile` objects in `InMemoryLayer`s.
+                // Without this, we remove the ephemeral files on disk but they remain in pageserver's PageCache.
+                // This causes write-back failures down the line.
+                let tenant = Arc::clone(&tenant);
+                std::thread::Builder::new()
+                    .name("TenantHarness::drop thread".to_owned())
+                    .spawn(move || {
+                        let rt = tokio::runtime::Builder::new_current_thread()
+                            .enable_all()
+                            .build()
+                            .unwrap();
+                        rt.block_on(tenant.shutdown(true).instrument(
+                            info_span!("tenant_harness_drop_shutdown", tenant_id=%tenant.tenant_id),
+                        ))
+                        .unwrap()
+                    })
+                    .unwrap()
+                    .join()
+                    .unwrap();
+            }
+        }
+    }
+
     // Mock WAL redo manager that doesn't do much
     pub struct TestRedoManager;
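The `Drop` impl added above has to run async shutdown from a synchronous destructor. Calling `block_on` on the ambient runtime would panic when the destructor runs on a Tokio worker thread (as in `#[tokio::test]`), so the harness spawns a dedicated OS thread that owns its own single-threaded runtime and joins it. A reduced sketch of that pattern, where `Guard` and `async_cleanup` are hypothetical stand-ins:

```rust
// Reduced sketch of running async cleanup from Drop, as the TenantHarness
// Drop impl above does.
struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        // A fresh thread avoids "cannot block_on inside a runtime" panics
        // when the guard is dropped on a Tokio worker thread.
        std::thread::spawn(|| {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async_cleanup());
        })
        .join()
        .unwrap();
    }
}

async fn async_cleanup() {
    // hypothetical stand-in for tenant.shutdown(true)
}
```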
@@ -3586,8 +3644,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_basic() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("test_basic")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
 
         let writer = tline.writer();
         writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -3617,12 +3677,12 @@ mod tests {
 
     #[tokio::test]
     async fn no_duplicate_timelines() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
-            .load()
-            .await;
-        let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("no_duplicate_timelines")?;
+        let harness = harness;
+        let (tenant, ctx) = harness.load().await;
+        let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
 
-        match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
+        match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
             Ok(_) => panic!("duplicate timeline creation should fail"),
             Err(e) => assert_eq!(
                 e.to_string(),
@@ -3649,9 +3709,11 @@ mod tests {
     #[tokio::test]
     async fn test_branch() -> anyhow::Result<()> {
         use std::str::from_utf8;
 
-        let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("test_branch")?;
+        let harness = harness;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
         let writer = tline.writer();
 
         #[allow(non_snake_case)]
@@ -3744,11 +3806,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
-        let (tenant, ctx) =
-            TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
-                .load()
-                .await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness =
+            TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?;
+        let harness = harness;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -3780,10 +3843,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
-        let (tenant, ctx) =
-            TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
-                .load()
-                .await;
+        let harness = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?;
+        let harness = harness;
+        let (tenant, ctx) = harness.load().await;
 
         let tline =
             tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
@@ -3831,11 +3893,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
-        let (tenant, ctx) =
-            TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
-                .load()
-                .await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         tenant
@@ -3879,11 +3940,11 @@ mod tests {
 
     #[tokio::test]
     async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
-        let (tenant, ctx) =
-            TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
-                .load()
-                .await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness =
+            TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         tenant
@@ -3902,11 +3963,10 @@ mod tests {
     }
     #[tokio::test]
     async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
-        let (tenant, ctx) =
-            TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
-                .load()
-                .await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("test_parent_keeps_data_forever_after_branching")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
         make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
 
         tenant
@@ -3939,7 +3999,7 @@ mod tests {
         {
             let (tenant, ctx) = harness.load().await;
             let tline =
-                tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
+                tenant.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)?;
             make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
         }
@@ -3959,7 +4019,7 @@ mod tests {
         {
             let (tenant, ctx) = harness.load().await;
             let tline =
-                tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+                tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
 
             make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
@@ -3996,7 +4056,8 @@ mod tests {
         let harness = TenantHarness::create(TEST_NAME)?;
         let (tenant, ctx) = harness.load().await;
 
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
         drop(tline);
         drop(tenant);
@@ -4033,8 +4094,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_images() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("test_images")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
 
         let writer = tline.writer();
         writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -4098,8 +4161,10 @@ mod tests {
     //
     #[tokio::test]
     async fn test_bulk_insert() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("test_bulk_insert")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
 
         let mut lsn = Lsn(0x10);
@@ -4140,8 +4205,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_random_updates() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
-        let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let harness = TenantHarness::create("test_random_updates")?;
+        let (tenant, ctx) = harness.load().await;
+        let tline =
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
 
         const NUM_KEYS: usize = 1000;
@@ -4153,7 +4220,7 @@ mod tests {
         // a read sees the latest page version.
         let mut updated = [Lsn(0); NUM_KEYS];
 
-        let mut lsn = Lsn(0);
+        let mut lsn = Lsn(0x10);
         #[allow(clippy::needless_range_loop)]
         for blknum in 0..NUM_KEYS {
             lsn = Lsn(lsn.0 + 0x10);
@@ -4211,11 +4278,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_traverse_branches() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
-            .load()
-            .await;
+        let harness = TenantHarness::create("test_traverse_branches")?;
+        let (tenant, ctx) = harness.load().await;
         let mut tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
 
         const NUM_KEYS: usize = 1000;
@@ -4227,7 +4293,7 @@ mod tests {
         // a read sees the latest page version.
         let mut updated = [Lsn(0); NUM_KEYS];
 
-        let mut lsn = Lsn(0);
+        let mut lsn = Lsn(0x10);
         #[allow(clippy::needless_range_loop)]
         for blknum in 0..NUM_KEYS {
             lsn = Lsn(lsn.0 + 0x10);
@@ -4294,11 +4360,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_traverse_ancestors() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
-            .load()
-            .await;
+        let harness = TenantHarness::create("test_traverse_ancestors")?;
+        let (tenant, ctx) = harness.load().await;
         let mut tline =
-            tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+            tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
 
         const NUM_KEYS: usize = 100;
         const NUM_TLINES: usize = 50;
@@ -4307,7 +4372,7 @@ mod tests {
         // Track page mutation lsns across different timelines.
         let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
 
-        let mut lsn = Lsn(0);
+        let mut lsn = Lsn(0x10);
 
         #[allow(clippy::needless_range_loop)]
         for idx in 0..NUM_TLINES {
@@ -4353,6 +4418,28 @@ mod tests {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty_test_timeline_is_usable() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_empty_test_timeline_is_usable")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x20), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
// Make sure the timeline has the minimum set of required keys for operation.
|
||||
// The only operation you can do on an empty timeline is to write new data.
|
||||
// Repartition is the only code on the write path that requires other keys to be present.
|
||||
// Make sure it works.
|
||||
{
|
||||
let cache = tline.partitioning.lock().unwrap();
|
||||
assert_eq!(cache.1, Lsn(0), "must not have repartitioned yet, otherwise the repartition call below might just use the cache");
|
||||
}
|
||||
tline
|
||||
.repartition(Lsn(0x20), tline.get_compaction_target_size(), &ctx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
|
||||
@@ -39,7 +39,7 @@ pub struct EphemeralFile {
     file_id: u64,
     _tenant_id: TenantId,
     _timeline_id: TimelineId,
-    file: Arc<VirtualFile>,
+    file: Option<Arc<VirtualFile>>,
 
     pub size: u64,
 }
@@ -52,7 +52,10 @@ impl EphemeralFile {
     ) -> Result<EphemeralFile, io::Error> {
         let mut l = EPHEMERAL_FILES.write().unwrap();
         let file_id = l.next_file_id;
-        l.next_file_id += 1;
+        l.next_file_id = l
+            .next_file_id
+            .checked_add(1)
+            .expect("next_file_id is u64, expecting it to not overflow");
 
         let filename = conf
             .timeline_path(&timeline_id, &tenant_id)
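The `checked_add` above makes the overflow check explicit: plain `+=` panics only in debug builds and wraps silently in release builds, whereas `checked_add` returns `None` in both. A quick illustration of the difference:

```rust
// checked_add returns None instead of wrapping, so the overflow invariant
// holds in release builds too, where plain `+=` would wrap silently.
fn main() {
    let id = u64::MAX;
    assert_eq!(id.checked_add(1), None);
    assert_eq!(id.wrapping_add(1), 0); // what release-mode `+=` would do
}
```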
@@ -60,16 +63,30 @@ impl EphemeralFile {
 
         let file = VirtualFile::open_with_options(
             &filename,
-            OpenOptions::new().read(true).write(true).create(true),
+            OpenOptions::new()
+                .read(true)
+                .write(true)
+                // The next_file_id doesn't overflow, so technically, `create_new` is not needed.
+                // But it's cheap, so why not.
+                .create_new(true),
         )?;
         let file_rc = Arc::new(file);
         l.files.insert(file_id, file_rc.clone());
 
+        #[cfg(debug_assertions)]
+        debug!(
+            "created ephemeral file {}\n{}",
+            filename.display(),
+            std::backtrace::Backtrace::force_capture()
+        );
+        #[cfg(not(debug_assertions))]
+        debug!("created ephemeral file {}", filename.display());
+
         Ok(EphemeralFile {
             file_id,
             _tenant_id: tenant_id,
             _timeline_id: timeline_id,
-            file: file_rc,
+            file: Some(file_rc),
             size: 0,
         })
     }
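`Backtrace::force_capture` captures unconditionally, regardless of the `RUST_BACKTRACE` environment variable, and capturing is comparatively expensive, which is presumably why the hunk above gates it to debug builds. A self-contained sketch of the same gating; the `log_creation` helper is hypothetical, not pageserver API:

```rust
// Sketch of debug-build-only backtrace capture, mirroring the hunk above.
fn log_creation(path: &std::path::Path) {
    #[cfg(debug_assertions)]
    println!(
        "created {}\n{}",
        path.display(),
        std::backtrace::Backtrace::force_capture()
    );
    #[cfg(not(debug_assertions))]
    println!("created {}", path.display());
}

fn main() {
    log_creation(std::path::Path::new("/tmp/example"));
}
```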
@@ -79,6 +96,8 @@ impl EphemeralFile {
         while off < PAGE_SZ {
             let n = self
                 .file
+                .as_ref()
+                .unwrap()
                 .read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;
 
             if n == 0 {
@@ -261,17 +280,43 @@ impl Drop for EphemeralFile {
         cache.drop_buffers_for_ephemeral(self.file_id);
 
         // remove entry from the hash map
-        EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
+        let virtual_file = EPHEMERAL_FILES
+            .write()
+            .unwrap()
+            .files
+            .remove(&self.file_id)
+            .unwrap();
+
+        // remove file from self
+        let self_file = self.file.take().unwrap();
+
+        assert_eq!(
+            Arc::as_ptr(&virtual_file) as *const (),
+            Arc::as_ptr(&self_file) as *const ()
+        );
+        drop(self_file);
+
+        // XXX once we upgrade to Rust 1.70, use Arc::into_inner.
+        // It does the following checks atomically.
+        assert_eq!(Arc::weak_count(&virtual_file), 0);
+        let virtual_file = Arc::try_unwrap(virtual_file).expect(
+            "we are being dropped and EPHEMERAL_FILES is the only other place where we put the Arc",
+        );
 
         // unlink the file
-        let res = std::fs::remove_file(&self.file.path);
-        if let Err(e) = res {
-            warn!(
-                "could not remove ephemeral file '{}': {}",
-                self.file.path.display(),
-                e
-            );
-        }
+        // TODO: we should be able to unwrap here, but timeline delete and tenant detach do
+        // std::fs::remove_dir_all without dropping all InMemoryLayer => EphemeralFile
+        // of the tenant => need to fix that first.
+        match virtual_file.remove() {
+            Ok(()) => (),
+            Err((virtual_file, e)) => {
+                warn!(
+                    "could not remove ephemeral file '{}': {}",
+                    virtual_file.path.display(),
+                    e
+                );
+            }
+        };
     }
 }
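The `Drop` rewrite above reclaims the `VirtualFile` from its `Arc` so that `remove()` can consume it by value. `Arc::try_unwrap` succeeds only when the strong count is exactly 1, which is why the map entry is removed first; together with the `weak_count` assertion this approximates what `Arc::into_inner` (Rust 1.70+) checks atomically, as the XXX comment notes. A small demonstration:

```rust
use std::sync::Arc;

// Demonstrates Arc::try_unwrap as used in the Drop impl above: it yields
// the inner value only when we hold the last strong reference.
fn main() {
    let a = Arc::new(String::from("stand-in for VirtualFile"));
    let b = Arc::clone(&a);

    // Fails while `b` (here: the EPHEMERAL_FILES map) still holds a clone;
    // we get the Arc back untouched.
    let a = Arc::try_unwrap(a).unwrap_err();

    drop(b);
    let inner = Arc::try_unwrap(a).expect("we hold the only reference");
    println!("{inner}");
}
```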
@@ -1392,7 +1392,7 @@ mod tests {
         let harness = TenantHarness::create(test_name)?;
         let (tenant, ctx) = runtime.block_on(harness.load());
         // create an empty timeline directory
-        let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+        let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
 
         let remote_fs_dir = harness.conf.workdir.join("remote_fs");
         std::fs::create_dir_all(remote_fs_dir)?;
@@ -917,7 +917,7 @@ impl Drop for DeltaLayerWriter {
     fn drop(&mut self) {
         if let Some(inner) = self.inner.take() {
             match inner.blob_writer.into_inner().into_inner() {
-                Ok(vfile) => vfile.remove(),
+                Ok(vfile) => vfile.remove().unwrap(),
                 Err(err) => warn!(
                     "error while flushing buffer of image layer temporary file: {}",
                     err
@@ -709,7 +709,7 @@ impl ImageLayerWriter {
 impl Drop for ImageLayerWriter {
     fn drop(&mut self) {
         if let Some(inner) = self.inner.take() {
-            inner.blob_writer.into_inner().remove();
+            inner.blob_writer.into_inner().remove().unwrap();
         }
     }
 }
@@ -216,7 +216,7 @@ pub struct Timeline {
     pub initdb_lsn: Lsn,
 
     /// When did we last calculate the partitioning?
-    partitioning: Mutex<(KeyPartitioning, Lsn)>,
+    pub(super) partitioning: Mutex<(KeyPartitioning, Lsn)>,
 
     /// Configuration: how often should the partitioning be recalculated.
     repartition_threshold: u64,
@@ -684,8 +684,11 @@ impl Timeline {
     /// Flush to disk all data that was written with the put_* functions
     #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
     pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
+        debug!("start");
         self.freeze_inmem_layer(false);
-        self.flush_frozen_layers_and_wait().await
+        let ret = self.flush_frozen_layers_and_wait().await;
+        debug!(is_err = ret.is_err(), "complete");
+        ret
     }
 
     /// Outermost timeline compaction operation; downloads needed layers.
@@ -1306,7 +1309,7 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
     }
 
-    fn get_compaction_target_size(&self) -> u64 {
+    pub(super) fn get_compaction_target_size(&self) -> u64 {
         let tenant_conf = self.tenant_conf.read().unwrap();
         tenant_conf
             .compaction_target_size
@@ -2386,8 +2389,11 @@ impl Timeline {
             ValueReconstructResult::Missing => {
                 return Err(layer_traversal_error(
                     format!(
-                        "could not find data for key {} at LSN {}, for request at LSN {}",
-                        key, cont_lsn, request_lsn
+                        "could not find data for key {} at LSN {}, for request at LSN {}\n{}",
+                        key,
+                        cont_lsn,
+                        request_lsn,
+                        std::backtrace::Backtrace::force_capture(),
                     ),
                     traversal_path,
                 ));
@@ -2859,14 +2865,21 @@ impl Timeline {
         // in-memory layer from the map now.
         {
            let mut layers = self.layers.write().unwrap();
-            let l = layers.frozen_layers.pop_front();
+            let l = layers.frozen_layers.pop_front().unwrap();
 
             // Only one thread may call this function at a time (for this
             // timeline). If two threads tried to flush the same frozen
             // layer to disk at the same time, that would not work.
-            assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer));
-
-            // release lock on 'layers'
+            assert!(LayerMap::compare_arced_layers(&l, &frozen_layer));
+            drop(frozen_layer);
+            // XXX once we upgrade to Rust 1.70, use Arc::into_inner.
+            // It does the following checks atomically.
+            assert_eq!(Arc::weak_count(&l), 0);
+            let l =
+                Arc::try_unwrap(l).expect("no-one except us holds references to this layer");
+            drop(layers); // don't hold layer map lock when doing disk IO
+            info!("dropping frozen layer, this should remove the ephemeral file on disk");
+            drop(l);
         }
 
         fail_point!("checkpoint-after-sync");
@@ -3000,7 +3013,7 @@ impl Timeline {
         Ok((new_delta_filename, LayerFileMetadata::new(sz)))
     }
 
-    async fn repartition(
+    pub(super) async fn repartition(
         &self,
         lsn: Lsn,
         partition_size: u64,
@@ -1324,7 +1324,7 @@ mod tests {
     async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
         let (tenant, ctx) = harness.load().await;
         let timeline = tenant
-            .create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
+            .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
             .expect("Failed to create an empty timeline for dummy wal connection manager");
 
         ConnectionManagerState {
@@ -324,16 +324,8 @@ impl VirtualFile {
         Ok(result)
     }
 
-    pub fn remove(self) {
-        let path = self.path.clone();
-        drop(self);
-        std::fs::remove_file(path).expect("failed to remove the virtual file");
-    }
-}
-
-impl Drop for VirtualFile {
-    /// If a VirtualFile is dropped, close the underlying file if it was open.
-    fn drop(&mut self) {
+    /// Idempotently close the file descriptor we might or might not have open for this VirtualFile.
+    pub fn close(&mut self) {
         let handle = self.handle.get_mut().unwrap();
 
         // We could check with a read-lock first, to avoid waiting on an
@@ -351,6 +343,26 @@
             .observe_closure_duration(|| slot_guard.file.take());
         }
     }
+
+    /// Caller can retry if we return an `Err`.
+    #[allow(clippy::result_large_err)]
+    pub fn remove(mut self) -> Result<(), (Self, std::io::Error)> {
+        // close our fd before the unlink system call, so that the unlink actually performs the removal
+        self.close();
+        // Try to remove the file on disk.
+        // If it fails, we have idempotently closed the fd, but the caller can choose to retry.
+        match std::fs::remove_file(&self.path) {
+            Ok(()) => Ok(()),
+            Err(e) => Err((self, e)),
+        }
+    }
+}
+
+impl Drop for VirtualFile {
+    /// If a VirtualFile is dropped, close the underlying file if it was open.
+    fn drop(&mut self) {
+        self.close();
+    }
+}
 
 impl Read for VirtualFile {
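The new `remove()` signature above hands `self` back inside the `Err` variant, so a failed unlink does not destroy the handle and the caller can retry. A sketch of consuming that shape of API; the `FileHandle` type and retry helper are hypothetical stand-ins:

```rust
// Sketch of the give-self-back-on-error shape used by VirtualFile::remove above.
struct FileHandle {
    path: std::path::PathBuf,
}

impl FileHandle {
    fn remove(self) -> Result<(), (Self, std::io::Error)> {
        match std::fs::remove_file(&self.path) {
            Ok(()) => Ok(()),
            // Hand `self` back so the caller still owns the handle and may retry.
            Err(e) => Err((self, e)),
        }
    }
}

fn remove_with_one_retry(handle: FileHandle) -> std::io::Result<()> {
    match handle.remove() {
        Ok(()) => Ok(()),
        Err((handle, _first_err)) => handle.remove().map_err(|(_, e)| e),
    }
}
```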
@@ -1208,7 +1208,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_relsize() -> Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
+        let harness = TenantHarness::create("test_relsize")?;
+        let (tenant, ctx) = harness.load().await;
         let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
         let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1427,7 +1428,8 @@ mod tests {
     // and then created it again within the same layer.
     #[tokio::test]
     async fn test_drop_extend() -> Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
+        let harness = TenantHarness::create("test_drop_extend")?;
+        let (tenant, ctx) = harness.load().await;
         let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
         let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1496,7 +1498,8 @@ mod tests {
     // and then extended it again within the same layer.
    #[tokio::test]
     async fn test_truncate_extend() -> Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
+        let harness = TenantHarness::create("test_truncate_extend")?;
+        let (tenant, ctx) = harness.load().await;
         let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
         let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1636,7 +1639,8 @@ mod tests {
     /// split into multiple 1 GB segments in Postgres.
     #[tokio::test]
     async fn test_large_rel() -> Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
+        let harness = TenantHarness::create("test_large_rel")?;
+        let (tenant, ctx) = harness.load().await;
        let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
        let mut walingest = init_walingest_test(&tline, &ctx).await?;