Make VirtualFile::{open_with_options, create} async fn

This commit is contained in:
Arpad Müller
2023-09-06 18:30:02 +02:00
parent ae1af9d10e
commit bd04abbcab
8 changed files with 30 additions and 22 deletions

View File

@@ -238,7 +238,7 @@ mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(&path)?;
let file = VirtualFile::create(&path).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let offs = wtr.write_blob(blob).await?;

View File

@@ -28,7 +28,7 @@ pub struct EphemeralFile {
}
impl EphemeralFile {
pub fn create(
pub async fn create(
conf: &PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -44,7 +44,8 @@ impl EphemeralFile {
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)?;
)
.await?;
Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
@@ -286,7 +287,7 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
let pos_foo = file.write_blob(b"foo").await?;
assert_eq!(

View File

@@ -604,7 +604,7 @@ impl DeltaLayerWriterInner {
// FIXME: throw an error instead?
let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);
let mut file = VirtualFile::create(&path)?;
let mut file = VirtualFile::create(&path).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);

View File

@@ -541,7 +541,8 @@ impl ImageLayerWriterInner {
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)?;
)
.await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);

View File

@@ -236,7 +236,7 @@ impl InMemoryLayer {
///
/// Create a new, empty, in-memory layer
///
pub fn create(
pub async fn create(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
@@ -244,7 +244,7 @@ impl InMemoryLayer {
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file = EphemeralFile::create(conf, tenant_id, timeline_id)?;
let file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
Ok(InMemoryLayer {
conf,

View File

@@ -2544,13 +2544,15 @@ impl Timeline {
///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)?;
let layer = guard
.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)
.await?;
Ok(layer)
}

View File

@@ -87,7 +87,7 @@ impl LayerManager {
/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
/// called within `get_layer_for_write`.
pub(crate) fn get_layer_for_write(
pub(crate) async fn get_layer_for_write(
&mut self,
lsn: Lsn,
last_record_lsn: Lsn,
@@ -129,7 +129,7 @@ impl LayerManager {
lsn
);
let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn).await?;
let layer = Arc::new(new_layer);
self.layer_map.open_layer = Some(layer.clone());

View File

@@ -211,16 +211,17 @@ impl CrashsafeOverwriteError {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
Self::open_with_options(path, OpenOptions::new().read(true)).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
pub async fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
}
/// Open a file with given options.
@@ -228,7 +229,7 @@ impl VirtualFile {
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub fn open_with_options(
pub async fn open_with_options(
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
@@ -299,6 +300,7 @@ impl VirtualFile {
// we bail out instead of causing damage.
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
@@ -317,6 +319,7 @@ impl VirtualFile {
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
final_parent_dirfd
.sync_all()
@@ -703,7 +706,7 @@ mod tests {
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| {
let vf = VirtualFile::open_with_options(path, open_options)?;
let vf = VirtualFile::open_with_options(path, open_options).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
@@ -836,7 +839,8 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))?;
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
.await?;
files.push(f);
}
let files = Arc::new(files);