diff --git a/pageserver/src/tenant/delta_layer.rs b/pageserver/src/tenant/delta_layer.rs index 41715ab0a4..a908d66200 100644 --- a/pageserver/src/tenant/delta_layer.rs +++ b/pageserver/src/tenant/delta_layer.rs @@ -610,9 +610,9 @@ impl DeltaLayer { /// /// 3. Call `finish`. /// -pub struct DeltaLayerWriter { +struct DeltaLayerWriterInner { conf: &'static PageServerConf, - path: PathBuf, + pub path: PathBuf, timeline_id: TimelineId, tenant_id: TenantId, @@ -624,17 +624,17 @@ pub struct DeltaLayerWriter { blob_writer: WriteBlobWriter>, } -impl DeltaLayerWriter { +impl DeltaLayerWriterInner { /// /// Start building a new delta layer. /// - pub fn new( + fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, key_start: Key, lsn_range: Range, - ) -> Result { + ) -> anyhow::Result { // Create the file initially with a temporary filename. We don't know // the end key yet, so we cannot form the final filename yet. We will // rename it when we're done. @@ -653,7 +653,7 @@ impl DeltaLayerWriter { let block_buf = BlockBuf::new(); let tree_builder = DiskBtreeBuilder::new(block_buf); - Ok(DeltaLayerWriter { + Ok(Self { conf, path, timeline_id, @@ -670,17 +670,17 @@ impl DeltaLayerWriter { /// /// The values must be appended in key, lsn order. /// - pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> { + fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init()) } - pub fn put_value_bytes( + fn put_value_bytes( &mut self, key: Key, lsn: Lsn, val: &[u8], will_init: bool, - ) -> Result<()> { + ) -> anyhow::Result<()> { assert!(self.lsn_range.start <= lsn); let off = self.blob_writer.write_blob(val)?; @@ -693,14 +693,14 @@ impl DeltaLayerWriter { Ok(()) } - pub fn size(&self) -> u64 { + fn size(&self) -> u64 { self.blob_writer.size() + self.tree.borrow_writer().size() } /// /// Finish writing the delta layer. /// - pub fn finish(self, key_end: Key) -> anyhow::Result { + fn finish(self, key_end: Key) -> anyhow::Result { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -768,6 +768,102 @@ impl DeltaLayerWriter { } } +/// A builder object for constructing a new delta layer. +/// +/// Usage: +/// +/// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...) +/// +/// 2. Write the contents by calling `put_value` for every page +/// version to store in the layer. +/// +/// 3. Call `finish`. +/// +/// # Note +/// +/// As described in https://github.com/neondatabase/neon/issues/2650, it's +/// possible for the writer to drop before `finish` is actually called. So this +/// could lead to odd temporary files in the directory, exhausting file system. +/// This structure wraps `DeltaLayerWriterInner` and also contains `Drop` +/// implementation that cleans up the temporary file in failure. It's not +/// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves +/// out some fields, making it impossible to implement `Drop`. +/// +#[must_use] +pub struct DeltaLayerWriter { + inner: Option, +} + +impl DeltaLayerWriter { + /// + /// Start building a new delta layer. + /// + pub fn new( + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_id: TenantId, + key_start: Key, + lsn_range: Range, + ) -> anyhow::Result { + Ok(Self { + inner: Some(DeltaLayerWriterInner::new( + conf, + timeline_id, + tenant_id, + key_start, + lsn_range, + )?), + }) + } + + /// + /// Append a key-value pair to the file. + /// + /// The values must be appended in key, lsn order. + /// + pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { + self.inner.as_mut().unwrap().put_value(key, lsn, val) + } + + pub fn put_value_bytes( + &mut self, + key: Key, + lsn: Lsn, + val: &[u8], + will_init: bool, + ) -> anyhow::Result<()> { + self.inner + .as_mut() + .unwrap() + .put_value_bytes(key, lsn, val, will_init) + } + + pub fn size(&self) -> u64 { + self.inner.as_ref().unwrap().size() + } + + /// + /// Finish writing the delta layer. + /// + pub fn finish(mut self, key_end: Key) -> anyhow::Result { + self.inner.take().unwrap().finish(key_end) + } +} + +impl Drop for DeltaLayerWriter { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + match inner.blob_writer.into_inner().into_inner() { + Ok(vfile) => vfile.remove(), + Err(err) => warn!( + "error while flushing buffer of image layer temporary file: {}", + err + ), + } + } + } +} + /// /// Iterator over all key-value pairse stored in a delta layer /// diff --git a/pageserver/src/tenant/image_layer.rs b/pageserver/src/tenant/image_layer.rs index cbfa0134b0..8409d34bc9 100644 --- a/pageserver/src/tenant/image_layer.rs +++ b/pageserver/src/tenant/image_layer.rs @@ -411,7 +411,7 @@ impl ImageLayer { /// /// 3. Call `finish`. /// -pub struct ImageLayerWriter { +struct ImageLayerWriterInner { conf: &'static PageServerConf, path: PathBuf, timeline_id: TimelineId, @@ -423,14 +423,17 @@ pub struct ImageLayerWriter { tree: DiskBtreeBuilder, } -impl ImageLayerWriter { - pub fn new( +impl ImageLayerWriterInner { + /// + /// Start building a new image layer. + /// + fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, key_range: &Range, lsn: Lsn, - ) -> anyhow::Result { + ) -> anyhow::Result { // Create the file initially with a temporary filename. // We'll atomically rename it to the final name when we're done. let path = ImageLayer::temp_path_for( @@ -455,7 +458,7 @@ impl ImageLayerWriter { let block_buf = BlockBuf::new(); let tree_builder = DiskBtreeBuilder::new(block_buf); - let writer = ImageLayerWriter { + let writer = Self { conf, path, timeline_id, @@ -474,7 +477,7 @@ impl ImageLayerWriter { /// /// The page versions must be appended in blknum order. /// - pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> { + fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); let off = self.blob_writer.write_blob(img)?; @@ -485,7 +488,10 @@ impl ImageLayerWriter { Ok(()) } - pub fn finish(self) -> anyhow::Result { + /// + /// Finish writing the image layer. + /// + fn finish(self) -> anyhow::Result { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -552,3 +558,76 @@ impl ImageLayerWriter { Ok(layer) } } + +/// A builder object for constructing a new image layer. +/// +/// Usage: +/// +/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...) +/// +/// 2. Write the contents by calling `put_page_image` for every key-value +/// pair in the key range. +/// +/// 3. Call `finish`. +/// +/// # Note +/// +/// As described in https://github.com/neondatabase/neon/issues/2650, it's +/// possible for the writer to drop before `finish` is actually called. So this +/// could lead to odd temporary files in the directory, exhausting file system. +/// This structure wraps `ImageLayerWriterInner` and also contains `Drop` +/// implementation that cleans up the temporary file in failure. It's not +/// possible to do this directly in `ImageLayerWriterInner` since `finish` moves +/// out some fields, making it impossible to implement `Drop`. +/// +#[must_use] +pub struct ImageLayerWriter { + inner: Option, +} + +impl ImageLayerWriter { + /// + /// Start building a new image layer. + /// + pub fn new( + conf: &'static PageServerConf, + timeline_id: TimelineId, + tenant_id: TenantId, + key_range: &Range, + lsn: Lsn, + ) -> anyhow::Result { + Ok(Self { + inner: Some(ImageLayerWriterInner::new( + conf, + timeline_id, + tenant_id, + key_range, + lsn, + )?), + }) + } + + /// + /// Write next value to the file. + /// + /// The page versions must be appended in blknum order. + /// + pub fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { + self.inner.as_mut().unwrap().put_image(key, img) + } + + /// + /// Finish writing the image layer. + /// + pub fn finish(mut self) -> anyhow::Result { + self.inner.take().unwrap().finish() + } +} + +impl Drop for ImageLayerWriter { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + inner.blob_writer.into_inner().remove(); + } + } +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 6a96254df4..d63429ea6a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1541,6 +1541,10 @@ impl Timeline { lsn, )?; + fail_point!("image-layer-writer-fail-before-finish", |_| { + anyhow::bail!("failpoint image-layer-writer-fail-before-finish"); + }); + for range in &partition.ranges { let mut key = range.start; while key < range.end { @@ -1835,6 +1839,11 @@ impl Timeline { }, )?); } + + fail_point!("delta-layer-writer-fail-before-finish", |_| { + anyhow::bail!("failpoint delta-layer-writer-fail-before-finish"); + }); + writer.as_mut().unwrap().put_value(key, lsn, value)?; prev_key = Some(key); } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 896c2603a2..46e4acd50c 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -319,6 +319,12 @@ impl VirtualFile { Ok(result) } + + pub fn remove(self) { + let path = self.path.clone(); + drop(self); + std::fs::remove_file(path).expect("failed to remove the virtual file"); + } } impl Drop for VirtualFile { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 38a0db7cf7..e7e0e4ce56 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1628,6 +1628,8 @@ class NeonPageserver(PgProtocol): Initializes the repository via `neon init`. """ + TEMP_FILE_SUFFIX = "___temp" + def __init__(self, env: NeonEnv, port: PageserverPort, config_override: Optional[str] = None): super().__init__(host="localhost", port=port.pg, user="cloud_admin") self.env = env diff --git a/test_runner/regress/test_layer_writers_fail.py b/test_runner/regress/test_layer_writers_fail.py new file mode 100644 index 0000000000..e8ba0e7d91 --- /dev/null +++ b/test_runner/regress/test_layer_writers_fail.py @@ -0,0 +1,92 @@ +import pytest +from fixtures.neon_fixtures import NeonEnv, NeonPageserver + + +@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2703") +def test_image_layer_writer_fail_before_finish(neon_simple_env: NeonEnv): + env = neon_simple_env + pageserver_http = env.pageserver.http_client() + + tenant_id, timeline_id = env.neon_cli.create_tenant( + conf={ + # small checkpoint distance to create more delta layer files + "checkpoint_distance": f"{1024 ** 2}", + # set the target size to be large to allow the image layer to cover the whole key space + "compaction_target_size": f"{1024 ** 3}", + # tweak the default settings to allow quickly create image layers and L1 layers + "compaction_period": "1 s", + "compaction_threshold": "2", + "image_creation_threshold": "1", + } + ) + + pg = env.postgres.create_start("main", tenant_id=tenant_id) + pg.safe_psql_many( + [ + "CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)", + """INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 100000) g""", + ] + ) + + pageserver_http.configure_failpoints(("image-layer-writer-fail-before-finish", "return")) + with pytest.raises(Exception, match="image-layer-writer-fail-before-finish"): + pageserver_http.timeline_checkpoint(tenant_id, timeline_id) + + new_temp_layer_files = list( + filter( + lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX), + [path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()], + ) + ) + + assert ( + len(new_temp_layer_files) == 0 + ), "pageserver should clean its temporary new image layer files on failure" + + +@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2703") +def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv): + env = neon_simple_env + pageserver_http = env.pageserver.http_client() + + tenant_id, timeline_id = env.neon_cli.create_tenant( + conf={ + # small checkpoint distance to create more delta layer files + "checkpoint_distance": f"{1024 ** 2}", + # set the target size to be large to allow the image layer to cover the whole key space + "compaction_target_size": f"{1024 ** 3}", + # tweak the default settings to allow quickly create image layers and L1 layers + "compaction_period": "1 s", + "compaction_threshold": "2", + "image_creation_threshold": "1", + } + ) + + pg = env.postgres.create_start("main", tenant_id=tenant_id) + pg.safe_psql_many( + [ + "CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)", + """INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 100000) g""", + ] + ) + + pageserver_http.configure_failpoints(("delta-layer-writer-fail-before-finish", "return")) + # Note: we cannot test whether the exception is exactly 'delta-layer-writer-fail-before-finish' + # since our code does it in loop, we cannot get this exact error for our request. + with pytest.raises(Exception): + pageserver_http.timeline_checkpoint(tenant_id, timeline_id) + + new_temp_layer_files = list( + filter( + lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX), + [path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()], + ) + ) + + assert ( + len(new_temp_layer_files) == 0 + ), "pageserver should clean its temporary new delta layer files on failure"