diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 32d0d1bed2..495dae87e3 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option { // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH" async fn get_holes(path: &Path, max_holes: usize) -> Result> { - let file = FileBlockReader::new(VirtualFile::open(path)?); + let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index ff2044653a..33a6f197cf 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { let path = path.as_ref(); virtual_file::init(10); page_cache::init(100); - let file = FileBlockReader::new(VirtualFile::open(path)?); + let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 71db8d2978..4fad1f3c14 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -238,7 +238,7 @@ mod tests { // Write part (in block to drop the file) let mut offsets = Vec::new(); { - let file = VirtualFile::create(&path)?; + let file = VirtualFile::create(&path).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { let offs = wtr.write_blob(blob).await?; @@ -251,7 +251,7 @@ mod tests { wtr.flush_buffer().await?; } - let file = VirtualFile::open(&path)?; + let file = VirtualFile::open(&path).await?; let rdr = BlockReaderRef::VirtualFile(&file); let rdr = BlockCursor::new(rdr); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 4c5fe424f3..887834cd9b 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -28,7 +28,7 @@ pub struct EphemeralFile { } impl EphemeralFile { - pub fn create( + pub async fn create( conf: &PageServerConf, tenant_id: TenantId, timeline_id: TimelineId, @@ -44,7 +44,8 @@ impl EphemeralFile { let file = VirtualFile::open_with_options( &filename, OpenOptions::new().read(true).write(true).create(true), - )?; + ) + .await?; Ok(EphemeralFile { page_cache_file_id: page_cache::next_file_id(), @@ -286,7 +287,7 @@ mod tests { async fn test_ephemeral_blobs() -> Result<(), io::Error> { let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?; - let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; let pos_foo = file.write_blob(b"foo").await?; assert_eq!( diff --git a/pageserver/src/tenant/par_fsync.rs b/pageserver/src/tenant/par_fsync.rs index 3cbcfe8774..705b42aff7 100644 --- a/pageserver/src/tenant/par_fsync.rs +++ b/pageserver/src/tenant/par_fsync.rs @@ -4,10 +4,9 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; -use crate::virtual_file::VirtualFile; - fn fsync_path(path: &Path) -> io::Result<()> { - let file = VirtualFile::open(path)?; + // TODO use VirtualFile::fsync_all once we fully go async. + let file = std::fs::File::open(path)?; file.sync_all() } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b6fbf98962..6925cb59cd 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -604,7 +604,7 @@ impl DeltaLayerWriterInner { // FIXME: throw an error instead? let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range); - let mut file = VirtualFile::create(&path)?; + let mut file = VirtualFile::create(&path).await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); @@ -732,7 +732,7 @@ impl DeltaLayerWriterInner { }; // fsync the file - file.sync_all()?; + file.sync_all().await?; // Rename the file to its final name // // Note: This overwrites any existing file. There shouldn't be any. @@ -851,6 +851,7 @@ impl DeltaLayerInner { summary: Option, ) -> anyhow::Result { let file = VirtualFile::open(path) + .await .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 8f7fb8175c..2a6cabcc97 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -438,6 +438,7 @@ impl ImageLayerInner { summary: Option, ) -> anyhow::Result { let file = VirtualFile::open(path) + .await .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); let summary_blk = file.read_blk(0).await?; @@ -540,7 +541,8 @@ impl ImageLayerWriterInner { let mut file = VirtualFile::open_with_options( &path, std::fs::OpenOptions::new().write(true).create_new(true), - )?; + ) + .await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); @@ -645,7 +647,7 @@ impl ImageLayerWriterInner { }; // fsync the file - file.sync_all()?; + file.sync_all().await?; // Rename the file to its final name // diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 374b0bb60c..3ff1c6bb18 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -236,7 +236,7 @@ impl InMemoryLayer { /// /// Create a new, empty, in-memory layer /// - pub fn create( + pub async fn create( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_id: TenantId, @@ -244,7 +244,7 @@ impl InMemoryLayer { ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = EphemeralFile::create(conf, tenant_id, timeline_id)?; + let file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; Ok(InMemoryLayer { conf, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index cb8b842cf6..fa3b487589 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2546,13 +2546,15 @@ impl Timeline { /// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { let mut guard = self.layers.write().await; - let layer = guard.get_layer_for_write( - lsn, - self.get_last_record_lsn(), - self.conf, - self.timeline_id, - self.tenant_id, - )?; + let layer = guard + .get_layer_for_write( + lsn, + self.get_last_record_lsn(), + self.conf, + self.timeline_id, + self.tenant_id, + ) + .await?; Ok(layer) } diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index 5522ea1788..3c88d31f24 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -87,7 +87,7 @@ impl LayerManager { /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer, /// called within `get_layer_for_write`. - pub(crate) fn get_layer_for_write( + pub(crate) async fn get_layer_for_write( &mut self, lsn: Lsn, last_record_lsn: Lsn, @@ -129,7 +129,7 @@ impl LayerManager { lsn ); - let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?; + let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?; let layer = Arc::new(new_layer); self.layer_map.open_layer = Some(layer.clone()); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index c4241c4270..1fa5fcc297 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -210,17 +210,18 @@ impl CrashsafeOverwriteError { impl VirtualFile { /// Open a file in read-only mode. Like File::open. - pub fn open(path: &Path) -> Result { - Self::open_with_options(path, OpenOptions::new().read(true)) + pub async fn open(path: &Path) -> Result { + Self::open_with_options(path, OpenOptions::new().read(true)).await } /// Create a new file for writing. If the file exists, it will be truncated. /// Like File::create. - pub fn create(path: &Path) -> Result { + pub async fn create(path: &Path) -> Result { Self::open_with_options( path, OpenOptions::new().write(true).create(true).truncate(true), ) + .await } /// Open a file with given options. @@ -228,7 +229,7 @@ impl VirtualFile { /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, /// they will be applied also when the file is subsequently re-opened, not only /// on the first time. Make sure that's sane! - pub fn open_with_options( + pub async fn open_with_options( path: &Path, open_options: &OpenOptions, ) -> Result { @@ -299,11 +300,13 @@ impl VirtualFile { // we bail out instead of causing damage. .create_new(true), ) + .await .map_err(CrashsafeOverwriteError::CreateTempfile)?; file.write_all(content) .await .map_err(CrashsafeOverwriteError::WriteContents)?; file.sync_all() + .await .map_err(CrashsafeOverwriteError::SyncTempfile)?; drop(file); // before the rename, that's important! // renames are atomic @@ -316,26 +319,28 @@ impl VirtualFile { // try_lock. let final_parent_dirfd = Self::open_with_options(final_path_parent, OpenOptions::new().read(true)) + .await .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?; final_parent_dirfd .sync_all() + .await .map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?; Ok(()) } /// Call File::sync_all() on the underlying File. - pub fn sync_all(&self) -> Result<(), Error> { - self.with_file("fsync", |file| file.sync_all())? + pub async fn sync_all(&self) -> Result<(), Error> { + self.with_file("fsync", |file| file.sync_all()).await? } pub async fn metadata(&self) -> Result { - self.with_file("metadata", |file| file.metadata())? + self.with_file("metadata", |file| file.metadata()).await? } /// Helper function that looks up the underlying File for this VirtualFile, /// opening it and evicting some other File if necessary. It calls 'func' /// with the physical File. - fn with_file(&self, op: &str, mut func: F) -> Result + async fn with_file(&self, op: &str, mut func: F) -> Result where F: FnMut(&File) -> R, { @@ -415,7 +420,9 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = self.with_file("seek", |mut file| file.seek(SeekFrom::End(offset)))?? + self.pos = self + .with_file("seek", |mut file| file.seek(SeekFrom::End(offset))) + .await?? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -503,7 +510,9 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = self.with_file("read", |file| file.read_at(buf, offset))?; + let result = self + .with_file("read", |file| file.read_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -513,7 +522,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = self.with_file("write", |file| file.write_at(buf, offset))?; + let result = self + .with_file("write", |file| file.write_at(buf, offset)) + .await?; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) @@ -625,6 +636,7 @@ mod tests { use rand::seq::SliceRandom; use rand::thread_rng; use rand::Rng; + use std::future::Future; use std::io::Write; use std::sync::Arc; @@ -694,8 +706,8 @@ mod tests { // results with VirtualFiles as with native Files. (Except that with // native files, you will run out of file descriptors if the ulimit // is low enough.) - test_files("virtual_files", |path, open_options| { - let vf = VirtualFile::open_with_options(path, open_options)?; + test_files("virtual_files", |path, open_options| async move { + let vf = VirtualFile::open_with_options(&path, &open_options).await?; Ok(MaybeVirtualFile::VirtualFile(vf)) }) .await @@ -703,31 +715,37 @@ mod tests { #[tokio::test] async fn test_physical_files() -> Result<(), Error> { - test_files("physical_files", |path, open_options| { + test_files("physical_files", |path, open_options| async move { Ok(MaybeVirtualFile::File(open_options.open(path)?)) }) .await } - async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> + async fn test_files(testname: &str, openfunc: OF) -> Result<(), Error> where - OF: Fn(&Path, &OpenOptions) -> Result, + OF: Fn(PathBuf, OpenOptions) -> FT, + FT: Future>, { let testdir = crate::config::PageServerConf::test_repo_dir(testname); std::fs::create_dir_all(&testdir)?; let path_a = testdir.join("file_a"); let mut file_a = openfunc( - &path_a, - OpenOptions::new().write(true).create(true).truncate(true), - )?; + path_a.clone(), + OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .to_owned(), + ) + .await?; file_a.write_all(b"foobar").await?; // cannot read from a file opened in write-only mode let _ = file_a.read_string().await.unwrap_err(); // Close the file and re-open for reading - let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; + let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; // cannot write to a file opened in read-only mode let _ = file_a.write_all(b"bar").await.unwrap_err(); @@ -763,13 +781,15 @@ mod tests { // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); let mut file_b = openfunc( - &path_b, + path_b.clone(), OpenOptions::new() .read(true) .write(true) .create(true) - .truncate(true), - )?; + .truncate(true) + .to_owned(), + ) + .await?; file_b.write_all_at(b"BAR", 3).await?; file_b.write_all_at(b"FOO", 0).await?; @@ -783,7 +803,8 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { - let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; + let mut vfile = + openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?; assert_eq!("FOOBAR", vfile.read_string().await?); vfiles.push(vfile); } @@ -808,8 +829,8 @@ mod tests { /// Test using VirtualFiles from many threads concurrently. This tests both using /// a lot of VirtualFiles concurrently, causing evictions, and also using the same /// VirtualFile from multiple threads concurrently. - #[test] - fn test_vfile_concurrency() -> Result<(), Error> { + #[tokio::test] + async fn test_vfile_concurrency() -> Result<(), Error> { const SIZE: usize = 8 * 1024; const VIRTUAL_FILES: usize = 100; const THREADS: usize = 100; @@ -828,7 +849,8 @@ mod tests { // Open the file many times. let mut files = Vec::new(); for _ in 0..VIRTUAL_FILES { - let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?; + let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true)) + .await?; files.push(f); } let files = Arc::new(files); @@ -839,9 +861,10 @@ mod tests { .thread_name("test_vfile_concurrency thread") .build() .unwrap(); + let mut hdls = Vec::new(); for _threadno in 0..THREADS { let files = files.clone(); - rt.spawn(async move { + let hdl = rt.spawn(async move { let mut buf = [0u8; SIZE]; let mut rng = rand::rngs::OsRng; for _ in 1..1000 { @@ -850,7 +873,12 @@ mod tests { assert!(buf == SAMPLE); } }); + hdls.push(hdl); } + for hdl in hdls { + hdl.await?; + } + std::mem::forget(rt); Ok(()) }