spawn_blocking-based file open for image and delta layer loading

This commit is contained in:
Christian Schwarz
2023-08-29 12:49:53 +00:00
parent e7e1df2a79
commit 99f8f87ba5
3 changed files with 76 additions and 3 deletions

View File

@@ -845,7 +845,8 @@ impl DeltaLayerInner {
path: &std::path::Path,
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
let file = VirtualFile::open_async(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);

View File

@@ -438,7 +438,8 @@ impl ImageLayerInner {
lsn: Lsn,
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
let file = VirtualFile::open_async(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0).await?;

View File

@@ -121,6 +121,74 @@ impl VirtualFile {
Ok(vfile)
}
/// Open a file in read-only mode. Like File::open.
pub async fn open_async(path: &Path) -> Result<VirtualFile, std::io::Error> {
let mut options = OpenOptions::new();
options.read(true);
Self::open_with_options_async(path, options).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create_async(path: &Path) -> Result<VirtualFile, std::io::Error> {
let mut options = OpenOptions::new();
options.write(true).create(true).truncate(true);
Self::open_with_options_async(path, options).await
}
/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub async fn open_with_options_async(
path: &Path,
open_options: OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string_lossy();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
tenant_id = parts[parts.len() - 4].to_string();
timeline_id = parts[parts.len() - 2].to_string();
} else {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let start = std::time::Instant::now();
let file = tokio::task::spawn_blocking({
let path = path.to_owned();
let open_options = open_options.clone();
move || open_options.open(path)
})
.await
.expect("spawn_blocking")?;
STORAGE_IO_TIME
.with_label_values(&["open"])
.observe(start.elapsed().as_secs_f64());
// Strip all options other than read and write.
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options;
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
handle: Arc::new(Mutex::new(Some(file))),
pos: 0,
path: path.to_path_buf(),
tenant_id,
timeline_id,
};
Ok(vfile)
}
/// Call File::sync_all() on the underlying File.
pub fn sync_all(&self) -> Result<(), Error> {
self.with_file("fsync", |file| file.sync_all())?
@@ -258,7 +326,10 @@ impl VirtualFile {
init_up_to: 0,
};
let ((file, buf), res) = system.read(file.into(), offset, buf).await;
let PageWriteGuardBuf { buf: write_guard, init_up_to } = buf;
let PageWriteGuardBuf {
buf: write_guard,
init_up_to,
} = buf;
if let Ok(num_read) = res {
assert!(init_up_to <= num_read);
}