Compare commits

...

16 Commits

Author SHA1 Message Date
Arpad Müller
d2314afaf6 Make persist_tenant_config async fn 2023-09-01 14:29:06 +02:00
Arpad Müller
0aed765c6d Make tenant_map_insert and its closure async fn 2023-09-01 14:27:27 +02:00
Arpad Müller
aa4763bed1 Use write_and_fsync in persist_tenant_config 2023-09-01 14:18:34 +02:00
Arpad Müller
a13b0cc86b Make read_at async fn 2023-09-01 13:52:21 +02:00
Arpad Müller
167342db13 Make read_exact_at async fn 2023-09-01 13:52:21 +02:00
Arpad Müller
dc6668d4cd Add MaybeVirtualFile and use it in tests 2023-09-01 13:52:21 +02:00
Arpad Müller
df28e7afa0 Use write_and_fsync in save_metadata 2023-09-01 13:51:33 +02:00
Arpad Müller
9cffba255c Make save_metadata async fn 2023-09-01 13:51:33 +02:00
Arpad Müller
8ef27a3bf8 Add write_and_fsync function 2023-09-01 13:51:33 +02:00
Arpad Müller
afbebce051 Remove bounds 2023-09-01 13:51:33 +02:00
Arpad Müller
486a7f6d63 Make write_all_at async 2023-09-01 13:51:33 +02:00
Christian Schwarz
3413937eb2 FileBlockReader::fill_buffer make it obvious that we need to switch to async API 2023-09-01 13:51:33 +02:00
Arpad Müller
67f7e6f192 Remove Read impl that was only used in one place 2023-09-01 13:51:33 +02:00
Arpad Müller
69d046fd7e Move used FileExt functions to inherent impls 2023-09-01 13:51:33 +02:00
Christian Schwarz
353ebefefc FileBlockReader<File> is never used 2023-09-01 13:51:33 +02:00
Arpad Müller
17b3f4bc7d Move VirtualFile::seek to inherent function 2023-09-01 13:51:33 +02:00
12 changed files with 336 additions and 251 deletions

View File

