From 6ff74295b5b21d54192d20d114d78621d8d53ba0 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 13 May 2024 14:52:06 +0200 Subject: [PATCH] chore(pageserver): plumb through RequestContext to VirtualFile open methods (#7725) This PR introduces no functional changes. The `open()` path will be done separately. refs https://github.com/neondatabase/neon/issues/6107 refs https://github.com/neondatabase/neon/issues/7386 Co-authored-by: Joonas Koivunen --- pageserver/ctl/src/layer_map_analyzer.rs | 2 +- pageserver/ctl/src/layers.rs | 2 +- pageserver/src/pgdatadir_mapping.rs | 2 +- pageserver/src/tenant/blob_io.rs | 4 +- pageserver/src/tenant/ephemeral_file.rs | 4 +- .../tenant/remote_timeline_client/download.rs | 21 ++-- .../src/tenant/storage_layer/delta_layer.rs | 10 +- .../src/tenant/storage_layer/image_layer.rs | 8 +- .../tenant/storage_layer/inmemory_layer.rs | 4 +- pageserver/src/tenant/timeline.rs | 35 ++++-- pageserver/src/tenant/timeline/compaction.rs | 4 + .../src/tenant/timeline/detach_ancestor.rs | 2 + .../src/tenant/timeline/layer_manager.rs | 4 +- pageserver/src/virtual_file.rs | 102 +++++++++++++----- 14 files changed, 143 insertions(+), 61 deletions(-) diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index c4c282f33d..b4bb239f44 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -100,7 +100,7 @@ pub(crate) fn parse_filename(name: &str) -> Option { // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH" async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result> { - let file = VirtualFile::open(path).await?; + let file = VirtualFile::open(path, ctx).await?; let file_id = page_cache::next_file_id(); let block_reader = FileBlockReader::new(&file, file_id); let summary_blk = block_reader.read_blk(0, ctx).await?; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index be8f91675d..3611b0baab 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -61,7 +61,7 @@ async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); page_cache::init(100); - let file = VirtualFile::open(path).await?; + let file = VirtualFile::open(path, ctx).await?; let file_id = page_cache::next_file_id(); let block_reader = FileBlockReader::new(&file, file_id); let summary_blk = block_reader.read_blk(0, ctx).await?; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index a4215ee107..ffcab5f140 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1671,7 +1671,7 @@ impl<'a> DatadirModification<'a> { } if !self.pending_deletions.is_empty() { - writer.delete_batch(&self.pending_deletions).await?; + writer.delete_batch(&self.pending_deletions, ctx).await?; self.pending_deletions.clear(); } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 1dc451f5c9..24b4e4f3ea 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -299,7 +299,7 @@ mod tests { // Write part (in block to drop the file) let mut offsets = Vec::new(); { - let file = VirtualFile::create(pathbuf.as_path()).await?; + let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?; let mut wtr = BlobWriter::::new(file, 0); for blob in blobs.iter() { 
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await; @@ -314,7 +314,7 @@ mod tests { wtr.flush_buffer(&ctx).await?; } - let file = VirtualFile::open(pathbuf.as_path()).await?; + let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?; let rdr = BlockReaderRef::VirtualFile(&file); let rdr = BlockCursor::new(rdr); for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() { diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 8b815a1885..79cc7bf153 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -28,6 +28,7 @@ impl EphemeralFile { conf: &PageServerConf, tenant_shard_id: TenantShardId, timeline_id: TimelineId, + ctx: &RequestContext, ) -> Result { static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); let filename_disambiguator = @@ -45,6 +46,7 @@ impl EphemeralFile { .read(true) .write(true) .create(true), + ctx, ) .await?; @@ -153,7 +155,7 @@ mod tests { async fn test_ephemeral_blobs() -> Result<(), io::Error> { let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?; - let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?; + let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?; let pos_foo = file.write_blob(b"foo", &ctx).await?; assert_eq!( diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index b464437422..f3c9e64533 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -112,14 +112,17 @@ pub async fn download_layer_file<'a>( // We use fatal_err() below because the after the rename above, // the in-memory state of the filesystem already has the layer file in its final place, // and subsequent pageserver code could think it's durable while it really isn't. - let work = async move { - let timeline_dir = VirtualFile::open(&timeline_path) - .await - .fatal_err("VirtualFile::open for timeline dir fsync"); - timeline_dir - .sync_all() - .await - .fatal_err("VirtualFile::sync_all timeline dir"); + let work = { + let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior()); + async move { + let timeline_dir = VirtualFile::open(&timeline_path, &ctx) + .await + .fatal_err("VirtualFile::open for timeline dir fsync"); + timeline_dir + .sync_all() + .await + .fatal_err("VirtualFile::sync_all timeline dir"); + } }; crate::virtual_file::io_engine::get() .spawn_blocking_and_block_on_if_std(work) @@ -196,7 +199,7 @@ async fn download_object<'a>( use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer}; use bytes::BytesMut; async { - let destination_file = VirtualFile::create(dst_path) + let destination_file = VirtualFile::create(dst_path, ctx) .await .with_context(|| format!("create a destination file for layer '{dst_path}'")) .map_err(DownloadError::Other)?; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 4f30cf2e84..1b3802840f 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -394,6 +394,7 @@ impl DeltaLayerWriterInner { tenant_shard_id: TenantShardId, key_start: Key, lsn_range: Range, + ctx: &RequestContext, ) -> anyhow::Result { // Create the file initially with a temporary filename. We don't know // the end key yet, so we cannot form the final filename yet. 
We will @@ -404,7 +405,7 @@ impl DeltaLayerWriterInner { let path = DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range); - let mut file = VirtualFile::create(&path).await?; + let mut file = VirtualFile::create(&path, ctx).await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); @@ -586,6 +587,7 @@ impl DeltaLayerWriter { tenant_shard_id: TenantShardId, key_start: Key, lsn_range: Range, + ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { inner: Some( @@ -595,6 +597,7 @@ impl DeltaLayerWriter { tenant_shard_id, key_start, lsn_range, + ctx, ) .await?, ), @@ -701,6 +704,7 @@ impl DeltaLayer { let mut file = VirtualFile::open_with_options( path, virtual_file::OpenOptions::new().read(true).write(true), + ctx, ) .await .with_context(|| format!("Failed to open file '{}'", path))?; @@ -734,7 +738,7 @@ impl DeltaLayerInner { max_vectored_read_bytes: Option, ctx: &RequestContext, ) -> Result, anyhow::Error> { - let file = match VirtualFile::open(path).await { + let file = match VirtualFile::open(path, ctx).await { Ok(file) => file, Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), }; @@ -1792,6 +1796,7 @@ mod test { harness.tenant_shard_id, entries_meta.key_range.start, entries_meta.lsn_range.clone(), + &ctx, ) .await?; @@ -1979,6 +1984,7 @@ mod test { tenant.tenant_shard_id, Key::MIN, Lsn(0x11)..truncate_at, + ctx, ) .await .unwrap(); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 72d1f36cab..6ea452b993 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -343,6 +343,7 @@ impl ImageLayer { let mut file = VirtualFile::open_with_options( path, virtual_file::OpenOptions::new().read(true).write(true), + ctx, ) .await .with_context(|| format!("Failed to open file '{}'", path))?; @@ -377,7 +378,7 @@ impl ImageLayerInner { max_vectored_read_bytes: Option, ctx: &RequestContext, ) -> Result, anyhow::Error> { - let file = match VirtualFile::open(path).await { + let file = match VirtualFile::open(path, ctx).await { Ok(file) => file, Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))), }; @@ -632,6 +633,7 @@ impl ImageLayerWriterInner { tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, + ctx: &RequestContext, ) -> anyhow::Result { // Create the file initially with a temporary filename. // We'll atomically rename it to the final name when we're done. @@ -651,6 +653,7 @@ impl ImageLayerWriterInner { virtual_file::OpenOptions::new() .write(true) .create_new(true), + ctx, ) .await? 
}; @@ -805,10 +808,11 @@ impl ImageLayerWriter { tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, + ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { inner: Some( - ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn) + ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx) .await?, ), }) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 4dacbec2f3..9553f83026 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -473,10 +473,11 @@ impl InMemoryLayer { timeline_id: TimelineId, tenant_shard_id: TenantShardId, start_lsn: Lsn, + ctx: &RequestContext, ) -> Result { trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"); - let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?; + let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?; let key = InMemoryLayerFileId(file.page_cache_file_id()); Ok(InMemoryLayer { @@ -642,6 +643,7 @@ impl InMemoryLayer { self.tenant_shard_id, Key::MIN, self.start_lsn..end_lsn, + ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 505dc8c30d..d2fcd6c4a5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3560,7 +3560,11 @@ impl Timeline { /// /// Get a handle to the latest layer for appending. /// - async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { + async fn get_layer_for_write( + &self, + lsn: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result> { let mut guard = self.layers.write().await; let layer = guard .get_layer_for_write( @@ -3569,6 +3573,7 @@ impl Timeline { self.conf, self.timeline_id, self.tenant_shard_id, + ctx, ) .await?; Ok(layer) @@ -3833,8 +3838,8 @@ impl Timeline { ); self.create_delta_layer( &frozen_layer, - ctx, Some(metadata_keyspace.0.ranges[0].clone()), + ctx, ) .await? } else { @@ -3863,7 +3868,7 @@ impl Timeline { // Normal case, write out a L0 delta layer file. // `create_delta_layer` will not modify the layer map. // We will remove frozen layer and add delta layer in one atomic operation later. - let Some(layer) = self.create_delta_layer(&frozen_layer, ctx, None).await? else { + let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? 
else { panic!("delta layer cannot be empty if no filter is applied"); }; ( @@ -3992,8 +3997,8 @@ impl Timeline { async fn create_delta_layer( self: &Arc, frozen_layer: &Arc, - ctx: &RequestContext, key_range: Option>, + ctx: &RequestContext, ) -> anyhow::Result> { let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); @@ -4016,6 +4021,7 @@ impl Timeline { &self_clone .conf .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id), + &ctx, ) .await .fatal_err("VirtualFile::open for timeline dir fsync"); @@ -4209,6 +4215,7 @@ impl Timeline { self.tenant_shard_id, &img_range, lsn, + ctx, ) .await?; @@ -4313,6 +4320,7 @@ impl Timeline { &self .conf .timeline_path(&self.tenant_shard_id, &self.timeline_id), + ctx, ) .await .fatal_err("VirtualFile::open for timeline dir fsync"); @@ -5214,7 +5222,7 @@ impl<'a> TimelineWriter<'a> { let buf_size: u64 = buf.len().try_into().expect("oversized value buf"); let action = self.get_open_layer_action(lsn, buf_size); - let layer = self.handle_open_layer_action(lsn, action).await?; + let layer = self.handle_open_layer_action(lsn, action, ctx).await?; let res = layer.put_value(key, lsn, &buf, ctx).await; if res.is_ok() { @@ -5237,14 +5245,15 @@ impl<'a> TimelineWriter<'a> { &mut self, at: Lsn, action: OpenLayerAction, + ctx: &RequestContext, ) -> anyhow::Result<&Arc> { match action { OpenLayerAction::Roll => { let freeze_at = self.write_guard.as_ref().unwrap().max_lsn.unwrap(); self.roll_layer(freeze_at).await?; - self.open_layer(at).await?; + self.open_layer(at, ctx).await?; } - OpenLayerAction::Open => self.open_layer(at).await?, + OpenLayerAction::Open => self.open_layer(at, ctx).await?, OpenLayerAction::None => { assert!(self.write_guard.is_some()); } @@ -5253,8 +5262,8 @@ impl<'a> TimelineWriter<'a> { Ok(&self.write_guard.as_ref().unwrap().open_layer) } - async fn open_layer(&mut self, at: Lsn) -> anyhow::Result<()> { - let layer = self.tl.get_layer_for_write(at).await?; + async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> { + let layer = self.tl.get_layer_for_write(at, ctx).await?; let initial_size = layer.size().await?; let last_freeze_at = self.last_freeze_at.load(); @@ -5331,10 +5340,14 @@ impl<'a> TimelineWriter<'a> { Ok(()) } - pub(crate) async fn delete_batch(&mut self, batch: &[(Range, Lsn)]) -> anyhow::Result<()> { + pub(crate) async fn delete_batch( + &mut self, + batch: &[(Range, Lsn)], + ctx: &RequestContext, + ) -> anyhow::Result<()> { if let Some((_, lsn)) = batch.first() { let action = self.get_open_layer_action(*lsn, 0); - let layer = self.handle_open_layer_action(*lsn, action).await?; + let layer = self.handle_open_layer_action(*lsn, action, ctx).await?; layer.put_tombstones(batch).await?; } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index e83878b8fb..4226bf431e 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -700,6 +700,7 @@ impl Timeline { debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); lsn_range.clone() }, + ctx, ) .await?, ); @@ -755,6 +756,7 @@ impl Timeline { &self .conf .timeline_path(&self.tenant_shard_id, &self.timeline_id), + ctx, ) .await .fatal_err("VirtualFile::open for timeline dir fsync"); @@ -1093,6 +1095,7 @@ impl CompactionJobExecutor for TimelineAdaptor { self.timeline.tenant_shard_id, key_range.start, lsn_range.clone(), + ctx, ) .await?; @@ -1167,6 +1170,7 @@ impl TimelineAdaptor { 
self.timeline.tenant_shard_id, key_range, lsn, + ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 69b82344a6..9471ba860f 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -215,6 +215,7 @@ pub(super) async fn prepare( &detached .conf .timeline_path(&detached.tenant_shard_id, &detached.timeline_id), + ctx, ) .await .fatal_err("VirtualFile::open for timeline dir fsync"); @@ -339,6 +340,7 @@ async fn copy_lsn_prefix( target_timeline.tenant_shard_id, layer.layer_desc().key_range.start, layer.layer_desc().lsn_range.start..end_lsn, + ctx, ) .await .map_err(CopyDeltaPrefix)?; diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index a72eb1b3bf..248420e632 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -9,6 +9,7 @@ use utils::{ use crate::{ config::PageServerConf, + context::RequestContext, metrics::TimelineMetrics, tenant::{ layer_map::{BatchedUpdates, LayerMap}, @@ -69,6 +70,7 @@ impl LayerManager { conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, + ctx: &RequestContext, ) -> Result> { ensure!(lsn.is_aligned()); @@ -105,7 +107,7 @@ impl LayerManager { ); let new_layer = - InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?; + InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?; let layer = Arc::new(new_layer); self.layer_map.open_layer = Some(layer.clone()); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 8dee73891b..b68f3a0e89 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -344,16 +344,23 @@ macro_rules! with_file { impl VirtualFile { /// Open a file in read-only mode. Like File::open. - pub async fn open(path: &Utf8Path) -> Result { - Self::open_with_options(path, OpenOptions::new().read(true)).await + pub async fn open( + path: &Utf8Path, + ctx: &RequestContext, + ) -> Result { + Self::open_with_options(path, OpenOptions::new().read(true), ctx).await } /// Create a new file for writing. If the file exists, it will be truncated. /// Like File::create. - pub async fn create(path: &Utf8Path) -> Result { + pub async fn create( + path: &Utf8Path, + ctx: &RequestContext, + ) -> Result { Self::open_with_options( path, OpenOptions::new().write(true).create(true).truncate(true), + ctx, ) .await } @@ -366,6 +373,7 @@ impl VirtualFile { pub async fn open_with_options( path: &Utf8Path, open_options: &OpenOptions, + _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */ ) -> Result { let path_str = path.to_string(); let parts = path_str.split('/').collect::>(); @@ -1179,7 +1187,6 @@ mod tests { use rand::seq::SliceRandom; use rand::thread_rng; use rand::Rng; - use std::future::Future; use std::io::Write; use std::os::unix::fs::FileExt; use std::sync::Arc; @@ -1293,41 +1300,69 @@ mod tests { // results with VirtualFiles as with native Files. (Except that with // native files, you will run out of file descriptors if the ulimit // is low enough.) 
- test_files("virtual_files", |path, open_options| async move { - let vf = VirtualFile::open_with_options(&path, &open_options).await?; - Ok(MaybeVirtualFile::VirtualFile(vf)) - }) - .await + struct A; + + impl Adapter for A { + async fn open( + path: Utf8PathBuf, + opts: OpenOptions, + ctx: &RequestContext, + ) -> Result { + let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?; + Ok(MaybeVirtualFile::VirtualFile(vf)) + } + } + test_files::("virtual_files").await } #[tokio::test] async fn test_physical_files() -> anyhow::Result<()> { - test_files("physical_files", |path, open_options| async move { - Ok(MaybeVirtualFile::File({ - let owned_fd = open_options.open(path.as_std_path()).await?; - File::from(owned_fd) - })) - }) - .await + struct B; + + impl Adapter for B { + async fn open( + path: Utf8PathBuf, + opts: OpenOptions, + _ctx: &RequestContext, + ) -> Result { + Ok(MaybeVirtualFile::File({ + let owned_fd = opts.open(path.as_std_path()).await?; + File::from(owned_fd) + })) + } + } + + test_files::("physical_files").await } - async fn test_files(testname: &str, openfunc: OF) -> anyhow::Result<()> + /// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition + /// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function + /// in trait which benefits from the new lifetime capture rules already. + trait Adapter { + async fn open( + path: Utf8PathBuf, + opts: OpenOptions, + ctx: &RequestContext, + ) -> Result; + } + + async fn test_files(testname: &str) -> anyhow::Result<()> where - OF: Fn(Utf8PathBuf, OpenOptions) -> FT, - FT: Future>, + A: Adapter, { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let testdir = crate::config::PageServerConf::test_repo_dir(testname); std::fs::create_dir_all(&testdir)?; let path_a = testdir.join("file_a"); - let mut file_a = openfunc( + let mut file_a = A::open( path_a.clone(), OpenOptions::new() .write(true) .create(true) .truncate(true) .to_owned(), + &ctx, ) .await?; file_a.write_all(b"foobar".to_vec(), &ctx).await?; @@ -1336,7 +1371,7 @@ mod tests { let _ = file_a.read_string(&ctx).await.unwrap_err(); // Close the file and re-open for reading - let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; + let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?; // cannot write to a file opened in read-only mode let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err(); @@ -1371,7 +1406,7 @@ mod tests { // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); - let mut file_b = openfunc( + let mut file_b = A::open( path_b.clone(), OpenOptions::new() .read(true) @@ -1379,6 +1414,7 @@ mod tests { .create(true) .truncate(true) .to_owned(), + &ctx, ) .await?; file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?; @@ -1394,8 +1430,12 @@ mod tests { let mut vfiles = Vec::new(); for _ in 0..100 { - let mut vfile = - openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?; + let mut vfile = A::open( + path_b.clone(), + OpenOptions::new().read(true).to_owned(), + &ctx, + ) + .await?; assert_eq!("FOOBAR", vfile.read_string(&ctx).await?); vfiles.push(vfile); } @@ -1441,8 +1481,12 @@ mod tests { // Open the file many times. 
let mut files = Vec::new(); for _ in 0..VIRTUAL_FILES { - let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true)) - .await?; + let f = VirtualFile::open_with_options( + &test_file_path, + OpenOptions::new().read(true), + &ctx, + ) + .await?; files.push(f); } let files = Arc::new(files); @@ -1488,7 +1532,7 @@ mod tests { VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec()) .await .unwrap(); - let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); + let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); @@ -1497,7 +1541,7 @@ mod tests { VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec()) .await .unwrap(); - let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); + let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "bar"); assert!(!tmp_path.exists()); @@ -1521,7 +1565,7 @@ mod tests { .await .unwrap(); - let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); + let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists());
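
To illustrate the calling convention this patch threads through the tree, here is a minimal, standalone sketch of passing a `&RequestContext` into the open/create helpers. The types below are hypothetical, synchronous, std-only stand-ins; the real `VirtualFile` in pageserver/src/virtual_file.rs is async, and its `open_with_options` currently takes the context as `_ctx` pending the metrics work referenced in the TODO.

use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;

// Hypothetical stand-ins for pageserver's RequestContext machinery.
#[derive(Debug, Clone, Copy)]
enum TaskKind {
    UnitTest,
}

struct RequestContext {
    task_kind: TaskKind,
}

struct VirtualFile {
    _inner: File,
}

impl VirtualFile {
    // Open a file in read-only mode; the context is threaded through.
    fn open(path: &Path, ctx: &RequestContext) -> io::Result<Self> {
        let mut opts = OpenOptions::new();
        opts.read(true);
        Self::open_with_options(path, &opts, ctx)
    }

    // Create (truncating) a file for writing, also taking the context.
    fn create(path: &Path, ctx: &RequestContext) -> io::Result<Self> {
        let mut opts = OpenOptions::new();
        opts.write(true).create(true).truncate(true);
        Self::open_with_options(path, &opts, ctx)
    }

    // The context is accepted but not consumed yet, mirroring the
    // `_ctx: &RequestContext` TODO in the patch.
    fn open_with_options(
        path: &Path,
        opts: &OpenOptions,
        _ctx: &RequestContext,
    ) -> io::Result<Self> {
        Ok(VirtualFile { _inner: opts.open(path)? })
    }
}

fn main() -> io::Result<()> {
    let ctx = RequestContext { task_kind: TaskKind::UnitTest };
    let path = std::env::temp_dir().join("virtual_file_sketch.txt");
    let _created = VirtualFile::create(&path, &ctx)?;
    let _reopened = VirtualFile::open(&path, &ctx)?;
    println!("opened with context for {:?}", ctx.task_kind);
    Ok(())
}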
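
In remote_timeline_client/download.rs, the fsync work now derives an owned context via `detached_child` before entering the `async move` block; since `spawn_blocking_and_block_on_if_std` may run that work elsewhere, the block owns its context rather than borrowing the caller's `&RequestContext`. A loose synchronous sketch of that ownership pattern follows; the types, variant names, and the reasoning about 'static bounds are assumptions for illustration, not taken from the patch.

// Hypothetical stand-ins; the real RequestContext lives in pageserver/src/context.rs.
#[derive(Clone, Copy, Debug)]
enum TaskKind {
    DownloadTask,
}

#[derive(Clone, Copy, Debug)]
enum DownloadBehavior {
    Error,
}

#[derive(Debug)]
struct RequestContext {
    task_kind: TaskKind,
    download_behavior: DownloadBehavior,
}

impl RequestContext {
    fn task_kind(&self) -> TaskKind {
        self.task_kind
    }
    fn download_behavior(&self) -> DownloadBehavior {
        self.download_behavior
    }
    // Derive a new, owned context from this one.
    fn detached_child(&self, task_kind: TaskKind, behavior: DownloadBehavior) -> RequestContext {
        RequestContext {
            task_kind,
            download_behavior: behavior,
        }
    }
}

fn main() {
    let ctx = RequestContext {
        task_kind: TaskKind::DownloadTask,
        download_behavior: DownloadBehavior::Error,
    };
    // The owned child moves into the work closure, so no borrow of `ctx`
    // escapes and the work can be sent to another thread.
    let work = {
        let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
        move || println!("fsync timeline dir under {:?}", ctx)
    };
    std::thread::spawn(work).join().unwrap();
}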
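
The test refactor in virtual_file.rs replaces the `openfunc` closure parameter with an `Adapter` trait because a plain `FT: Future` type parameter cannot name the lifetime of the borrowed `&RequestContext` argument, whereas an async fn in a trait captures that lifetime automatically. A rough standalone sketch of the shape, with hypothetical names and assuming the tokio runtime (with fs, macros, and rt features) already used by these tests:

use std::io;
use std::path::{Path, PathBuf};

struct RequestContext; // stand-in for pageserver's RequestContext

#[allow(dead_code)]
struct MaybeVirtualFile(tokio::fs::File); // stand-in for the test's enum

trait Adapter {
    // Async fn in trait (stable since Rust 1.75): the returned future may
    // borrow `ctx` without any explicit higher-ranked lifetime bounds.
    async fn open(path: PathBuf, ctx: &RequestContext) -> io::Result<MaybeVirtualFile>;
}

struct Physical;

impl Adapter for Physical {
    async fn open(path: PathBuf, _ctx: &RequestContext) -> io::Result<MaybeVirtualFile> {
        Ok(MaybeVirtualFile(tokio::fs::File::create(&path).await?))
    }
}

// The test body is generic over the adapter type, mirroring `test_files::<A>(...)`.
async fn test_files<A: Adapter>(dir: &Path) -> io::Result<()> {
    let ctx = RequestContext;
    let _file = A::open(dir.join("file_a"), &ctx).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> io::Result<()> {
    test_files::<Physical>(&std::env::temp_dir()).await
}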