diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 37c84be342..92928116c1 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -102,7 +102,7 @@ impl<'a> BlockReaderRef<'a> { #[cfg(test)] TestDisk(r) => r.read_blk(blknum), #[cfg(test)] - VirtualFile(r) => r.read_blk(blknum).await, + VirtualFile(r) => r.read_blk(blknum, ctx).await, } } } @@ -177,10 +177,11 @@ impl<'a> FileBlockReader<'a> { &self, buf: PageWriteGuard<'static>, blkno: u32, + ctx: &RequestContext, ) -> Result, std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file - .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64) + .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx) .await } /// Read a block. @@ -206,7 +207,7 @@ impl<'a> FileBlockReader<'a> { ReadBufResult::Found(guard) => Ok(guard.into()), ReadBufResult::NotFound(write_guard) => { // Read the page from disk into the buffer - let write_guard = self.fill_buffer(write_guard, blknum).await?; + let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?; Ok(write_guard.mark_valid().into()) } } diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs index 42def8858e..276ac87064 100644 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs @@ -78,7 +78,7 @@ impl RW { page_cache::ReadBufResult::NotFound(write_guard) => { let write_guard = writer .file - .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64) + .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index c38c9bb656..4f30cf2e84 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -908,7 +908,7 @@ impl DeltaLayerInner { .await .map_err(GetVectoredError::Other)?; - self.do_reads_and_update_state(reads, reconstruct_state) + self.do_reads_and_update_state(reads, reconstruct_state, ctx) .await; reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start); @@ -1012,6 +1012,7 @@ impl DeltaLayerInner { &self, reads: Vec, reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, ) { let vectored_blob_reader = VectoredBlobReader::new(&self.file); let mut ignore_key_with_err = None; @@ -1029,7 +1030,7 @@ impl DeltaLayerInner { // track when a key is done. for read in reads.into_iter().rev() { let res = vectored_blob_reader - .read_blobs(&read, buf.take().expect("Should have a buffer")) + .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx) .await; let blobs_buf = match res { @@ -1274,7 +1275,7 @@ impl DeltaLayerInner { buf.clear(); buf.reserve(read.size()); - let res = reader.read_blobs(&read, buf).await?; + let res = reader.read_blobs(&read, buf, ctx).await?; for blob in res.blobs { let key = blob.meta.key; @@ -1848,7 +1849,7 @@ mod test { for read in vectored_reads { let blobs_buf = vectored_blob_reader - .read_blobs(&read, buf.take().expect("Should have a buffer")) + .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx) .await?; for meta in blobs_buf.blobs.iter() { let value = &blobs_buf.buf[meta.start..meta.end]; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index c9874873e4..72d1f36cab 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -474,7 +474,7 @@ impl ImageLayerInner { .await .map_err(GetVectoredError::Other)?; - self.do_reads_and_update_state(reads, reconstruct_state) + self.do_reads_and_update_state(reads, reconstruct_state, ctx) .await; Ok(()) @@ -537,6 +537,7 @@ impl ImageLayerInner { &self, reads: Vec, reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, ) { let max_vectored_read_bytes = self .max_vectored_read_bytes @@ -565,7 +566,7 @@ impl ImageLayerInner { } let buf = BytesMut::with_capacity(buf_size); - let res = vectored_blob_reader.read_blobs(&read, buf).await; + let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await; match res { Ok(blobs_buf) => { diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 91934d5e0e..6e825760e3 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -23,6 +23,7 @@ use pageserver_api::key::Key; use utils::lsn::Lsn; use utils::vec_map::VecMap; +use crate::context::RequestContext; use crate::virtual_file::VirtualFile; #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -285,6 +286,7 @@ impl<'a> VectoredBlobReader<'a> { &self, read: &VectoredRead, buf: BytesMut, + ctx: &RequestContext, ) -> Result { assert!(read.size() > 0); assert!( @@ -295,7 +297,7 @@ impl<'a> VectoredBlobReader<'a> { ); let buf = self .file - .read_exact_at_n(buf, read.start, read.size()) + .read_exact_at_n(buf, read.start, read.size(), ctx) .await?; let blobs_at = read.blobs_at.as_slice(); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index a17488a286..8dee73891b 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -576,21 +576,34 @@ impl VirtualFile { Ok(self.pos) } - pub async fn read_exact_at(&self, buf: B, offset: u64) -> Result + pub async fn read_exact_at( + &self, + buf: B, + offset: u64, + ctx: &RequestContext, + ) -> Result where B: IoBufMut + Send, { - let (buf, res) = - read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await; + let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| { + self.read_at(buf, offset, ctx) + }) + .await; res.map(|()| buf) } - pub async fn read_exact_at_n(&self, buf: B, offset: u64, count: usize) -> Result + pub async fn read_exact_at_n( + &self, + buf: B, + offset: u64, + count: usize, + ctx: &RequestContext, + ) -> Result where B: IoBufMut + Send, { let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| { - self.read_at(buf, offset) + self.read_at(buf, offset, ctx) }) .await; res.map(|()| buf) @@ -601,12 +614,13 @@ impl VirtualFile { &self, page: PageWriteGuard<'static>, offset: u64, + ctx: &RequestContext, ) -> Result, Error> { let buf = PageWriteGuardBuf { page, init_up_to: 0, }; - let res = self.read_exact_at(buf, offset).await; + let res = self.read_exact_at(buf, offset, ctx).await; res.map(|PageWriteGuardBuf { page, .. }| page) .map_err(|e| Error::new(ErrorKind::Other, e)) } @@ -699,7 +713,12 @@ impl VirtualFile { (buf, Ok(n)) } - pub(crate) async fn read_at(&self, buf: B, offset: u64) -> (B, Result) + pub(crate) async fn read_at( + &self, + buf: B, + offset: u64, + _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ + ) -> (B, Result) where B: tokio_epoll_uring::BoundedBufMut + Send, { @@ -1020,20 +1039,21 @@ impl VirtualFile { pub(crate) async fn read_blk( &self, blknum: u32, + ctx: &RequestContext, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; let buf = vec![0; PAGE_SZ]; let buf = self - .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64)) + .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx) .await?; Ok(crate::tenant::block_io::BlockLease::Vec(buf)) } - async fn read_to_end(&mut self, buf: &mut Vec) -> Result<(), Error> { + async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { let mut tmp = vec![0; 128]; loop { let res; - (tmp, res) = self.read_at(tmp, self.pos).await; + (tmp, res) = self.read_at(tmp, self.pos, ctx).await; match res { Ok(0) => return Ok(()), Ok(n) => { @@ -1176,9 +1196,14 @@ mod tests { } impl MaybeVirtualFile { - async fn read_exact_at(&self, mut buf: Vec, offset: u64) -> Result, Error> { + async fn read_exact_at( + &self, + mut buf: Vec, + offset: u64, + ctx: &RequestContext, + ) -> Result, Error> { match self { - MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await, + MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await, MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf), } } @@ -1230,13 +1255,13 @@ mod tests { // Helper function to slurp contents of a file, starting at the current position, // into a string - async fn read_string(&mut self) -> Result { + async fn read_string(&mut self, ctx: &RequestContext) -> Result { use std::io::Read; let mut buf = String::new(); match self { MaybeVirtualFile::VirtualFile(file) => { let mut buf = Vec::new(); - file.read_to_end(&mut buf).await?; + file.read_to_end(&mut buf, ctx).await?; return Ok(String::from_utf8(buf).unwrap()); } MaybeVirtualFile::File(file) => { @@ -1247,9 +1272,14 @@ mod tests { } // Helper function to slurp a portion of a file into a string - async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { + async fn read_string_at( + &mut self, + pos: u64, + len: usize, + ctx: &RequestContext, + ) -> Result { let buf = vec![0; len]; - let buf = self.read_exact_at(buf, pos).await?; + let buf = self.read_exact_at(buf, pos, ctx).await?; Ok(String::from_utf8(buf).unwrap()) } } @@ -1303,7 +1333,7 @@ mod tests { file_a.write_all(b"foobar".to_vec(), &ctx).await?; // cannot read from a file opened in write-only mode - let _ = file_a.read_string().await.unwrap_err(); + let _ = file_a.read_string(&ctx).await.unwrap_err(); // Close the file and re-open for reading let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; @@ -1312,24 +1342,24 @@ mod tests { let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err(); // Try simple read - assert_eq!("foobar", file_a.read_string().await?); + assert_eq!("foobar", file_a.read_string(&ctx).await?); // It's positioned at the EOF now. - assert_eq!("", file_a.read_string().await?); + assert_eq!("", file_a.read_string(&ctx).await?); // Test seeks. assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); - assert_eq!("oobar", file_a.read_string().await?); + assert_eq!("oobar", file_a.read_string(&ctx).await?); assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4); - assert_eq!("ar", file_a.read_string().await?); + assert_eq!("ar", file_a.read_string(&ctx).await?); assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3); - assert_eq!("bar", file_a.read_string().await?); + assert_eq!("bar", file_a.read_string(&ctx).await?); assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1); - assert_eq!("oobar", file_a.read_string().await?); + assert_eq!("oobar", file_a.read_string(&ctx).await?); // Test erroneous seeks to before byte 0 file_a.seek(SeekFrom::End(-7)).await.unwrap_err(); @@ -1337,7 +1367,7 @@ mod tests { file_a.seek(SeekFrom::Current(-2)).await.unwrap_err(); // the erroneous seek should have left the position unchanged - assert_eq!("oobar", file_a.read_string().await?); + assert_eq!("oobar", file_a.read_string(&ctx).await?); // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); @@ -1354,7 +1384,7 @@ mod tests { file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?; file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?; - assert_eq!(file_b.read_string_at(2, 3).await?, "OBA"); + assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA"); // Open a lot of files, enough to cause some evictions. (Or to be precise, // open the same file many times. The effect is the same.) @@ -1366,7 +1396,7 @@ mod tests { for _ in 0..100 { let mut vfile = openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?; - assert_eq!("FOOBAR", vfile.read_string().await?); + assert_eq!("FOOBAR", vfile.read_string(&ctx).await?); vfiles.push(vfile); } @@ -1375,13 +1405,13 @@ mod tests { // The underlying file descriptor for 'file_a' should be closed now. Try to read // from it again. We left the file positioned at offset 1 above. - assert_eq!("oobar", file_a.read_string().await?); + assert_eq!("oobar", file_a.read_string(&ctx).await?); // Check that all the other FDs still work too. Use them in random order for // good measure. vfiles.as_mut_slice().shuffle(&mut thread_rng()); for vfile in vfiles.iter_mut() { - assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?); + assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?); } Ok(()) @@ -1397,6 +1427,7 @@ mod tests { const THREADS: usize = 100; const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency"); std::fs::create_dir_all(&testdir)?; @@ -1425,12 +1456,13 @@ mod tests { let mut hdls = Vec::new(); for _threadno in 0..THREADS { let files = files.clone(); + let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error); let hdl = rt.spawn(async move { let mut buf = vec![0u8; SIZE]; let mut rng = rand::rngs::OsRng; for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())]; - buf = f.read_exact_at(buf, 0).await.unwrap(); + buf = f.read_exact_at(buf, 0, &ctx).await.unwrap(); assert!(buf == SAMPLE); } }); @@ -1446,6 +1478,7 @@ mod tests { #[tokio::test] async fn test_atomic_overwrite_basic() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic"); std::fs::create_dir_all(&testdir).unwrap(); @@ -1456,7 +1489,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); - let post = file.read_string().await.unwrap(); + let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); @@ -1465,7 +1498,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); - let post = file.read_string().await.unwrap(); + let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "bar"); assert!(!tmp_path.exists()); drop(file); @@ -1473,6 +1506,7 @@ mod tests { #[tokio::test] async fn test_atomic_overwrite_preexisting_tmp() { + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp"); std::fs::create_dir_all(&testdir).unwrap(); @@ -1488,7 +1522,7 @@ mod tests { .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); - let post = file.read_string().await.unwrap(); + let post = file.read_string(&ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file);