Compare commits

...

13 Commits

Author SHA1 Message Date
Arpad Müller
a9e5da9613 wip 2023-09-01 00:19:47 +02:00
Arpad Müller
a5acfdaa5a Use write_and_fsync in save_metadata 2023-09-01 00:19:47 +02:00
Arpad Müller
70052ae1ca Make save_metadata async fn 2023-09-01 00:19:47 +02:00
Arpad Müller
fe69dd9a40 Add write_and_fsync function 2023-09-01 00:19:47 +02:00
Arpad Müller
930eccfcaa Make write_blob and the things it calls async fn 2023-09-01 00:19:47 +02:00
Arpad Müller
29c2381fa5 Move generics on trait into macro
Monomorphization is basically like macro expansion, it just happens at a
later compiler phase.
2023-09-01 00:19:47 +02:00
Arpad Müller
7839cda66a Remove bounds 2023-09-01 00:19:47 +02:00
Arpad Müller
d565df25d6 Make write_all_at async 2023-09-01 00:19:47 +02:00
Christian Schwarz
92b7d7f466 FileBlockReader::fill_buffer make it obvious that we need to switch to async API 2023-09-01 00:19:47 +02:00
Arpad Müller
cfabd8b598 Remove Read impl that was only used in one place 2023-09-01 00:19:47 +02:00
Arpad Müller
4432094443 Move used FileExt functions to inherent impls 2023-09-01 00:19:47 +02:00
Christian Schwarz
83babcce30 FileBlockReader<File> is never used 2023-09-01 00:19:47 +02:00
Arpad Müller
735e20112a Move VirtualFile::seek to inherent function 2023-09-01 00:19:47 +02:00
12 changed files with 302 additions and 215 deletions

View File

