diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index e4dede2c30..ba3542d869 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -83,15 +83,6 @@ impl<'a> BlockCursor<'a> { } } -/// -/// Abstract trait for a data sink that you can write blobs to. -/// -pub trait BlobWriter { - /// Write a blob of data. Returns the offset that it was written to, - /// which can be used to retrieve the data later. - fn write_blob(&mut self, srcbuf: &[u8]) -> Result; -} - /// /// An implementation of BlobWriter to write blobs to anything that /// implements std::io::Write. @@ -123,11 +114,13 @@ impl WriteBlobWriter { } } -impl BlobWriter for WriteBlobWriter +impl WriteBlobWriter where W: std::io::Write, { - fn write_blob(&mut self, srcbuf: &[u8]) -> Result { + /// Write a blob of data. Returns the offset that it was written to, + /// which can be used to retrieve the data later. + pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result { let offset = self.offset; if srcbuf.len() < 128 { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 60427a22e4..e95c422d29 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -31,7 +31,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; -use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; +use crate::tenant::blob_io::WriteBlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -590,7 +590,7 @@ impl DeltaLayerWriterInner { /// /// Start building a new delta layer. /// - fn new( + async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, @@ -607,7 +607,7 @@ impl DeltaLayerWriterInner { let mut file = VirtualFile::create(&path)?; // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64))?; + file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let buf_writer = BufWriter::new(file); let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64); @@ -632,11 +632,12 @@ impl DeltaLayerWriterInner { /// /// The values must be appended in key, lsn order. /// - fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { + async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init()) + .await } - fn put_value_bytes( + async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, @@ -645,7 +646,7 @@ impl DeltaLayerWriterInner { ) -> anyhow::Result<()> { assert!(self.lsn_range.start <= lsn); - let off = self.blob_writer.write_blob(val)?; + let off = self.blob_writer.write_blob(val).await?; let blob_ref = BlobRef::new(off, will_init); @@ -662,7 +663,7 @@ impl DeltaLayerWriterInner { /// /// Finish writing the delta layer. /// - fn finish(self, key_end: Key) -> anyhow::Result { + async fn finish(self, key_end: Key) -> anyhow::Result { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -671,7 +672,8 @@ impl DeltaLayerWriterInner { // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?; + file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) + .await?; for buf in block_buf.blocks { file.write_all(buf.as_ref())?; } @@ -687,11 +689,22 @@ impl DeltaLayerWriterInner { index_start_blk, index_root_blk, }; - file.seek(SeekFrom::Start(0))?; - Summary::ser_into(&summary, &mut file)?; + + let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new(); + Summary::ser_into(&summary, &mut buf)?; + if buf.spilled() { + // This is bad as we only have one free block for the summary + warn!( + "Used more than one page size for summary buffer: {}", + buf.len() + ); + } + file.seek(SeekFrom::Start(0)).await?; + file.write_all(&buf)?; let metadata = file .metadata() + .await .context("get file metadata to determine size")?; // 5GB limit for objects without multipart upload (which we don't want to use) @@ -774,7 +787,7 @@ impl DeltaLayerWriter { /// /// Start building a new delta layer. /// - pub fn new( + pub async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, @@ -782,13 +795,10 @@ impl DeltaLayerWriter { lsn_range: Range, ) -> anyhow::Result { Ok(Self { - inner: Some(DeltaLayerWriterInner::new( - conf, - timeline_id, - tenant_id, - key_start, - lsn_range, - )?), + inner: Some( + DeltaLayerWriterInner::new(conf, timeline_id, tenant_id, key_start, lsn_range) + .await?, + ), }) } @@ -797,11 +807,11 @@ impl DeltaLayerWriter { /// /// The values must be appended in key, lsn order. /// - pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { - self.inner.as_mut().unwrap().put_value(key, lsn, val) + pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { + self.inner.as_mut().unwrap().put_value(key, lsn, val).await } - pub fn put_value_bytes( + pub async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, @@ -812,6 +822,7 @@ impl DeltaLayerWriter { .as_mut() .unwrap() .put_value_bytes(key, lsn, val, will_init) + .await } pub fn size(&self) -> u64 { @@ -821,8 +832,8 @@ impl DeltaLayerWriter { /// /// Finish writing the delta layer. /// - pub fn finish(mut self, key_end: Key) -> anyhow::Result { - self.inner.take().unwrap().finish(key_end) + pub async fn finish(mut self, key_end: Key) -> anyhow::Result { + self.inner.take().unwrap().finish(key_end).await } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index f329041fb1..62bfeb8eec 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -27,7 +27,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, KEY_SIZE}; -use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; +use crate::tenant::blob_io::WriteBlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ @@ -519,7 +519,7 @@ impl ImageLayerWriterInner { /// /// Start building a new image layer. /// - fn new( + async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, @@ -543,7 +543,7 @@ impl ImageLayerWriterInner { std::fs::OpenOptions::new().write(true).create_new(true), )?; // make room for the header block - file.seek(SeekFrom::Start(PAGE_SZ as u64))?; + file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64); // Initialize the b-tree index builder @@ -569,9 +569,9 @@ impl ImageLayerWriterInner { /// /// The page versions must be appended in blknum order. /// - fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { + async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let off = self.blob_writer.write_blob(img)?; + let off = self.blob_writer.write_blob(img).await?; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); @@ -583,14 +583,15 @@ impl ImageLayerWriterInner { /// /// Finish writing the image layer. /// - fn finish(self) -> anyhow::Result { + async fn finish(self) -> anyhow::Result { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; let mut file = self.blob_writer.into_inner(); // Write out the index - file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?; + file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) + .await?; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { file.write_all(buf.as_ref())?; @@ -607,11 +608,22 @@ impl ImageLayerWriterInner { index_start_blk, index_root_blk, }; - file.seek(SeekFrom::Start(0))?; - Summary::ser_into(&summary, &mut file)?; + + let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new(); + Summary::ser_into(&summary, &mut buf)?; + if buf.spilled() { + // This is bad as we only have one free block for the summary + warn!( + "Used more than one page size for summary buffer: {}", + buf.len() + ); + } + file.seek(SeekFrom::Start(0)).await?; + file.write_all(&buf)?; let metadata = file .metadata() + .await .context("get metadata to determine file size")?; let desc = PersistentLayerDesc::new_img( @@ -687,7 +699,7 @@ impl ImageLayerWriter { /// /// Start building a new image layer. /// - pub fn new( + pub async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, @@ -695,13 +707,9 @@ impl ImageLayerWriter { lsn: Lsn, ) -> anyhow::Result { Ok(Self { - inner: Some(ImageLayerWriterInner::new( - conf, - timeline_id, - tenant_id, - key_range, - lsn, - )?), + inner: Some( + ImageLayerWriterInner::new(conf, timeline_id, tenant_id, key_range, lsn).await?, + ), }) } @@ -710,15 +718,15 @@ impl ImageLayerWriter { /// /// The page versions must be appended in blknum order. /// - pub fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { - self.inner.as_mut().unwrap().put_image(key, img) + pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { + self.inner.as_mut().unwrap().put_image(key, img).await } /// /// Finish writing the image layer. /// - pub fn finish(mut self) -> anyhow::Result { - self.inner.take().unwrap().finish() + pub async fn finish(mut self) -> anyhow::Result { + self.inner.take().unwrap().finish().await } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 35a77a7331..374b0bb60c 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -333,7 +333,8 @@ impl InMemoryLayer { self.tenant_id, Key::MIN, self.start_lsn..end_lsn, - )?; + ) + .await?; let mut buf = Vec::new(); @@ -348,11 +349,13 @@ impl InMemoryLayer { for (lsn, pos) in vec_map.as_slice() { cursor.read_blob_into_buf(*pos, &mut buf).await?; let will_init = Value::des(&buf)?.will_init(); - delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?; + delta_layer_writer + .put_value_bytes(key, *lsn, &buf, will_init) + .await?; } } - let delta_layer = delta_layer_writer.finish(Key::MAX)?; + let delta_layer = delta_layer_writer.finish(Key::MAX).await?; Ok(delta_layer) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 816af214a5..d7ff70a29c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3033,7 +3033,8 @@ impl Timeline { self.tenant_id, &img_range, lsn, - )?; + ) + .await?; fail_point!("image-layer-writer-fail-before-finish", |_| { Err(PageReconstructError::Other(anyhow::anyhow!( @@ -3069,11 +3070,11 @@ impl Timeline { } } }; - image_layer_writer.put_image(key, &img)?; + image_layer_writer.put_image(key, &img).await?; key = key.next(); } } - let image_layer = image_layer_writer.finish()?; + let image_layer = image_layer_writer.finish().await?; image_layers.push(image_layer); } } @@ -3618,7 +3619,11 @@ impl Timeline { { // ... if so, flush previous layer and prepare to write new one new_layers.push(Arc::new( - writer.take().unwrap().finish(prev_key.unwrap().next())?, + writer + .take() + .unwrap() + .finish(prev_key.unwrap().next()) + .await?, )); writer = None; @@ -3633,20 +3638,23 @@ impl Timeline { } if writer.is_none() { // Create writer if not initiaized yet - writer = Some(DeltaLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_id, - key, - if dup_end_lsn.is_valid() { - // this is a layer containing slice of values of the same key - debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); - dup_start_lsn..dup_end_lsn - } else { - debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); - lsn_range.clone() - }, - )?); + writer = Some( + DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_id, + key, + if dup_end_lsn.is_valid() { + // this is a layer containing slice of values of the same key + debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); + dup_start_lsn..dup_end_lsn + } else { + debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); + lsn_range.clone() + }, + ) + .await?, + ); } fail_point!("delta-layer-writer-fail-before-finish", |_| { @@ -3655,11 +3663,11 @@ impl Timeline { ))) }); - writer.as_mut().unwrap().put_value(key, lsn, value)?; + writer.as_mut().unwrap().put_value(key, lsn, value).await?; prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?)); + new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next()).await?)); } // Sync layers diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 41c5eb96cf..7e9b50dfeb 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -326,7 +326,7 @@ impl VirtualFile { self.with_file("fsync", |file| file.sync_all())? } - pub fn metadata(&self) -> Result { + pub async fn metadata(&self) -> Result { self.with_file("metadata", |file| file.metadata())? } @@ -407,7 +407,7 @@ impl VirtualFile { std::fs::remove_file(path).expect("failed to remove the virtual file"); } - pub fn seek(&mut self, pos: SeekFrom) -> Result { + pub async fn seek(&mut self, pos: SeekFrom) -> Result { match pos { SeekFrom::Start(offset) => { self.pos = offset; @@ -619,9 +619,9 @@ mod tests { MaybeVirtualFile::File(file) => file.write_all_at(buf, offset), } } - fn seek(&mut self, pos: SeekFrom) -> Result { + async fn seek(&mut self, pos: SeekFrom) -> Result { match self { - MaybeVirtualFile::VirtualFile(file) => file.seek(pos), + MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await, MaybeVirtualFile::File(file) => file.seek(pos), } } @@ -712,23 +712,23 @@ mod tests { assert_eq!("", file_a.read_string().await?); // Test seeks. - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); assert_eq!("oobar", file_a.read_string().await?); - assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4); + assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4); assert_eq!("ar", file_a.read_string().await?); - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3); + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); + assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3); assert_eq!("bar", file_a.read_string().await?); - assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1); + assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1); assert_eq!("oobar", file_a.read_string().await?); // Test erroneous seeks to before byte 0 - assert!(file_a.seek(SeekFrom::End(-7)).is_err()); - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); - assert!(file_a.seek(SeekFrom::Current(-2)).is_err()); + file_a.seek(SeekFrom::End(-7)).await.unwrap_err(); + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); + file_a.seek(SeekFrom::Current(-2)).await.unwrap_err(); // the erroneous seek should have left the position unchanged assert_eq!("oobar", file_a.read_string().await?); @@ -752,7 +752,7 @@ mod tests { // open the same file many times. The effect is the same.) // // leave file_a positioned at offset 1 before we start - assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); + assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); let mut vfiles = Vec::new(); for _ in 0..100 {