diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 1cf0241631..992f94c0e1 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -1437,12 +1437,11 @@ async fn load_heatmap( path: &Utf8PathBuf, ctx: &RequestContext, ) -> Result, anyhow::Error> { - let mut file = match VirtualFile::open(path, ctx).await { + let st = match VirtualFile::read_to_string(path, ctx).await { Ok(file) => file, Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(e) => Err(e)?, }; - let st = file.read_to_string(ctx).await?; let htm = serde_json::from_str(&st)?; Ok(Some(htm)) } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index fc69b702d3..63b1fa2189 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,7 +12,7 @@ //! src/backend/storage/file/fd.c //! use std::fs::File; -use std::io::{Error, ErrorKind, Seek, SeekFrom}; +use std::io::{Error, ErrorKind}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; #[cfg(target_os = "linux")] use std::os::unix::fs::OpenOptionsExt; @@ -189,10 +189,6 @@ impl VirtualFile { self.inner.metadata().await } - pub async fn seek(&mut self, pos: SeekFrom) -> Result { - self.inner.seek(pos).await - } - pub async fn read_exact_at( &self, slice: Slice, @@ -223,17 +219,31 @@ impl VirtualFile { self.inner.write_all_at(buf, offset, ctx).await } - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { - self.inner.read_to_end(buf, ctx).await - } - - pub(crate) async fn read_to_string( - &mut self, + pub(crate) async fn read_to_string>( + path: P, ctx: &RequestContext, - ) -> Result { + ) -> std::io::Result { + let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2 let mut buf = Vec::new(); - self.read_to_end(&mut buf, ctx).await?; - Ok(String::from_utf8(buf)?) + let mut tmp = vec![0; 128]; + let mut pos: u64 = 0; + loop { + let slice = tmp.slice(..128); + let (slice, res) = file.inner.read_at(slice, pos, ctx).await; + match res { + Ok(0) => break, + Ok(n) => { + pos += n as u64; + buf.extend_from_slice(&slice[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + tmp = slice.into_inner(); + } + String::from_utf8(buf).map_err(|_| { + std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8") + }) } } @@ -280,9 +290,6 @@ pub struct VirtualFileInner { /// belongs to a different VirtualFile. handle: RwLock, - /// Current file position - pos: u64, - /// File path and options to use to open it. /// /// Note: this only contains the options needed to re-open it. For example, @@ -596,7 +603,6 @@ impl VirtualFileInner { let vfile = VirtualFileInner { handle: RwLock::new(handle), - pos: 0, path: path.to_owned(), open_options: reopen_options, }; @@ -730,32 +736,6 @@ impl VirtualFileInner { }) } - pub async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(offset) => { - self.pos = offset; - } - SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard - .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))? - } - SeekFrom::Current(offset) => { - let pos = self.pos as i128 + offset as i128; - if pos < 0 { - return Err(Error::new( - ErrorKind::InvalidInput, - "offset would be negative", - )); - } - if pos > u64::MAX as i128 { - return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); - } - self.pos = pos as u64; - } - } - Ok(self.pos) - } - /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`. /// /// The returned `Slice` is equivalent to the input `slice`, i.e., it's the same view into the same buffer. @@ -839,7 +819,7 @@ impl VirtualFileInner { (restore(buf), Ok(())) } - pub(crate) async fn read_at( + pub(super) async fn read_at( &self, buf: tokio_epoll_uring::Slice, offset: u64, @@ -887,24 +867,6 @@ impl VirtualFileInner { (buf, result) }) } - - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { - let mut tmp = vec![0; 128]; - loop { - let slice = tmp.slice(..128); - let (slice, res) = self.read_at(slice, self.pos, ctx).await; - match res { - Ok(0) => return Ok(()), - Ok(n) => { - self.pos += n as u64; - buf.extend_from_slice(&slice[..n]); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - tmp = slice.into_inner(); - } - } } // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 @@ -1119,19 +1081,6 @@ impl FileGuard { let _ = file.into_raw_fd(); res } - /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually. - fn with_std_file_mut(&mut self, with: F) -> R - where - F: FnOnce(&mut File) -> R, - { - // SAFETY: - // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`. - // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd - let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) }; - let res = with(&mut file); - let _ = file.into_raw_fd(); - res - } } impl tokio_epoll_uring::IoFd for FileGuard { @@ -1351,30 +1300,6 @@ mod tests { MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset), } } - async fn seek(&mut self, pos: SeekFrom) -> Result { - match self { - MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await, - MaybeVirtualFile::File(file) => file.seek(pos), - } - } - - // Helper function to slurp contents of a file, starting at the current position, - // into a string - async fn read_string(&mut self, ctx: &RequestContext) -> Result { - use std::io::Read; - let mut buf = String::new(); - match self { - MaybeVirtualFile::VirtualFile(file) => { - let mut buf = Vec::new(); - file.read_to_end(&mut buf, ctx).await?; - return Ok(String::from_utf8(buf).unwrap()); - } - MaybeVirtualFile::File(file) => { - file.read_to_string(&mut buf)?; - } - } - Ok(buf) - } // Helper function to slurp a portion of a file into a string async fn read_string_at( @@ -1474,7 +1399,7 @@ mod tests { .await?; // cannot read from a file opened in write-only mode - let _ = file_a.read_string(&ctx).await.unwrap_err(); + let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err(); // Close the file and re-open for reading let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?; @@ -1486,32 +1411,7 @@ mod tests { .unwrap_err(); // Try simple read - assert_eq!("foobar", file_a.read_string(&ctx).await?); - - // It's positioned at the EOF now. - assert_eq!("", file_a.read_string(&ctx).await?); - - // Test seeks. - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); - assert_eq!("oobar", file_a.read_string(&ctx).await?); - - assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4); - assert_eq!("ar", file_a.read_string(&ctx).await?); - - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); - assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3); - assert_eq!("bar", file_a.read_string(&ctx).await?); - - assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1); - assert_eq!("oobar", file_a.read_string(&ctx).await?); - - // Test erroneous seeks to before byte 0 - file_a.seek(SeekFrom::End(-7)).await.unwrap_err(); - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); - file_a.seek(SeekFrom::Current(-2)).await.unwrap_err(); - - // the erroneous seek should have left the position unchanged - assert_eq!("oobar", file_a.read_string(&ctx).await?); + assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); // Create another test file, and try FileExt functions on it. let path_b = testdir.join("file_b"); @@ -1537,9 +1437,6 @@ mod tests { // Open a lot of files, enough to cause some evictions. (Or to be precise, // open the same file many times. The effect is the same.) - // - // leave file_a positioned at offset 1 before we start - assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1); let mut vfiles = Vec::new(); for _ in 0..100 { @@ -1549,7 +1446,7 @@ mod tests { &ctx, ) .await?; - assert_eq!("FOOBAR", vfile.read_string(&ctx).await?); + assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?); vfiles.push(vfile); } @@ -1557,8 +1454,8 @@ mod tests { assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2); // The underlying file descriptor for 'file_a' should be closed now. Try to read - // from it again. We left the file positioned at offset 1 above. - assert_eq!("oobar", file_a.read_string(&ctx).await?); + // from it again. + assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?); // Check that all the other FDs still work too. Use them in random order for // good measure. @@ -1652,7 +1549,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file); @@ -1661,7 +1558,7 @@ mod tests { .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "bar"); assert!(!tmp_path.exists()); drop(file); @@ -1686,7 +1583,7 @@ mod tests { .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap()); - let post = file.read_string(&ctx).await.unwrap(); + let post = file.read_string_at(0, 3, &ctx).await.unwrap(); assert_eq!(post, "foo"); assert!(!tmp_path.exists()); drop(file);