Compare commits

...

10 Commits

Author SHA1 Message Date
Christian Schwarz
0bf9db2e5d flush_frozen_layer: assert we own the frozen layer when we remove its ephemeral file
TODO: the assert is incorrect because get_reconstruct_data's traversal_path
can hold references to in-memory and frozen layers.
2023-06-09 17:30:37 +02:00
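
To illustrate the TODO above, a minimal sketch (illustrative types, not the actual pageserver code): while get_reconstruct_data's traversal_path holds a clone of a layer's Arc, a "we are the sole owner" assertion in the flush path cannot hold.

use std::sync::Arc;

struct Layer;

fn main() {
    let frozen: Arc<Layer> = Arc::new(Layer);
    // A read in flight: traversal_path keeps a clone of the Arc alive...
    let traversal_path: Vec<Arc<Layer>> = vec![Arc::clone(&frozen)];
    // ...so the flush path is not the sole owner here:
    assert_eq!(Arc::strong_count(&frozen), 2);
    drop(traversal_path);
    // Only once the read path drops its references does sole ownership hold.
    assert_eq!(Arc::strong_count(&frozen), 1);
}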
Christian Schwarz
7ef080c404 TenantHarness: shutdown the tenant from Drop
This is for general cleanliness.

As a next step, we could try to remove the test temporary directory.
This has been attempted in the past, but back then,
we didn't have proper tenant shutdown.
2023-06-09 17:23:52 +02:00
Christian Schwarz
f557790969 EphemeralFile: panic if removal fails
This requires us to be disciplined about dropping
all EphemeralFile objects (=> InMemoryLayer objects)
before removing the timeline / tenant dir on disk.

If we don't do that, we'll panic inside the EphemeralFile::drop.

We know that detach doesn't honor this, so we cannot
ship this patch just yet. But it's good to have it as an
aspirational goal.
2023-06-09 17:23:05 +02:00
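
A minimal sketch of the aspirational drop discipline described above (illustrative type and path, not the actual EphemeralFile): removal in Drop only succeeds if the file's directory still exists, i.e. if every such object is dropped before the timeline / tenant dir is removed.

use std::path::PathBuf;

struct EphemeralFileSketch {
    path: PathBuf,
}

impl Drop for EphemeralFileSketch {
    fn drop(&mut self) {
        // Panics if e.g. the containing directory was already removed.
        std::fs::remove_file(&self.path)
            .expect("EphemeralFile must be dropped before its directory is removed");
    }
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("ephemeral_sketch");
    std::fs::write(&path, b"")?;
    let f = EphemeralFileSketch { path };
    drop(f); // fine: the file still exists, so removal succeeds
    Ok(())
}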
Christian Schwarz
9ebccbcdd5 page_cache: in cfg(test), bail if writeback fails
Tests use an extremely small page cache.
This is very likely to trigger the indefinitely retrying loop
mentioned in the comment above the changed line.

It's easier to debug the tests if we bail out and fail.

I also tried panicking, but that wasn't as helpful as
bailing out, in my experience.
2023-06-09 16:56:31 +02:00
Christian Schwarz
b6cb362f11 tenant shutdown: add some debug logging
This proved useful while debugging

https://github.com/neondatabase/neon/pull/4451#issuecomment-1584503248

resulting in

d4a86a415b
2023-06-09 16:45:18 +02:00
Christian Schwarz
88baa4fff7 EphemeralFile: assert no reuse of file ids + use create_new
Just for some defense in depth.
To my knowledge, the asserted conditions have never occurred in real life.
2023-06-09 16:40:45 +02:00
Christian Schwarz
d4a86a415b fix it, the way this failed was far from obvious (took a page_cache deep dive) 2023-06-09 16:26:04 +02:00
Christian Schwarz
142eabe390 fix the tests 2023-06-09 13:52:04 +02:00
Christian Schwarz
5e87cedb95 clippy 2023-06-09 13:26:18 +02:00
Christian Schwarz
b460f617e9 create_test_timeline: do DatadirModification::init_empty
See the added comment on `create_empty_timeline`.

Rough context: https://github.com/neondatabase/neon/pull/4364#discussion_r1221995691
2023-06-09 13:02:30 +02:00
11 changed files with 263 additions and 101 deletions

View File

@@ -799,8 +799,12 @@ impl PageCache {
// a different victim. But if the problem persists, the page cache
// could fill up with dirty pages that we cannot evict, and we will
// loop retrying the writebacks indefinitely.
error!("writeback of buffer {:?} failed: {}", old_key, err);
continue;
if cfg!(test) {
anyhow::bail!("writeback of buffer {:?} failed: {}", old_key, err);
} else {
error!("writeback of buffer {:?} failed: {}", old_key, err);
continue;
}
}
}

View File

@@ -1601,9 +1601,6 @@ pub fn create_test_timeline(
ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;
Ok(tline)
}

View File

@@ -489,6 +489,7 @@ impl std::fmt::Display for WaitToBecomeActiveError {
}
}
#[derive(Debug)]
pub(crate) enum ShutdownError {
AlreadyStopping,
}
@@ -1268,6 +1269,18 @@ impl Tenant {
/// This is used to create the initial 'main' timeline during bootstrapping,
/// or when importing a new base backup. The caller is expected to load an
/// initial image of the datadir to the new timeline after this.
///
/// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
/// and the timeline will fail to load after a restart.
///
/// That's why we add an uninit mark file and wrap it, together with the in-memory
/// Timeline object, into UninitializedTimeline.
/// Once the caller is done setting up the timeline, they should call
/// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
///
/// For tests, use `DatadirModification::init_empty` + `commit` to set up the
/// minimum set of keys required to get a working timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
pub fn create_empty_timeline(
&self,
new_timeline_id: TimelineId,
@@ -1316,8 +1329,21 @@ impl Tenant {
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
// Set up the minimum keys required for the timeline to be usable.
let mut modification = uninit_tl
.raw_timeline()
.expect("we just created it")
.begin_modification(initdb_lsn);
modification.init_empty().context("init_empty")?;
modification
.commit()
.context("commit init_empty modification")?;
let mut timelines = self.timelines.lock().unwrap();
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
// load_layers=false because create_empty_timeline already did what's necessary (set next_open_layer)
// and modification.init_empty() already created layers.
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, false)?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
Ok(tl)
@@ -1901,6 +1927,8 @@ impl Tenant {
/// This will attempt to shutdown even if tenant is broken.
pub(crate) async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
debug_assert_current_span_has_tenant_id();
debug!("start");
// Set the tenant (and its timelines) to Stopping state.
//
// Since we can only transition into Stopping state after activation is complete,
@@ -1947,6 +1975,7 @@ impl Tenant {
// this will additionally shutdown and await all timeline tasks.
task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
debug!("complete");
Ok(())
}
@@ -3423,6 +3452,7 @@ pub mod harness {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_id: TenantId,
tenant: std::sync::Mutex<Option<Arc<Tenant>>>,
pub lock_guard: (
Option<RwLockReadGuard<'a, ()>>,
@@ -3482,6 +3512,7 @@ pub mod harness {
tenant_conf,
tenant_id,
lock_guard,
tenant: std::sync::Mutex::new(None),
})
}
@@ -3530,6 +3561,7 @@ pub mod harness {
for timeline in tenant.timelines.lock().unwrap().values() {
timeline.set_state(TimelineState::Active);
}
*self.tenant.lock().unwrap() = Some(Arc::clone(&tenant));
Ok(tenant)
}
@@ -3538,6 +3570,32 @@ pub mod harness {
}
}
impl<'a> Drop for TenantHarness<'a> {
fn drop(&mut self) {
if let Some(tenant) = self.tenant.lock().unwrap().take() {
// Shut down with freeze_and_flush so that we don't drop `EphemeralFile` objects in `InMemoryLayer`s.
// Without this, we remove the ephemeral files on disk but they remain in pageserver's PageCache.
// This causes write-back failures down the line.
let tenant = Arc::clone(&tenant);
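// Note: Drop is synchronous, and in tests it typically runs on a tokio
// runtime thread, where block_on would panic. Hence the dedicated thread
// with its own single-threaded runtime.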
std::thread::Builder::new()
.name("TenantHarness::drop thread".to_owned())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(tenant.shutdown(true).instrument(
info_span!("tenant_harness_drop_shutdown", tenant_id=%tenant.tenant_id),
))
.unwrap()
})
.unwrap()
.join()
.unwrap();
}
}
}
// Mock WAL redo manager that doesn't do much
pub struct TestRedoManager;
@@ -3586,8 +3644,10 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("test_basic")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -3617,12 +3677,12 @@ mod tests {
#[tokio::test]
async fn no_duplicate_timelines() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
.load()
.await;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("no_duplicate_timelines")?;
let (tenant, ctx) = harness.load().await;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
@@ -3649,9 +3709,11 @@ mod tests {
#[tokio::test]
async fn test_branch() -> anyhow::Result<()> {
use std::str::from_utf8;
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("test_branch")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer();
#[allow(non_snake_case)]
@@ -3744,11 +3806,12 @@ mod tests {
#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -3780,10 +3843,9 @@ mod tests {
#[tokio::test]
async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
.load()
.await;
let harness = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
@@ -3831,11 +3893,10 @@ mod tests {
#[tokio::test]
async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
@@ -3879,11 +3940,11 @@ mod tests {
#[tokio::test]
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
@@ -3902,11 +3963,10 @@ mod tests {
}
#[tokio::test]
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("test_parent_keeps_data_forever_after_branching")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
@@ -3939,7 +3999,7 @@ mod tests {
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
}
@@ -3959,7 +4019,7 @@ mod tests {
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
@@ -3996,7 +4056,8 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
let (tenant, ctx) = harness.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
drop(tline);
drop(tenant);
@@ -4033,8 +4094,10 @@ mod tests {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("test_images")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -4098,8 +4161,10 @@ mod tests {
//
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("test_bulk_insert")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
let mut lsn = Lsn(0x10);
@@ -4140,8 +4205,10 @@ mod tests {
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let harness = TenantHarness::create("test_random_updates")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 1000;
@@ -4153,7 +4220,7 @@ mod tests {
// a read sees the latest page version.
let mut updated = [Lsn(0); NUM_KEYS];
let mut lsn = Lsn(0);
let mut lsn = Lsn(0x10);
#[allow(clippy::needless_range_loop)]
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
@@ -4211,11 +4278,10 @@ mod tests {
#[tokio::test]
async fn test_traverse_branches() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
.load()
.await;
let harness = TenantHarness::create("test_traverse_branches")?;
let (tenant, ctx) = harness.load().await;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 1000;
@@ -4227,7 +4293,7 @@ mod tests {
// a read sees the latest page version.
let mut updated = [Lsn(0); NUM_KEYS];
let mut lsn = Lsn(0);
let mut lsn = Lsn(0x10);
#[allow(clippy::needless_range_loop)]
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
@@ -4294,11 +4360,10 @@ mod tests {
#[tokio::test]
async fn test_traverse_ancestors() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
.load()
.await;
let harness = TenantHarness::create("test_traverse_ancestors")?;
let (tenant, ctx) = harness.load().await;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50;
@@ -4307,7 +4372,7 @@ mod tests {
// Track page mutation lsns across different timelines.
let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
let mut lsn = Lsn(0);
let mut lsn = Lsn(0x10);
#[allow(clippy::needless_range_loop)]
for idx in 0..NUM_TLINES {
@@ -4353,6 +4418,28 @@ mod tests {
}
Ok(())
}
#[tokio::test]
async fn test_empty_test_timeline_is_usable() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_empty_test_timeline_is_usable")?;
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x20), DEFAULT_PG_VERSION, &ctx)?;
// Make sure the timeline has the minimum set of required keys for operation.
// The only operation you can do on an empty timeline is to write new data.
// Repartition is the only code on the write path that requires other keys to be present.
// Make sure it works.
{
let cache = tline.partitioning.lock().unwrap();
assert_eq!(cache.1, Lsn(0), "must not have repartitioned yet, otherwise the repartition call below might just use the cache");
}
tline
.repartition(Lsn(0x20), tline.get_compaction_target_size(), &ctx)
.await?;
Ok(())
}
}
#[cfg(not(debug_assertions))]

View File

@@ -39,7 +39,7 @@ pub struct EphemeralFile {
file_id: u64,
_tenant_id: TenantId,
_timeline_id: TimelineId,
file: Arc<VirtualFile>,
file: Option<Arc<VirtualFile>>,
pub size: u64,
}
@@ -52,7 +52,10 @@ impl EphemeralFile {
) -> Result<EphemeralFile, io::Error> {
let mut l = EPHEMERAL_FILES.write().unwrap();
let file_id = l.next_file_id;
l.next_file_id += 1;
l.next_file_id = l
.next_file_id
.checked_add(1)
.expect("next_file_id is u64, expecting it to not overflow");
let filename = conf
.timeline_path(&timeline_id, &tenant_id)
@@ -60,16 +63,30 @@ impl EphemeralFile {
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
OpenOptions::new()
.read(true)
.write(true)
// The next_file_id doesn't overflow, so technically `create_new` is not needed.
// But it's cheap, so why not.
.create_new(true),
)?;
let file_rc = Arc::new(file);
l.files.insert(file_id, file_rc.clone());
#[cfg(debug_assertions)]
debug!(
"created ephemeral file {}\n{}",
filename.display(),
std::backtrace::Backtrace::force_capture()
);
#[cfg(not(debug_assertions))]
debug!("created ephemeral file {}", filename.display());
Ok(EphemeralFile {
file_id,
_tenant_id: tenant_id,
_timeline_id: timeline_id,
file: file_rc,
file: Some(file_rc),
size: 0,
})
}
@@ -79,6 +96,8 @@ impl EphemeralFile {
while off < PAGE_SZ {
let n = self
.file
.as_ref()
.unwrap()
.read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;
if n == 0 {
@@ -261,17 +280,43 @@ impl Drop for EphemeralFile {
cache.drop_buffers_for_ephemeral(self.file_id);
// remove entry from the hash map
EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
let virtual_file = EPHEMERAL_FILES
.write()
.unwrap()
.files
.remove(&self.file_id)
.unwrap();
// remove file from self
let self_file = self.file.take().unwrap();
assert_eq!(
Arc::as_ptr(&virtual_file) as *const (),
Arc::as_ptr(&self_file) as *const ()
);
drop(self_file);
// XXX once we upgrade to Rust 1.70, use Arc::into_inner.
// It does the following checks atomically.
assert_eq!(Arc::weak_count(&virtual_file), 0);
let virtual_file = Arc::try_unwrap(virtual_file).expect(
"we are being dropped and EPHEMERAL_FILES is the only other place where we put the Arc",
);
// unlink the file
let res = std::fs::remove_file(&self.file.path);
if let Err(e) = res {
warn!(
"could not remove ephemeral file '{}': {}",
self.file.path.display(),
e
);
}
// TODO: we should be able to unwrap here, but timeline delete and tenant detach do
// std::fs::remove_dir_all without dropping all InMemoryLayer => EphemeralFile
// of the tenant => need to fix that first.
match virtual_file.remove() {
Ok(()) => (),
Err((virtual_file, e)) => {
warn!(
"could not remove ephemeral file '{}': {}",
virtual_file.path.display(),
e
);
}
};
}
}
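
For reference, a sketch of what the XXX above points at: on Rust 1.70+, Arc::into_inner performs the last-owner check and the move out of the Arc as one operation (the same note applies to the frozen-layer unwrap in the Timeline flush path further down).

use std::sync::Arc;

fn main() {
    let only_owner = Arc::new(String::from("virtual file"));
    // Returns Some only if this is the last strong reference; unlike a separate
    // assert + Arc::try_unwrap pair, the check and the unwrap cannot race.
    let inner: String = Arc::into_inner(only_owner)
        .expect("we are the sole owner, as the drop impl above asserts");
    assert_eq!(inner, "virtual file");
}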

View File

@@ -1392,7 +1392,7 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

View File

@@ -917,7 +917,7 @@ impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
match inner.blob_writer.into_inner().into_inner() {
Ok(vfile) => vfile.remove(),
Ok(vfile) => vfile.remove().unwrap(),
Err(err) => warn!(
"error while flushing buffer of delta layer temporary file: {}",
err

View File

@@ -709,7 +709,7 @@ impl ImageLayerWriter {
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
inner.blob_writer.into_inner().remove().unwrap();
}
}
}

View File

@@ -216,7 +216,7 @@ pub struct Timeline {
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
pub(super) partitioning: Mutex<(KeyPartitioning, Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -684,8 +684,11 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
debug!("start");
self.freeze_inmem_layer(false);
self.flush_frozen_layers_and_wait().await
let ret = self.flush_frozen_layers_and_wait().await;
debug!(is_err = ret.is_err(), "complete");
ret
}
/// Outermost timeline compaction operation; downloads needed layers.
@@ -1306,7 +1309,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
fn get_compaction_target_size(&self) -> u64 {
pub(super) fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_target_size
@@ -2386,8 +2389,11 @@ impl Timeline {
ValueReconstructResult::Missing => {
return Err(layer_traversal_error(
format!(
"could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn
"could not find data for key {} at LSN {}, for request at LSN {}\n{}",
key,
cont_lsn,
request_lsn,
std::backtrace::Backtrace::force_capture(),
),
traversal_path,
));
@@ -2859,14 +2865,21 @@ impl Timeline {
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let l = layers.frozen_layers.pop_front();
let l = layers.frozen_layers.pop_front().unwrap();
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer));
// release lock on 'layers'
assert!(LayerMap::compare_arced_layers(&l, &frozen_layer));
drop(frozen_layer);
// XXX once we upgrade to Rust 1.70, use Arc::into_inner.
// It does the following checks atomically.
assert_eq!(Arc::weak_count(&l), 0);
let l =
Arc::try_unwrap(l).expect("no-one except us holds references to this layer");
drop(layers); // don't hold layer map lock when doing disk IO
info!("dropping frozen layer, this should remove the ephemeral file on disk");
drop(l);
}
fail_point!("checkpoint-after-sync");
@@ -3000,7 +3013,7 @@ impl Timeline {
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
}
async fn repartition(
pub(super) async fn repartition(
&self,
lsn: Lsn,
partition_size: u64,

View File

@@ -1324,7 +1324,7 @@ mod tests {
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");
ConnectionManagerState {

View File

@@ -324,16 +324,8 @@ impl VirtualFile {
Ok(result)
}
pub fn remove(self) {
let path = self.path.clone();
drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file");
}
}
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
/// Idempotently close the file descriptor that may or may not be open for this VirtualFile.
pub fn close(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
@@ -351,6 +343,26 @@ impl Drop for VirtualFile {
.observe_closure_duration(|| slot_guard.file.take());
}
}
/// Caller can retry if we return an `Err`.
#[allow(clippy::result_large_err)]
pub fn remove(mut self) -> Result<(), (Self, std::io::Error)> {
// close our fd before the unlink system call, so that the unlink actually performs the removal
self.close();
// Try to remove file on disk.
// If it fails, we idempotently closed the fd, but the caller can choose to retry.
match std::fs::remove_file(&self.path) {
Ok(()) => Ok(()),
Err(e) => Err((self, e)),
}
}
}
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
self.close();
}
}
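
A sketch of how a caller could use the new `remove` signature to retry (the retry loop is illustrative and not part of this PR; the actual callers either unwrap or log, as the hunks above show):

fn remove_with_retry(mut vfile: VirtualFile) -> std::io::Result<()> {
    for attempt in 0..3 {
        match vfile.remove() {
            Ok(()) => return Ok(()),
            // remove() has already idempotently closed the fd; we get the
            // VirtualFile back and may simply try again.
            Err((returned, e)) => {
                warn!("removal attempt {} failed: {}", attempt, e);
                vfile = returned;
            }
        }
    }
    Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        "gave up removing virtual file",
    ))
}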
impl Read for VirtualFile {

View File

@@ -1208,7 +1208,8 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let harness = TenantHarness::create("test_relsize")?;
let (tenant, ctx) = harness.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1427,7 +1428,8 @@ mod tests {
// and then created it again within the same layer.
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let harness = TenantHarness::create("test_drop_extend")?;
let (tenant, ctx) = harness.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1496,7 +1498,8 @@ mod tests {
// and then extended it again within the same layer.
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let harness = TenantHarness::create("test_truncate_extend")?;
let (tenant, ctx) = harness.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
@@ -1636,7 +1639,8 @@ mod tests {
/// split into multiple 1 GB segments in Postgres.
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let harness = TenantHarness::create("test_large_rel")?;
let (tenant, ctx) = harness.load().await;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;