@@ -469,7 +469,9 @@ impl PageServerHandler {
// Create empty timeline // Create empty timeline
info!("creating new timeline"); info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?; let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn. // TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute // We might have some wal to import as well, and we should prevent compute

View File

@@ -448,6 +448,7 @@ impl Tenant {
up_to_date_metadata, up_to_date_metadata,
first_save, first_save,
) )
.await
.context("save_metadata")?; .context("save_metadata")?;
} }
@@ -1450,7 +1451,7 @@ impl Tenant {
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline. /// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.) /// (Without it, `put` might fail due to `repartition` failing.)
pub fn create_empty_timeline( pub async fn create_empty_timeline(
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
initdb_lsn: Lsn, initdb_lsn: Lsn,
@@ -1462,10 +1463,10 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant" "Cannot create empty timelines on inactive tenant"
); );
let timelines = self.timelines.lock().unwrap(); let timeline_uninit_mark = {
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; let timelines = self.timelines.lock().unwrap();
drop(timelines); self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
};
let new_metadata = TimelineMetadata::new( let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to // Initialize disk_consistent LSN to 0, The caller must import some data to
// make it valid, before calling finish_creation() // make it valid, before calling finish_creation()
@@ -1484,6 +1485,7 @@ impl Tenant {
initdb_lsn, initdb_lsn,
None, None,
) )
.await
} }
/// Helper for unit tests to create an empty timeline. /// Helper for unit tests to create an empty timeline.
@@ -1499,7 +1501,9 @@ impl Tenant {
pg_version: u32, pg_version: u32,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?; let uninit_tl = self
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
let tline = uninit_tl.raw_timeline().expect("we just created it"); let tline = uninit_tl.raw_timeline().expect("we just created it");
assert_eq!(tline.get_last_record_lsn(), Lsn(0)); assert_eq!(tline.get_last_record_lsn(), Lsn(0));
@@ -2797,13 +2801,15 @@ impl Tenant {
src_timeline.pg_version, src_timeline.pg_version,
); );
let uninitialized_timeline = self.prepare_new_timeline( let uninitialized_timeline = self
dst_id, .prepare_new_timeline(
&metadata, dst_id,
timeline_uninit_mark, &metadata,
start_lsn + 1, timeline_uninit_mark,
Some(Arc::clone(src_timeline)), start_lsn + 1,
)?; Some(Arc::clone(src_timeline)),
)
.await?;
let new_timeline = uninitialized_timeline.finish_creation()?; let new_timeline = uninitialized_timeline.finish_creation()?;
@@ -2881,13 +2887,15 @@ impl Tenant {
pgdata_lsn, pgdata_lsn,
pg_version, pg_version,
); );
let raw_timeline = self.prepare_new_timeline( let raw_timeline = self
timeline_id, .prepare_new_timeline(
&new_metadata, timeline_id,
timeline_uninit_mark, &new_metadata,
pgdata_lsn, timeline_uninit_mark,
None, pgdata_lsn,
)?; None,
)
.await?;
let tenant_id = raw_timeline.owning_tenant.tenant_id; let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?; let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2958,7 +2966,7 @@ impl Tenant {
/// at 'disk_consistent_lsn'. After any initial data has been imported, call /// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the /// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file. /// uninit mark file.
fn prepare_new_timeline( async fn prepare_new_timeline(
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata, new_metadata: &TimelineMetadata,
@@ -2986,8 +2994,9 @@ impl Tenant {
timeline_struct.init_empty_layer_map(start_lsn); timeline_struct.init_empty_layer_map(start_lsn);
if let Err(e) = if let Err(e) = self
self.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata) .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
.await
{ {
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}"); error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark); cleanup_timeline_directory(uninit_mark);
@@ -3003,7 +3012,7 @@ impl Tenant {
)) ))
} }
fn create_timeline_files( async fn create_timeline_files(
&self, &self,
timeline_path: &Path, timeline_path: &Path,
new_timeline_id: &TimelineId, new_timeline_id: &TimelineId,
@@ -3022,6 +3031,7 @@ impl Tenant {
new_metadata, new_metadata,
true, true,
) )
.await
.context("Failed to create timeline metadata")?; .context("Failed to create timeline metadata")?;
Ok(()) Ok(())
} }
@@ -3649,7 +3659,10 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?; .await?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) { match tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
{
Ok(_) => panic!("duplicate timeline creation should fail"), Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!( Err(e) => assert_eq!(
e.to_string(), e.to_string(),
@@ -4489,8 +4502,9 @@ mod tests {
.await; .await;
let initdb_lsn = Lsn(0x20); let initdb_lsn = Lsn(0x20);
let utline = let utline = tenant
tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?; .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = utline.raw_timeline().unwrap(); let tline = utline.raw_timeline().unwrap();
// Spawn flush loop now so that we can set the `expect_initdb_optimization` // Spawn flush loop now so that we can set the `expect_initdb_optimization`
@@ -4555,8 +4569,9 @@ mod tests {
let harness = TenantHarness::create(name)?; let harness = TenantHarness::create(name)?;
{ {
let (tenant, ctx) = harness.load().await; let (tenant, ctx) = harness.load().await;
let tline = let tline = tenant
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
// Keeps uninit mark in place // Keeps uninit mark in place
std::mem::forget(tline); std::mem::forget(tline);
} }

View File

@@ -96,18 +96,12 @@ pub trait BlobWriter {
/// An implementation of BlobWriter to write blobs to anything that /// An implementation of BlobWriter to write blobs to anything that
/// implements std::io::Write. /// implements std::io::Write.
/// ///
pub struct WriteBlobWriter<W> pub struct WriteBlobWriter<W> {
where
W: std::io::Write,
{
inner: W, inner: W,
offset: u64, offset: u64,
} }
impl<W> WriteBlobWriter<W> impl<W> WriteBlobWriter<W> {
where
W: std::io::Write,
{
pub fn new(inner: W, start_offset: u64) -> Self { pub fn new(inner: W, start_offset: u64) -> Self {
WriteBlobWriter { WriteBlobWriter {
inner, inner,
@@ -129,33 +123,38 @@ where
} }
} }
impl<W> BlobWriter for WriteBlobWriter<W> macro_rules! write_blob_impl {
where (WriteBlobWriter<$ty:ty>) => {
W: std::io::Write, impl WriteBlobWriter<$ty> {
{ pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> { use std::io::Write;
let offset = self.offset; let offset = self.offset;
if srcbuf.len() < 128 { if srcbuf.len() < 128 {
// Short blob. Write a 1-byte length header // Short blob. Write a 1-byte length header
let len_buf = srcbuf.len() as u8; let len_buf = srcbuf.len() as u8;
self.inner.write_all(&[len_buf])?; self.inner.write_all(&[len_buf])?;
self.offset += 1; self.offset += 1;
} else { } else {
// Write a 4-byte length header // Write a 4-byte length header
if srcbuf.len() > 0x7fff_ffff { if srcbuf.len() > 0x7fff_ffff {
return Err(Error::new( return Err(Error::new(
ErrorKind::Other, ErrorKind::Other,
format!("blob too large ({} bytes)", srcbuf.len()), format!("blob too large ({} bytes)", srcbuf.len()),
)); ));
}
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.inner.write_all(&len_buf)?;
self.offset += 4;
}
self.inner.write_all(srcbuf)?;
self.offset += srcbuf.len() as u64;
Ok(offset)
} }
let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
len_buf[0] |= 0x80;
self.inner.write_all(&len_buf)?;
self.offset += 4;
} }
self.inner.write_all(srcbuf)?; };
self.offset += srcbuf.len() as u64;
Ok(offset)
}
} }
write_blob_impl!(WriteBlobWriter<crate::tenant::io::BufWriter<crate::virtual_file::VirtualFile>>);
write_blob_impl!(WriteBlobWriter<crate::virtual_file::VirtualFile>);

View File

@@ -7,9 +7,7 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile; use crate::virtual_file::VirtualFile;
use bytes::Bytes; use bytes::Bytes;
use std::fs::File;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
/// This is implemented by anything that can read 8 kB (PAGE_SZ) /// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache /// blocks, using the page cache
@@ -74,7 +72,6 @@ impl<'a> Deref for BlockLease<'a> {
/// Unlike traits, we also support the read function to be async though. /// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> { pub(crate) enum BlockReaderRef<'a> {
FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>), FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
EphemeralFile(&'a EphemeralFile), EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>), Adapter(Adapter<&'a DeltaLayerInner>),
#[cfg(test)] #[cfg(test)]
@@ -87,7 +84,6 @@ impl<'a> BlockReaderRef<'a> {
use BlockReaderRef::*; use BlockReaderRef::*;
match self { match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum).await, FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
FileBlockReaderFile(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await, EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await, Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)] #[cfg(test)]
@@ -150,18 +146,15 @@ pub struct FileBlockReader<F> {
file_id: page_cache::FileId, file_id: page_cache::FileId,
} }
impl<F> FileBlockReader<F> impl FileBlockReader<VirtualFile> {
where pub fn new(file: VirtualFile) -> Self {
F: FileExt,
{
pub fn new(file: F) -> Self {
let file_id = page_cache::next_file_id(); let file_id = page_cache::next_file_id();
FileBlockReader { file_id, file } FileBlockReader { file_id, file }
} }
/// Read a page from the underlying file into given buffer. /// Read a page from the underlying file into given buffer.
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ); assert!(buf.len() == PAGE_SZ);
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
} }
@@ -185,7 +178,7 @@ where
ReadBufResult::Found(guard) => break Ok(guard.into()), ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => { ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer // Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?; self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid(); write_guard.mark_valid();
// Swap for read lock // Swap for read lock
@@ -196,12 +189,6 @@ where
} }
} }
impl BlockReader for FileBlockReader<File> {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
}
}
impl BlockReader for FileBlockReader<VirtualFile> { impl BlockReader for FileBlockReader<VirtualFile> {
fn block_cursor(&self) -> BlockCursor<'_> { fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self)) BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))

View File

@@ -9,7 +9,6 @@ use std::cmp::min;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{self, ErrorKind}; use std::io::{self, ErrorKind};
use std::ops::DerefMut; use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use tracing::*; use tracing::*;
@@ -128,10 +127,15 @@ impl EphemeralFile {
self.off += n; self.off += n;
src_remaining = &src_remaining[n..]; src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ { if self.off == PAGE_SZ {
match self.ephemeral_file.file.write_all_at( match self
&self.ephemeral_file.mutable_tail, .ephemeral_file
self.blknum as u64 * PAGE_SZ as u64, .file
) { .write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
)
.await
{
Ok(_) => { Ok(_) => {
// Pre-warm the page cache with what we just wrote. // Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it. // This isn't necessary for coherency/correctness, but it's how we've always done it.

View File

@@ -26,7 +26,7 @@
//! recovered from this file. This is tracked in //! recovered from this file. This is tracked in
//! <https://github.com/neondatabase/neon/issues/4418> //! <https://github.com/neondatabase/neon/issues/4418>
use std::io::{self, Read, Write}; use std::io::{self, Write};
use crate::virtual_file::VirtualFile; use crate::virtual_file::VirtualFile;
use anyhow::Result; use anyhow::Result;
@@ -151,11 +151,12 @@ impl Manifest {
/// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted, /// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
/// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and /// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
/// backup the current one. /// backup the current one.
pub fn load( pub async fn load(
mut file: VirtualFile, file: VirtualFile,
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> { ) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
let mut buf = vec![]; let mut buf = vec![];
file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?; file.read_exact_at(&mut buf, 0)
.map_err(ManifestLoadError::Io)?;
// Read manifest header // Read manifest header
let mut buf = Bytes::from(buf); let mut buf = Bytes::from(buf);
@@ -241,8 +242,8 @@ mod tests {
use super::*; use super::*;
#[test] #[tokio::test]
fn test_read_manifest() { async fn test_read_manifest() {
let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest"); let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
std::fs::create_dir_all(&testdir).unwrap(); std::fs::create_dir_all(&testdir).unwrap();
let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap(); let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
@@ -274,7 +275,7 @@ mod tests {
.truncate(false), .truncate(false),
) )
.unwrap(); .unwrap();
let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap(); let (mut manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0); assert!(!corrupted.0);
assert_eq!(operations.len(), 2); assert_eq!(operations.len(), 2);
assert_eq!( assert_eq!(
@@ -306,7 +307,7 @@ mod tests {
.truncate(false), .truncate(false),
) )
.unwrap(); .unwrap();
let (_manifest, operations, corrupted) = Manifest::load(file).unwrap(); let (_manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
assert!(!corrupted.0); assert!(!corrupted.0);
assert_eq!(operations.len(), 3); assert_eq!(operations.len(), 3);
assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0))); assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));

View File

@@ -9,9 +9,9 @@
//! [`remote_timeline_client`]: super::remote_timeline_client //! [`remote_timeline_client`]: super::remote_timeline_client
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::{self, Write}; use std::io;
use anyhow::{bail, ensure, Context}; use anyhow::{ensure, Context};
use serde::{de::Error, Deserialize, Serialize, Serializer}; use serde::{de::Error, Deserialize, Serialize, Serializer};
use thiserror::Error; use thiserror::Error;
use tracing::info_span; use tracing::info_span;
@@ -255,7 +255,7 @@ impl Serialize for TimelineMetadata {
} }
/// Save timeline metadata to file /// Save timeline metadata to file
pub fn save_metadata( pub async fn save_metadata(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_id: &TenantId, tenant_id: &TenantId,
timeline_id: &TimelineId, timeline_id: &TimelineId,
@@ -273,10 +273,7 @@ pub fn save_metadata(
let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?; let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;
if file.write(&metadata_bytes)? != metadata_bytes.len() { file.write_and_fsync(&metadata_bytes)?;
bail!("Could not write all the metadata bytes in a single call");
}
file.sync_all()?;
// fsync the parent directory to ensure the directory entry is durable // fsync the parent directory to ensure the directory entry is durable
if first_save { if first_save {

View File

@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext; use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ; use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value, KEY_SIZE}; use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::blob_io::WriteBlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{ use crate::tenant::storage_layer::{
@@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::{BufWriter, Write}; use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range; use std::ops::Range;
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@@ -632,11 +632,12 @@ impl DeltaLayerWriterInner {
/// ///
/// The values must be appended in key, lsn order. /// The values must be appended in key, lsn order.
/// ///
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init()) self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
.await
} }
fn put_value_bytes( async fn put_value_bytes(
&mut self, &mut self,
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
@@ -645,7 +646,7 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
assert!(self.lsn_range.start <= lsn); assert!(self.lsn_range.start <= lsn);
let off = self.blob_writer.write_blob(val)?; let off = self.blob_writer.write_blob(val).await?;
let blob_ref = BlobRef::new(off, will_init); let blob_ref = BlobRef::new(off, will_init);
@@ -797,11 +798,11 @@ impl DeltaLayerWriter {
/// ///
/// The values must be appended in key, lsn order. /// The values must be appended in key, lsn order.
/// ///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> { pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_value(key, lsn, val) self.inner.as_mut().unwrap().put_value(key, lsn, val).await
} }
pub fn put_value_bytes( pub async fn put_value_bytes(
&mut self, &mut self,
key: Key, key: Key,
lsn: Lsn, lsn: Lsn,
@@ -812,6 +813,7 @@ impl DeltaLayerWriter {
.as_mut() .as_mut()
.unwrap() .unwrap()
.put_value_bytes(key, lsn, val, will_init) .put_value_bytes(key, lsn, val, will_init)
.await
} }
pub fn size(&self) -> u64 { pub fn size(&self) -> u64 {

View File

@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext; use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ; use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, KEY_SIZE}; use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::blob_io::WriteBlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{ use crate::tenant::storage_layer::{
@@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng}; use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::SeekFrom;
use std::io::Write; use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range; use std::ops::Range;
use std::os::unix::prelude::FileExt; use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@@ -569,9 +569,9 @@ impl ImageLayerWriterInner {
/// ///
/// The page versions must be appended in blknum order. /// The page versions must be appended in blknum order.
/// ///
fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key)); ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?; let off = self.blob_writer.write_blob(img).await?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf); key.write_to_byte_slice(&mut keybuf);
@@ -710,8 +710,8 @@ impl ImageLayerWriter {
/// ///
/// The page versions must be appended in blknum order. /// The page versions must be appended in blknum order.
/// ///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> { pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img) self.inner.as_mut().unwrap().put_image(key, img).await
} }
/// ///

View File

@@ -348,7 +348,9 @@ impl InMemoryLayer {
for (lsn, pos) in vec_map.as_slice() { for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf).await?; cursor.read_blob_into_buf(*pos, &mut buf).await?;
let will_init = Value::des(&buf)?.will_init(); let will_init = Value::des(&buf)?.will_init();
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?; delta_layer_writer
.put_value_bytes(key, *lsn, &buf, will_init)
.await?;
} }
} }

View File

@@ -2735,6 +2735,7 @@ impl Timeline {
if disk_consistent_lsn != old_disk_consistent_lsn { if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn); assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload) self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
.await
.context("update_metadata_file")?; .context("update_metadata_file")?;
// Also update the in-memory copy // Also update the in-memory copy
self.disk_consistent_lsn.store(disk_consistent_lsn); self.disk_consistent_lsn.store(disk_consistent_lsn);
@@ -2743,7 +2744,7 @@ impl Timeline {
} }
/// Update metadata file /// Update metadata file
fn update_metadata_file( async fn update_metadata_file(
&self, &self,
disk_consistent_lsn: Lsn, disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>, layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
@@ -2791,6 +2792,7 @@ impl Timeline {
&metadata, &metadata,
false, false,
) )
.await
.context("save_metadata")?; .context("save_metadata")?;
if let Some(remote_client) = &self.remote_client { if let Some(remote_client) = &self.remote_client {
@@ -3030,7 +3032,7 @@ impl Timeline {
} }
} }
}; };
image_layer_writer.put_image(key, &img)?; image_layer_writer.put_image(key, &img).await?;
key = key.next(); key = key.next();
} }
} }
@@ -3616,7 +3618,7 @@ impl Timeline {
))) )))
}); });
writer.as_mut().unwrap().put_value(key, lsn, value)?; writer.as_mut().unwrap().put_value(key, lsn, value).await?;
prev_key = Some(key); prev_key = Some(key);
} }
if let Some(writer) = writer { if let Some(writer) = writer {
@@ -4122,7 +4124,8 @@ impl Timeline {
if !layers_to_remove.is_empty() { if !layers_to_remove.is_empty() {
// Persist the new GC cutoff value in the metadata file, before // Persist the new GC cutoff value in the metadata file, before
// we actually remove anything. // we actually remove anything.
self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?; self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
.await?;
// Actually delete the layers from disk and remove them from the map. // Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection // (couldn't do this in the loop above, because you cannot modify a collection

View File

@@ -13,7 +13,7 @@
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions}; use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write}; use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::os::unix::fs::FileExt; use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -321,54 +321,8 @@ impl VirtualFile {
drop(self); drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file"); std::fs::remove_file(path).expect("failed to remove the virtual file");
} }
}
impl Drop for VirtualFile { pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Read for VirtualFile {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let pos = self.pos;
let n = self.read_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
// flush is no-op for File (at least on unix), so we don't need to do
// anything here either.
Ok(())
}
}
impl Seek for VirtualFile {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos { match pos {
SeekFrom::Start(offset) => { SeekFrom::Start(offset) => {
self.pos = offset; self.pos = offset;
@@ -392,10 +346,64 @@ impl Seek for VirtualFile {
} }
Ok(self.pos) Ok(self.pos)
} }
}
impl FileExt for VirtualFile { // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> { pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.read_at(buf, offset) {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
}
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.write_at(buf, offset) {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = &buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
/// Write the given buffer (which has to be below the kernel's internal page size) and fsync
///
/// This ensures some level of atomicity (not a good one, but it's the best we have).
pub fn write_and_fsync(&mut self, buf: &[u8]) -> Result<(), Error> {
if self.write(buf)? != buf.len() {
return Err(Error::new(
std::io::ErrorKind::Other,
"Could not write all the bytes in a single call",
));
}
self.sync_all()?;
Ok(())
}
pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("read", |file| file.read_at(buf, offset))?; let result = self.with_file("read", |file| file.read_at(buf, offset))?;
if let Ok(size) = result { if let Ok(size) = result {
STORAGE_IO_SIZE STORAGE_IO_SIZE
@@ -405,7 +413,7 @@ impl FileExt for VirtualFile {
result result
} }
fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> { pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self.with_file("write", |file| file.write_at(buf, offset))?; let result = self.with_file("write", |file| file.write_at(buf, offset))?;
if let Ok(size) = result { if let Ok(size) = result {
STORAGE_IO_SIZE STORAGE_IO_SIZE
@@ -416,6 +424,41 @@ impl FileExt for VirtualFile {
} }
} }
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME
.with_label_values(&["close"])
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}
impl Write for VirtualFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos)?;
self.pos += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
// flush is no-op for File (at least on unix), so we don't need to do
// anything here either.
Ok(())
}
}
impl OpenFiles { impl OpenFiles {
fn new(num_slots: usize) -> OpenFiles { fn new(num_slots: usize) -> OpenFiles {
let mut slots = Box::new(Vec::with_capacity(num_slots)); let mut slots = Box::new(Vec::with_capacity(num_slots));
@@ -472,30 +515,60 @@ mod tests {
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
// Helper function to slurp contents of a file, starting at the current position, enum MaybeVirtualFile {
// into a string VirtualFile(VirtualFile),
fn read_string<FD>(vfile: &mut FD) -> Result<String, Error> File(File),
where
FD: Read,
{
let mut buf = String::new();
vfile.read_to_string(&mut buf)?;
Ok(buf)
} }
// Helper function to slurp a portion of a file into a string impl MaybeVirtualFile {
fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error> fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
where match self {
FD: FileExt, MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset),
{ MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
let mut buf = Vec::new(); }
buf.resize(len, 0); }
vfile.read_exact_at(&mut buf, pos)?; async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
Ok(String::from_utf8(buf).unwrap()) match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
}
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos),
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf),
MaybeVirtualFile::File(file) => file.write_all(buf),
}
}
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_to_string(&mut buf)?,
MaybeVirtualFile::File(file) => file.read_to_string(&mut buf)?,
}
Ok(buf)
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
let mut buf = Vec::new();
buf.resize(len, 0);
self.read_exact_at(&mut buf, pos)?;
Ok(String::from_utf8(buf).unwrap())
}
} }
#[test] #[tokio::test]
fn test_virtual_files() -> Result<(), Error> { async fn test_virtual_files() -> Result<(), Error> {
// The real work is done in the test_files() helper function. This // The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and // allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them, // VirtualFile. We trust the native Files and wouldn't need to test them,
@@ -504,21 +577,23 @@ mod tests {
// native files, you will run out of file descriptors if the ulimit // native files, you will run out of file descriptors if the ulimit
// is low enough.) // is low enough.)
test_files("virtual_files", |path, open_options| { test_files("virtual_files", |path, open_options| {
VirtualFile::open_with_options(path, open_options) let vf = VirtualFile::open_with_options(path, open_options)?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}) })
.await
} }
#[test] #[tokio::test]
fn test_physical_files() -> Result<(), Error> { async fn test_physical_files() -> Result<(), Error> {
test_files("physical_files", |path, open_options| { test_files("physical_files", |path, open_options| {
open_options.open(path) Ok(MaybeVirtualFile::File(open_options.open(path)?))
}) })
.await
} }
fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error> async fn test_files<OF>(testname: &str, openfunc: OF) -> Result<(), Error>
where where
FD: Read + Write + Seek + FileExt, OF: Fn(&Path, &OpenOptions) -> Result<MaybeVirtualFile, std::io::Error>,
OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
{ {
let testdir = crate::config::PageServerConf::test_repo_dir(testname); let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?; std::fs::create_dir_all(&testdir)?;
@@ -528,36 +603,36 @@ mod tests {
&path_a, &path_a,
OpenOptions::new().write(true).create(true).truncate(true), OpenOptions::new().write(true).create(true).truncate(true),
)?; )?;
file_a.write_all(b"foobar")?; file_a.write_all(b"foobar").await?;
// cannot read from a file opened in write-only mode // cannot read from a file opened in write-only mode
assert!(read_string(&mut file_a).is_err()); assert!(file_a.read_string().await.is_err());
// Close the file and re-open for reading // Close the file and re-open for reading
let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?; let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;
// cannot write to a file opened in read-only mode // cannot write to a file opened in read-only mode
assert!(file_a.write(b"bar").is_err()); assert!(file_a.write_all(b"bar").await.is_err());
// Try simple read // Try simple read
assert_eq!("foobar", read_string(&mut file_a)?); assert_eq!("foobar", file_a.read_string().await?);
// It's positioned at the EOF now. // It's positioned at the EOF now.
assert_eq!("", read_string(&mut file_a)?); assert_eq!("", file_a.read_string().await?);
// Test seeks. // Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4); assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
assert_eq!("ar", read_string(&mut file_a)?); assert_eq!("ar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1); assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3); assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
assert_eq!("bar", read_string(&mut file_a)?); assert_eq!("bar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1); assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
// Test erroneous seeks to before byte 0 // Test erroneous seeks to before byte 0
assert!(file_a.seek(SeekFrom::End(-7)).is_err()); assert!(file_a.seek(SeekFrom::End(-7)).is_err());
@@ -565,7 +640,7 @@ mod tests {
assert!(file_a.seek(SeekFrom::Current(-2)).is_err()); assert!(file_a.seek(SeekFrom::Current(-2)).is_err());
// the erroneous seek should have left the position unchanged // the erroneous seek should have left the position unchanged
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
// Create another test file, and try FileExt functions on it. // Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b"); let path_b = testdir.join("file_b");
@@ -577,10 +652,10 @@ mod tests {
.create(true) .create(true)
.truncate(true), .truncate(true),
)?; )?;
file_b.write_all_at(b"BAR", 3)?; file_b.write_all_at(b"BAR", 3).await?;
file_b.write_all_at(b"FOO", 0)?; file_b.write_all_at(b"FOO", 0).await?;
assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA"); assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
// Open a lot of files, enough to cause some evictions. (Or to be precise, // Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.) // open the same file many times. The effect is the same.)
@@ -591,7 +666,7 @@ mod tests {
let mut vfiles = Vec::new(); let mut vfiles = Vec::new();
for _ in 0..100 { for _ in 0..100 {
let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?; let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
assert_eq!("FOOBAR", read_string(&mut vfile)?); assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile); vfiles.push(vfile);
} }
@@ -600,13 +675,13 @@ mod tests {
// The underlying file descriptor for 'file_a' should be closed now. Try to read // The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above. // from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", read_string(&mut file_a)?); assert_eq!("oobar", file_a.read_string().await?);
// Check that all the other FDs still work too. Use them in random order for // Check that all the other FDs still work too. Use them in random order for
// good measure. // good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng()); vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() { for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?); assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
} }
Ok(()) Ok(())