@@ -469,7 +469,9 @@ impl PageServerHandler {
// Create empty timeline // Create empty timeline
info!("creating new timeline"); info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?; let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn. // TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute // We might have some wal to import as well, and we should prevent compute

View File

@@ -34,7 +34,6 @@ use std::fs;
use std::fs::File; use std::fs::File;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io; use std::io;
use std::io::Write;
use std::ops::Bound::Included; use std::ops::Bound::Included;
use std::path::Path; use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
@@ -448,6 +447,7 @@ impl Tenant {
up_to_date_metadata, up_to_date_metadata,
first_save, first_save,
) )
.await
.context("save_metadata")?; .context("save_metadata")?;
} }
@@ -1450,7 +1450,7 @@ impl Tenant {
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline. /// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.) /// (Without it, `put` might fail due to `repartition` failing.)
pub fn create_empty_timeline( pub async fn create_empty_timeline(
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
initdb_lsn: Lsn, initdb_lsn: Lsn,
@@ -1462,10 +1462,10 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant" "Cannot create empty timelines on inactive tenant"
); );
let timelines = self.timelines.lock().unwrap(); let timeline_uninit_mark = {
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; let timelines = self.timelines.lock().unwrap();
drop(timelines); self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
};
let new_metadata = TimelineMetadata::new( let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to // Initialize disk_consistent LSN to 0, The caller must import some data to
// make it valid, before calling finish_creation() // make it valid, before calling finish_creation()
@@ -1484,6 +1484,7 @@ impl Tenant {
initdb_lsn, initdb_lsn,
None, None,
) )
.await
} }
/// Helper for unit tests to create an empty timeline. /// Helper for unit tests to create an empty timeline.
@@ -1499,7 +1500,9 @@ impl Tenant {
pg_version: u32, pg_version: u32,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?; let uninit_tl = self
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
let tline = uninit_tl.raw_timeline().expect("we just created it"); let tline = uninit_tl.raw_timeline().expect("we just created it");
assert_eq!(tline.get_last_record_lsn(), Lsn(0)); assert_eq!(tline.get_last_record_lsn(), Lsn(0));
@@ -2421,67 +2424,67 @@ impl Tenant {
Ok(tenant_conf) Ok(tenant_conf)
} }
pub(super) fn persist_tenant_config( #[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config(
tenant_id: &TenantId, tenant_id: &TenantId,
target_config_path: &Path, target_config_path: &Path,
tenant_conf: TenantConfOpt, tenant_conf: TenantConfOpt,
creating_tenant: bool, creating_tenant: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let _enter = info_span!("saving tenantconf").entered();
// imitate a try-block with a closure // imitate a try-block with a closure
let do_persist = |target_config_path: &Path| -> anyhow::Result<()> { let do_persist = |target_config_path: PathBuf| -> _ {
let target_config_parent = target_config_path.parent().with_context(|| { async move {
format!( let target_config_parent = target_config_path.parent().with_context(|| {
"Config path does not have a parent: {}", format!(
target_config_path.display() "Config path does not have a parent: {}",
) target_config_path.display()
})?; )
})?;
info!("persisting tenantconf to {}", target_config_path.display()); info!("persisting tenantconf to {}", target_config_path.display());
let mut conf_content = r#"# This file contains a specific per-tenant's config. let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart. # It is read in case of pageserver restart.
[tenant_config] [tenant_config]
"# "#
.to_string(); .to_string();
// Convert the config to a toml file. // Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string(&tenant_conf)?; conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
let mut target_config_file = VirtualFile::open_with_options( let mut target_config_file = VirtualFile::open_with_options(
target_config_path, &target_config_path,
OpenOptions::new() OpenOptions::new()
.truncate(true) // This needed for overwriting with small config files .truncate(true) // This needed for overwriting with small config files
.write(true) .write(true)
.create_new(creating_tenant) .create_new(creating_tenant)
// when creating a new tenant, first_save will be true and `.create(true)` will be // when creating a new tenant, first_save will be true and `.create(true)` will be
// ignored (per rust std docs). // ignored (per rust std docs).
// //
// later when updating the config of created tenant, or persisting config for the // later when updating the config of created tenant, or persisting config for the
// first time for attached tenant, the `.create(true)` is used. // first time for attached tenant, the `.create(true)` is used.
.create(true), .create(true),
)?; )?;
target_config_file target_config_file
.write(conf_content.as_bytes()) .write_and_fsync(conf_content.as_bytes())
.context("write toml bytes into file") .context("write tenant config toml bytes into file")?;
.and_then(|_| target_config_file.sync_all().context("fsync config file"))
.context("write config file")?;
// fsync the parent directory to ensure the directory entry is durable. // fsync the parent directory to ensure the directory entry is durable.
// before this was done conditionally on creating_tenant, but these management actions are rare // before this was done conditionally on creating_tenant, but these management actions are rare
// enough to just fsync it always. // enough to just fsync it always.
crashsafe::fsync(target_config_parent)?; crashsafe::fsync(target_config_parent)?;
// XXX we're not fsyncing the parent dir, need to do that in case `creating_tenant` // XXX we're not fsyncing the parent dir, need to do that in case `creating_tenant`
Ok(()) Result::<_, anyhow::Error>::Ok(())
}
}; };
let pb = target_config_path.to_owned();
// this function is called from creating the tenant and updating the tenant config, which // this function is called from creating the tenant and updating the tenant config, which
// would otherwise share this context, so keep it here in one place. // would otherwise share this context, so keep it here in one place.
do_persist(target_config_path).with_context(|| { do_persist(pb).await.with_context(|| {
format!( format!(
"write tenant {tenant_id} config to {}", "write tenant {tenant_id} config to {}",
target_config_path.display() target_config_path.display()
@@ -2797,13 +2800,15 @@ impl Tenant {
src_timeline.pg_version, src_timeline.pg_version,
); );
let uninitialized_timeline = self.prepare_new_timeline( let uninitialized_timeline = self
dst_id, .prepare_new_timeline(
&metadata, dst_id,
timeline_uninit_mark, &metadata,
start_lsn + 1, timeline_uninit_mark,
Some(Arc::clone(src_timeline)), start_lsn + 1,
)?; Some(Arc::clone(src_timeline)),
)
.await?;
let new_timeline = uninitialized_timeline.finish_creation()?; let new_timeline = uninitialized_timeline.finish_creation()?;
@@ -2881,13 +2886,15 @@ impl Tenant {
pgdata_lsn, pgdata_lsn,
pg_version, pg_version,
); );
let raw_timeline = self.prepare_new_timeline( let raw_timeline = self
timeline_id, .prepare_new_timeline(
&new_metadata, timeline_id,
timeline_uninit_mark, &new_metadata,
pgdata_lsn, timeline_uninit_mark,
None, pgdata_lsn,
)?; None,
)
.await?;
let tenant_id = raw_timeline.owning_tenant.tenant_id; let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?; let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2958,7 +2965,7 @@ impl Tenant {
/// at 'disk_consistent_lsn'. After any initial data has been imported, call /// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the /// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file. /// uninit mark file.
fn prepare_new_timeline( async fn prepare_new_timeline(
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata, new_metadata: &TimelineMetadata,
@@ -2986,8 +2993,9 @@ impl Tenant {
timeline_struct.init_empty_layer_map(start_lsn); timeline_struct.init_empty_layer_map(start_lsn);
if let Err(e) = if let Err(e) = self
self.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata) .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
.await
{ {
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}"); error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark); cleanup_timeline_directory(uninit_mark);
@@ -3003,7 +3011,7 @@ impl Tenant {
)) ))
} }
fn create_timeline_files( async fn create_timeline_files(
&self, &self,
timeline_path: &Path, timeline_path: &Path,
new_timeline_id: &TimelineId, new_timeline_id: &TimelineId,
@@ -3022,6 +3030,7 @@ impl Tenant {
new_metadata, new_metadata,
true, true,
) )
.await
.context("Failed to create timeline metadata")?; .context("Failed to create timeline metadata")?;
Ok(()) Ok(())
} }
@@ -3169,7 +3178,7 @@ pub(crate) enum CreateTenantFilesMode {
Attach, Attach,
} }
pub(crate) fn create_tenant_files( pub(crate) async fn create_tenant_files(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_conf: TenantConfOpt, tenant_conf: TenantConfOpt,
tenant_id: &TenantId, tenant_id: &TenantId,
@@ -3205,7 +3214,8 @@ pub(crate) fn create_tenant_files(
mode, mode,
&temporary_tenant_dir, &temporary_tenant_dir,
&target_tenant_directory, &target_tenant_directory,
); )
.await;
if creation_result.is_err() { if creation_result.is_err() {
error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data"); error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
@@ -3223,7 +3233,7 @@ pub(crate) fn create_tenant_files(
Ok(target_tenant_directory) Ok(target_tenant_directory)
} }
fn try_create_target_tenant_dir( async fn try_create_target_tenant_dir(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_conf: TenantConfOpt, tenant_conf: TenantConfOpt,
tenant_id: &TenantId, tenant_id: &TenantId,
@@ -3262,7 +3272,8 @@ fn try_create_target_tenant_dir(
) )
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?; .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf, true)?; Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf, true)
.await?;
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| { crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!( format!(
@@ -3649,7 +3660,10 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?; .await?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) { match tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
{
Ok(_) => panic!("duplicate timeline creation should fail"), Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!( Err(e) => assert_eq!(
e.to_string(), e.to_string(),
@@ -4489,8 +4503,9 @@ mod tests {
.await; .await;
let initdb_lsn = Lsn(0x20); let initdb_lsn = Lsn(0x20);
let utline = let utline = tenant
tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?; .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = utline.raw_timeline().unwrap(); let tline = utline.raw_timeline().unwrap();
// Spawn flush loop now so that we can set the `expect_initdb_optimization` // Spawn flush loop now so that we can set the `expect_initdb_optimization`
@@ -4555,8 +4570,9 @@ mod tests {
let harness = TenantHarness::create(name)?; let harness = TenantHarness::create(name)?;
{ {
let (tenant, ctx) = harness.load().await; let (tenant, ctx) = harness.load().await;
let tline = let tline = tenant
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
// Keeps uninit mark in place // Keeps uninit mark in place
std::mem::forget(tline); std::mem::forget(tline);
} }

View File

@@ -96,18 +96,12 @@ pub trait BlobWriter {
/// An implementation of BlobWriter to write blobs to anything that /// An implementation of BlobWriter to write blobs to anything that
/// implements std::io::Write. /// implements std::io::Write.
/// ///
pub struct WriteBlobWriter<W> pub struct WriteBlobWriter<W> {
where
W: std::io::Write,
{
inner: W, inner: W,
offset: u64, offset: u64,
} }
impl<W> WriteBlobWriter<W> impl<W> WriteBlobWriter<W> {
where
W: std::io::Write,
{
pub fn new(inner: W, start_offset: u64) -> Self { pub fn new(inner: W, start_offset: u64) -> Self {
WriteBlobWriter { WriteBlobWriter {
inner, inner,

View File

@@ -7,9 +7,7 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile; use crate::virtual_file::VirtualFile;
use bytes::Bytes; use bytes::Bytes;
use std::fs::File;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
/// This is implemented by anything that can read 8 kB (PAGE_SZ) /// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache /// blocks, using the page cache
@@ -74,7 +72,6 @@ impl<'a> Deref for BlockLease<'a> {
/// Unlike traits, we also support the read function to be async though. /// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> { pub(crate) enum BlockReaderRef<'a> {
FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>), FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
EphemeralFile(&'a EphemeralFile), EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>), Adapter(Adapter<&'a DeltaLayerInner>),
#[cfg(test)] #[cfg(test)]
@@ -87,7 +84,6 @@ impl<'a> BlockReaderRef<'a> {
use BlockReaderRef::*; use BlockReaderRef::*;
match self { match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum).await, FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
FileBlockReaderFile(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await, EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await, Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)] #[cfg(test)]
@@ -150,20 +146,19 @@ pub struct FileBlockReader<F> {
file_id: page_cache::FileId, file_id: page_cache::FileId,
} }
impl<F> FileBlockReader<F> impl FileBlockReader<VirtualFile> {
where pub fn new(file: VirtualFile) -> Self {
F: FileExt,
{
pub fn new(file: F) -> Self {
let file_id = page_cache::next_file_id(); let file_id = page_cache::next_file_id();
FileBlockReader { file_id, file } FileBlockReader { file_id, file }
} }
/// Read a page from the underlying file into given buffer. /// Read a page from the underlying file into given buffer.
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ); assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.await
} }
/// Read a block. /// Read a block.
/// ///
@@ -185,7 +180,7 @@ where
ReadBufResult::Found(guard) => break Ok(guard.into()), ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => { ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer // Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?; self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid(); write_guard.mark_valid();
// Swap for read lock // Swap for read lock
@@ -196,12 +191,6 @@ where
} }
} }
impl BlockReader for FileBlockReader<File> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
}
}
impl BlockReader for FileBlockReader<VirtualFile> { impl BlockReader for FileBlockReader<VirtualFile> {
fn block_cursor(&self) -> BlockCursor<'_> { fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self)) BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))

View File

@@ -9,7 +9,6 @@ use std::cmp::min;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{self, ErrorKind}; use std::io::{self, ErrorKind};
use std::ops::DerefMut; use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use tracing::*; use tracing::*;
@@ -88,7 +87,8 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut(); let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ); debug_assert_eq!(buf.len(), PAGE_SZ);
self.file self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid(); write_guard.mark_valid();
// Swap for read lock // Swap for read lock
@@ -128,10 +128,15 @@ impl EphemeralFile {
self.off += n; self.off += n;
src_remaining = &src_remaining[n..]; src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ { if self.off == PAGE_SZ {
match self.ephemeral_file.file.write_all_at( match self
&self.ephemeral_file.mutable_tail, .ephemeral_file
self.blknum as u64 * PAGE_SZ as u64, .file
) { .write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
)
.await
{
Ok(_) => { Ok(_) => {
// Pre-warm the page cache with what we just wrote. // Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it. // This isn't necessary for coherency/correctness, but it's how we've always done it.

View File

@@ -26,7 +26,7 @@
//! recovered from this file. This is tracked in //! recovered from this file. This is tracked in
//! <https://github.com/neondatabase/neon/issues/4418> //! <https://github.com/neondatabase/neon/issues/4418>
use std::io::{self, Read, Write}; use std::io::{self, Write};
use crate::virtual_file::VirtualFile; use crate::virtual_file::VirtualFile;
use anyhow::Result; use anyhow::Result;
@@ -151,11 +151,13 @@ impl Manifest {
/// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted, /// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
/// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and /// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
/// backup the current one. /// backup the current one.
pub fn load( pub async fn load(
mut file: VirtualFile, file: VirtualFile,
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> { ) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
let mut buf = vec![]; let mut buf = vec![];
file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?; file.read_exact_at(&mut buf, 0)
.await
.map_err(ManifestLoadError::Io)?;
// Read manifest header // Read manifest header
let mut buf = Bytes::from(buf); let mut buf = Bytes::from(buf);
@@ -241,8 +243,8 @@ mod tests {
use super::*; use super::*;
#[test] #[tokio::test]
fn test_read_manifest() { async fn test_read_manifest() {
let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest"); let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
std::fs::create_dir_all(&testdir).unwrap(); std::fs::create_dir_all(&testdir).unwrap();
let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap(); let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
@@ -274,7 +276,7 @@ mod tests {
.truncate(false), .truncate(false),
) )
.unwrap(); .unwrap();
let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap(); let (mut manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0); assert!(!corrupted.0);
assert_eq!(operations.len(), 2); assert_eq!(operations.len(), 2);
assert_eq!( assert_eq!(
@@ -306,7 +308,7 @@ mod tests {
.truncate(false), .truncate(false),
) )
.unwrap(); .unwrap();
let (_manifest, operations, corrupted) = Manifest::load(file).unwrap(); let (_manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0); assert!(!corrupted.0);
assert_eq!(operations.len(), 3); assert_eq!(operations.len(), 3);
assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0))); assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));

View File

@@ -9,9 +9,9 @@
//! [`remote_timeline_client`]: super::remote_timeline_client //! [`remote_timeline_client`]: super::remote_timeline_client
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::{self, Write}; use std::io;
use anyhow::{bail, ensure, Context}; use anyhow::{ensure, Context};
use serde::{de::Error, Deserialize, Serialize, Serializer}; use serde::{de::Error, Deserialize, Serialize, Serializer};
use thiserror::Error; use thiserror::Error;
use tracing::info_span; use tracing::info_span;
@@ -255,7 +255,7 @@ impl Serialize for TimelineMetadata {
} }
/// Save timeline metadata to file /// Save timeline metadata to file
pub fn save_metadata( pub async fn save_metadata(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_id: &TenantId, tenant_id: &TenantId,
timeline_id: &TimelineId, timeline_id: &TimelineId,
@@ -273,10 +273,7 @@ pub fn save_metadata(
let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?; let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;
if file.write(&metadata_bytes)? != metadata_bytes.len() { file.write_and_fsync(&metadata_bytes)?;
bail!("Could not write all the metadata bytes in a single call");
}
file.sync_all()?;
// fsync the parent directory to ensure the directory entry is durable // fsync the parent directory to ensure the directory entry is durable
if first_save { if first_save {

View File

@@ -361,11 +361,11 @@ pub async fn create_tenant(
remote_storage: Option<GenericRemoteStorage>, remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> { ) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || { tenant_map_insert(tenant_id, || async {
// We're holding the tenants lock in write mode while doing local IO. // We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating` // If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state. // and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create)?; let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
// TODO: tenant directory remains on disk if we bail out from here on. // TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233 // See https://github.com/neondatabase/neon/issues/4233
@@ -405,6 +405,7 @@ pub async fn set_new_tenant_config(
let tenant_config_path = conf.tenant_config_path(&tenant_id); let tenant_config_path = conf.tenant_config_path(&tenant_id);
Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf, false) Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf, false)
.await
.map_err(SetNewTenantConfigError::Persist)?; .map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf); tenant.set_new_tenant_config(new_tenant_conf);
Ok(()) Ok(())
@@ -525,7 +526,7 @@ pub async fn load_tenant(
remote_storage: Option<GenericRemoteStorage>, remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> { ) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || { tenant_map_insert(tenant_id, || async {
let tenant_path = conf.tenant_path(&tenant_id); let tenant_path = conf.tenant_path(&tenant_id);
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
if tenant_ignore_mark.exists() { if tenant_ignore_mark.exists() {
@@ -606,8 +607,8 @@ pub async fn attach_tenant(
remote_storage: GenericRemoteStorage, remote_storage: GenericRemoteStorage,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> { ) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || { tenant_map_insert(tenant_id, || async {
let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?; let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
// TODO: tenant directory remains on disk if we bail out from here on. // TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233 // See https://github.com/neondatabase/neon/issues/4233
@@ -655,12 +656,13 @@ pub enum TenantMapInsertError {
/// ///
/// NB: the closure should return quickly because the current implementation of tenants map /// NB: the closure should return quickly because the current implementation of tenants map
/// serializes access through an `RwLock`. /// serializes access through an `RwLock`.
async fn tenant_map_insert<F>( async fn tenant_map_insert<F, R>(
tenant_id: TenantId, tenant_id: TenantId,
insert_fn: F, insert_fn: F,
) -> Result<Arc<Tenant>, TenantMapInsertError> ) -> Result<Arc<Tenant>, TenantMapInsertError>
where where
F: FnOnce() -> anyhow::Result<Arc<Tenant>>, F: FnOnce() -> R,
R: std::future::Future<Output = anyhow::Result<Arc<Tenant>>>,
{ {
let mut guard = TENANTS.write().await; let mut guard = TENANTS.write().await;
let m = match &mut *guard { let m = match &mut *guard {
@@ -673,7 +675,7 @@ where
tenant_id, tenant_id,
e.get().current_state(), e.get().current_state(),
)), )),
hash_map::Entry::Vacant(v) => match insert_fn() { hash_map::Entry::Vacant(v) => match insert_fn().await {
Ok(tenant) => { Ok(tenant) => {
v.insert(tenant.clone()); v.insert(tenant.clone());
Ok(tenant) Ok(tenant)

View File

@@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range; use std::ops::Range;
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};

View File

@@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::Write; use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range; use std::ops::Range;
use std::os::unix::prelude::FileExt; use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};

View File

@@ -2735,6 +2735,7 @@ impl Timeline {
if disk_consistent_lsn != old_disk_consistent_lsn { if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn); assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload) self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
.await
.context("update_metadata_file")?; .context("update_metadata_file")?;
// Also update the in-memory copy // Also update the in-memory copy
self.disk_consistent_lsn.store(disk_consistent_lsn); self.disk_consistent_lsn.store(disk_consistent_lsn);
@@ -2743,7 +2744,7 @@ impl Timeline {
} }
/// Update metadata file /// Update metadata file
fn update_metadata_file( async fn update_metadata_file(
&self, &self,
disk_consistent_lsn: Lsn, disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>, layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
@@ -2791,6 +2792,7 @@ impl Timeline {
&metadata, &metadata,
false, false,
) )
.await
.context("save_metadata")?; .context("save_metadata")?;
if let Some(remote_client) = &self.remote_client { if let Some(remote_client) = &self.remote_client {
@@ -4122,7 +4124,8 @@ impl Timeline {
if !layers_to_remove.is_empty() { if !layers_to_remove.is_empty() {
// Persist the new GC cutoff value in the metadata file, before // Persist the new GC cutoff value in the metadata file, before
// we actually remove anything. // we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?; self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
.await?;
// Actually delete the layers from disk and remove them from the map. // Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection // (couldn't do this in the loop above, because you cannot modify a collection

View File

@@ -13,7 +13,7 @@
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions}; use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}; use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -321,54 +321,8 @@ impl VirtualFile {
drop(self); drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file"); std::fs::remove_file(path).expect("failed to remove the virtual file");
} }
}
impl Drop for VirtualFile { pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Read for VirtualFile {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let pos = self.pos;
let n = self.read_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
// flush is no-op for File (at least on unix), so we don't need to do
// anything here either.
Ok(())
}
}
impl Seek for VirtualFile {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos { match pos {
SeekFrom::Start(offset) => { SeekFrom::Start(offset) => {
self.pos = offset; self.pos = offset;
@@ -392,10 +346,64 @@ impl Seek for VirtualFile {
} }
Ok(self.pos) Ok(self.pos)
} }
}
impl FileExt for VirtualFile { // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> { pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
}
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.write_at(buf, offset) {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = &buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
/// Write the given buffer (which has to be below the kernel's internal page size) and fsync
///
/// This ensures some level of atomicity (not a good one, but it's the best we have).
pub fn write_and_fsync(&mut self, buf: &[u8]) -> Result<(), Error> {
if self.write(buf)? != buf.len() {
return Err(Error::new(
std::io::ErrorKind::Other,
"Could not write all the bytes in a single call",
));
}
self.sync_all()?;
Ok(())
}
async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("read", |file| file.read_at(buf, offset))?; let result = self.with_file("read", |file| file.read_at(buf, offset))?;
if let Ok(size) = result { if let Ok(size) = result {
STORAGE_IO_SIZE STORAGE_IO_SIZE
@@ -405,7 +413,7 @@ impl FileExt for VirtualFile {
result result
} }
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> { pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?; let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result { if let Ok(size) = result {
STORAGE_IO_SIZE STORAGE_IO_SIZE
@@ -416,6 +424,41 @@ impl FileExt for VirtualFile {
} }
} }
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
// flush is no-op for File (at least on unix), so we don't need to do
// anything here either.
Ok(())
}
}
impl OpenFiles { impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles { fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots)); let mut slots = Box::new(Vec::with_capacity(num_slots));
@@ -470,32 +513,68 @@ mod tests {
use rand::thread_rng; use rand::thread_rng;
use rand::Rng; use rand::Rng;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
// Helper function to slurp contents of a file, starting at the current position, enum MaybeVirtualFile {
// into a string VirtualFile(VirtualFile),
fn read_string<FD>(vfile: &mut FD) -> Result<String, Error> File(File),
where
FD: Read,
{
let mut buf = String::new();
vfile.read_to_string(&mut buf)?;
Ok(buf)
} }
// Helper function to slurp a portion of a file into a string impl MaybeVirtualFile {
fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error> async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
where match self {
FD: FileExt, MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
{ MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
let mut buf = Vec::new(); }
buf.resize(len, 0); }
vfile.read_exact_at(&mut buf, pos)?; async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
Ok(String::from_utf8(buf).unwrap()) match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
}
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos),
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf),
MaybeVirtualFile::File(file) => file.write_all(buf),
}
}
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => {
let pos = file.seek(SeekFrom::Current(0))?;
let len = file.metadata()?.len().saturating_sub(pos);
let len_usize = len.try_into().unwrap();
return self.read_string_at(pos, len_usize).await;
}
MaybeVirtualFile::File(file) => {
file.read_to_string(&mut buf)?;
}
}
Ok(buf)
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
let mut buf = Vec::new();
buf.resize(len, 0);
self.read_exact_at(&mut buf, pos).await?;
Ok(String::from_utf8(buf).unwrap())
}
} }
#[test] #[tokio::test]
fn test_virtual_files() -> Result<(), Error> { async fn test_virtual_files() -> Result<(), Error> {
// The real work is done in the test_files() helper function. This // The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and // allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them, // VirtualFile. We trust the native Files and wouldn't need to test them,
@@ -504,21 +583,23 @@ mod tests {
// native files, you will run out of file descriptors if the ulimit // native files, you will run out of file descriptors if the ulimit
// is low enough.) // is low enough.)
test_files("virtual_files", |path, open_options| { test_files("virtual_files", |path, open_options| {
VirtualFile::open_with_options(path, open_options) let vf = VirtualFile::open_with_options(path, open_options)?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}) })
.await
} }
#[test] #[tokio::test]
fn test_physical_files() -> Result<(), Error> { async fn test_physical_files() -> Result<(), Error> {
test_files("physical_files", |path, open_options| { test_files("physical_files", |path, open_options| {
open_options.open(path) Ok(MaybeVirtualFile::File(open_options.open(path)?))
}) })
.await
} }
fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error> async fn test_files<OF>(testname: &str, openfunc: OF) -> Result<(), Error>
where where
FD: Read + Write + Seek + FileExt, OF: Fn(&Path, &OpenOptions) -> Result<MaybeVirtualFile, std::io::Error>,
OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
{ {
let testdir = crate::config::PageServerConf::test_repo_dir(testname); let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?; std::fs::create_dir_all(&testdir)?;
@@ -528,36 +609,36 @@ mod tests {
&path_a, &path_a,
OpenOptions::new().write(true).create(true).truncate(true), OpenOptions::new().write(true).create(true).truncate(true),
)?; )?;
file_a.write_all(b"foobar")?; file_a.write_all(b"foobar").await?;
// cannot read from a file opened in write-only mode // cannot read from a file opened in write-only mode
assert!(read_string(&mut file_a).is_err()); assert!(file_a.read_string().await.is_err());
// Close the file and re-open for reading // Close the file and re-open for reading
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
// cannot write to a file opened in read-only mode // cannot write to a file opened in read-only mode
assert!(file_a.write(b"bar").is_err()); assert!(file_a.write_all(b"bar").await.is_err());
// Try simple read // Try simple read
assert_eq!("foobar", read_string(&mut file_a)?); assert_eq!("foobar", file_a.read_string().await?);
// It's positioned at the EOF now. // It's positioned at the EOF now.
assert_eq!("", read_string(&mut file_a)?); assert_eq!("", file_a.read_string().await?);
// Test seeks. // Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4); assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
assert_eq!("ar", read_string(&mut file_a)?); assert_eq!("ar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3); assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
assert_eq!("bar", read_string(&mut file_a)?); assert_eq!("bar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1); assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
// Test erroneous seeks to before byte 0 // Test erroneous seeks to before byte 0
assert!(file_a.seek(SeekFrom::End(-7)).is_err()); assert!(file_a.seek(SeekFrom::End(-7)).is_err());
@@ -565,7 +646,7 @@ mod tests {
assert!(file_a.seek(SeekFrom::Current(-2)).is_err()); assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
// the erroneous seek should have left the position unchanged // the erroneous seek should have left the position unchanged
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
// Create another test file, and try FileExt functions on it. // Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b"); let path_b = testdir.join("file_b");
@@ -577,10 +658,10 @@ mod tests {
.create(true) .create(true)
.truncate(true), .truncate(true),
)?; )?;
file_b.write_all_at(b"BAR", 3)?; file_b.write_all_at(b"BAR", 3).await?;
file_b.write_all_at(b"FOO", 0)?; file_b.write_all_at(b"FOO", 0).await?;
assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA"); assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
// Open a lot of files, enough to cause some evictions. (Or to be precise, // Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.) // open the same file many times. The effect is the same.)
@@ -591,7 +672,7 @@ mod tests {
let mut vfiles = Vec::new(); let mut vfiles = Vec::new();
for _ in 0..100 { for _ in 0..100 {
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
assert_eq!("FOOBAR", read_string(&mut vfile)?); assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile); vfiles.push(vfile);
} }
@@ -600,13 +681,13 @@ mod tests {
// The underlying file descriptor for 'file_a' should be closed now. Try to read // The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above. // from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
// Check that all the other FDs still work too. Use them in random order for // Check that all the other FDs still work too. Use them in random order for
// good measure. // good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng()); vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() { for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?); assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
} }
Ok(()) Ok(())
@@ -641,28 +722,22 @@ mod tests {
let files = Arc::new(files); let files = Arc::new(files);
// Launch many threads, and use the virtual files concurrently in random order. // Launch many threads, and use the virtual files concurrently in random order.
let mut threads = Vec::new(); let rt = tokio::runtime::Builder::new_multi_thread()
for threadno in 0..THREADS { .worker_threads(THREADS)
let builder = .thread_name("test_vfile_concurrency thread")
thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno)); .build()
.unwrap();
for _threadno in 0..THREADS {
let files = files.clone(); let files = files.clone();
let thread = builder rt.spawn(async move {
.spawn(move || { let mut buf = [0u8; SIZE];
let mut buf = [0u8; SIZE]; let mut rng = rand::rngs::OsRng;
let mut rng = rand::thread_rng(); for _ in 1..1000 {
for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())];
let f = &files[rng.gen_range(0..files.len())]; f.read_exact_at(&mut buf, 0).await.unwrap();
f.read_exact_at(&mut buf, 0).unwrap(); assert!(buf == SAMPLE);
assert!(buf == SAMPLE); }
} });
})
.unwrap();
threads.push(thread);
}
for thread in threads {
thread.join().unwrap();
} }
Ok(()) Ok(())