Mirror of https://github.com/neondatabase/neon.git (synced 2026-03-06 18:00:37 +00:00)
Compare commits: remove_ini...arpad/virt

13 Commits:
a9e5da9613
a5acfdaa5a
70052ae1ca
fe69dd9a40
930eccfcaa
29c2381fa5
7839cda66a
d565df25d6
92b7d7f466
cfabd8b598
4432094443
83babcce30
735e20112a
@@ -469,7 +469,9 @@ impl PageServerHandler {
         // Create empty timeline
         info!("creating new timeline");
         let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
-        let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
+        let timeline = tenant
+            .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
+            .await?;

         // TODO mark timeline as not ready until it reaches end_lsn.
         // We might have some wal to import as well, and we should prevent compute
@@ -448,6 +448,7 @@ impl Tenant {
                 up_to_date_metadata,
                 first_save,
             )
+            .await
             .context("save_metadata")?;
         }

@@ -1450,7 +1451,7 @@ impl Tenant {
    /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
    /// minimum amount of keys required to get a writable timeline.
    /// (Without it, `put` might fail due to `repartition` failing.)
-    pub fn create_empty_timeline(
+    pub async fn create_empty_timeline(
        &self,
        new_timeline_id: TimelineId,
        initdb_lsn: Lsn,

@@ -1462,10 +1463,10 @@ impl Tenant {
            "Cannot create empty timelines on inactive tenant"
        );

-        let timelines = self.timelines.lock().unwrap();
-        let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
-        drop(timelines);
+        let timeline_uninit_mark = {
+            let timelines = self.timelines.lock().unwrap();
+            self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
+        };
        let new_metadata = TimelineMetadata::new(
            // Initialize disk_consistent LSN to 0, The caller must import some data to
            // make it valid, before calling finish_creation()

@@ -1484,6 +1485,7 @@ impl Tenant {
            initdb_lsn,
            None,
        )
+        .await
    }

    /// Helper for unit tests to create an empty timeline.

@@ -1499,7 +1501,9 @@ impl Tenant {
        pg_version: u32,
        ctx: &RequestContext,
    ) -> anyhow::Result<Arc<Timeline>> {
-        let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
+        let uninit_tl = self
+            .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
+            .await?;
        let tline = uninit_tl.raw_timeline().expect("we just created it");
        assert_eq!(tline.get_last_record_lsn(), Lsn(0));

@@ -2797,13 +2801,15 @@ impl Tenant {
            src_timeline.pg_version,
        );

-        let uninitialized_timeline = self.prepare_new_timeline(
-            dst_id,
-            &metadata,
-            timeline_uninit_mark,
-            start_lsn + 1,
-            Some(Arc::clone(src_timeline)),
-        )?;
+        let uninitialized_timeline = self
+            .prepare_new_timeline(
+                dst_id,
+                &metadata,
+                timeline_uninit_mark,
+                start_lsn + 1,
+                Some(Arc::clone(src_timeline)),
+            )
+            .await?;

        let new_timeline = uninitialized_timeline.finish_creation()?;

@@ -2881,13 +2887,15 @@ impl Tenant {
            pgdata_lsn,
            pg_version,
        );
-        let raw_timeline = self.prepare_new_timeline(
-            timeline_id,
-            &new_metadata,
-            timeline_uninit_mark,
-            pgdata_lsn,
-            None,
-        )?;
+        let raw_timeline = self
+            .prepare_new_timeline(
+                timeline_id,
+                &new_metadata,
+                timeline_uninit_mark,
+                pgdata_lsn,
+                None,
+            )
+            .await?;

        let tenant_id = raw_timeline.owning_tenant.tenant_id;
        let unfinished_timeline = raw_timeline.raw_timeline()?;

@@ -2958,7 +2966,7 @@ impl Tenant {
    /// at 'disk_consistent_lsn'. After any initial data has been imported, call
    /// `finish_creation` to insert the Timeline into the timelines map and to remove the
    /// uninit mark file.
-    fn prepare_new_timeline(
+    async fn prepare_new_timeline(
        &self,
        new_timeline_id: TimelineId,
        new_metadata: &TimelineMetadata,

@@ -2986,8 +2994,9 @@ impl Tenant {

        timeline_struct.init_empty_layer_map(start_lsn);

-        if let Err(e) =
-            self.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
+        if let Err(e) = self
+            .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
+            .await
        {
            error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
            cleanup_timeline_directory(uninit_mark);

@@ -3003,7 +3012,7 @@ impl Tenant {
        ))
    }

-    fn create_timeline_files(
+    async fn create_timeline_files(
        &self,
        timeline_path: &Path,
        new_timeline_id: &TimelineId,

@@ -3022,6 +3031,7 @@ impl Tenant {
            new_metadata,
            true,
        )
+        .await
        .context("Failed to create timeline metadata")?;
        Ok(())
    }

@@ -3649,7 +3659,10 @@ mod tests {
            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
            .await?;

-        match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
+        match tenant
+            .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await
+        {
            Ok(_) => panic!("duplicate timeline creation should fail"),
            Err(e) => assert_eq!(
                e.to_string(),

@@ -4489,8 +4502,9 @@ mod tests {
            .await;

        let initdb_lsn = Lsn(0x20);
-        let utline =
-            tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?;
+        let utline = tenant
+            .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
+            .await?;
        let tline = utline.raw_timeline().unwrap();

        // Spawn flush loop now so that we can set the `expect_initdb_optimization`

@@ -4555,8 +4569,9 @@ mod tests {
        let harness = TenantHarness::create(name)?;
        {
            let (tenant, ctx) = harness.load().await;
-            let tline =
-                tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+            let tline = tenant
+                .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
+                .await?;
            // Keeps uninit mark in place
            std::mem::forget(tline);
        }
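Timeline creation in `Tenant` is now an async pipeline: the caller awaits `create_empty_timeline`, gets back an uninitialized timeline, and publishes it with `finish_creation` once data has been imported. A minimal sketch of that calling pattern, assuming a tokio context and the test constants (`TIMELINE_ID`, `DEFAULT_PG_VERSION`) used in `mod tests` above; the helper name is illustrative:

    // Sketch only: drives the now-async creation path end to end.
    async fn make_timeline(tenant: &Tenant, ctx: &RequestContext) -> anyhow::Result<()> {
        let uninit = tenant
            .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, ctx)
            .await?;
        // ... import or initialize data via uninit.raw_timeline() here ...
        let _timeline = uninit.finish_creation()?;
        Ok(())
    }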
@@ -96,18 +96,12 @@ pub trait BlobWriter {
 /// An implementation of BlobWriter to write blobs to anything that
 /// implements std::io::Write.
 ///
-pub struct WriteBlobWriter<W>
-where
-    W: std::io::Write,
-{
+pub struct WriteBlobWriter<W> {
     inner: W,
     offset: u64,
 }

-impl<W> WriteBlobWriter<W>
-where
-    W: std::io::Write,
-{
+impl<W> WriteBlobWriter<W> {
     pub fn new(inner: W, start_offset: u64) -> Self {
         WriteBlobWriter {
             inner,

@@ -129,33 +123,38 @@ where
     }
 }

-impl<W> BlobWriter for WriteBlobWriter<W>
-where
-    W: std::io::Write,
-{
-    fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
-        let offset = self.offset;
-
-        if srcbuf.len() < 128 {
-            // Short blob. Write a 1-byte length header
-            let len_buf = srcbuf.len() as u8;
-            self.inner.write_all(&[len_buf])?;
-            self.offset += 1;
-        } else {
-            // Write a 4-byte length header
-            if srcbuf.len() > 0x7fff_ffff {
-                return Err(Error::new(
-                    ErrorKind::Other,
-                    format!("blob too large ({} bytes)", srcbuf.len()),
-                ));
-            }
-            let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
-            len_buf[0] |= 0x80;
-            self.inner.write_all(&len_buf)?;
-            self.offset += 4;
-        }
-        self.inner.write_all(srcbuf)?;
-        self.offset += srcbuf.len() as u64;
-        Ok(offset)
-    }
-}
+macro_rules! write_blob_impl {
+    (WriteBlobWriter<$ty:ty>) => {
+        impl WriteBlobWriter<$ty> {
+            pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
+                use std::io::Write;
+                let offset = self.offset;
+
+                if srcbuf.len() < 128 {
+                    // Short blob. Write a 1-byte length header
+                    let len_buf = srcbuf.len() as u8;
+                    self.inner.write_all(&[len_buf])?;
+                    self.offset += 1;
+                } else {
+                    // Write a 4-byte length header
+                    if srcbuf.len() > 0x7fff_ffff {
+                        return Err(Error::new(
+                            ErrorKind::Other,
+                            format!("blob too large ({} bytes)", srcbuf.len()),
+                        ));
+                    }
+                    let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
+                    len_buf[0] |= 0x80;
+                    self.inner.write_all(&len_buf)?;
+                    self.offset += 4;
+                }
+                self.inner.write_all(srcbuf)?;
+                self.offset += srcbuf.len() as u64;
+                Ok(offset)
+            }
+        }
+    };
+}
+
+write_blob_impl!(WriteBlobWriter<crate::tenant::io::BufWriter<crate::virtual_file::VirtualFile>>);
+write_blob_impl!(WriteBlobWriter<crate::virtual_file::VirtualFile>);
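With the `write_blob_impl!` macro, `write_blob` becomes an inherent `async fn` on the listed `WriteBlobWriter` instantiations rather than a `BlobWriter` trait method, so callers pick a concrete writer and await the call. A minimal usage sketch, assuming a `VirtualFile`-backed writer (the path handling is illustrative):

    // Sketch only: write one length-prefixed blob and remember its offset.
    let file = crate::virtual_file::VirtualFile::create(&path)?;
    let mut writer = WriteBlobWriter::new(file, 0);
    let offset = writer.write_blob(b"some bytes").await?;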
@@ -7,9 +7,7 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
 use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
 use crate::virtual_file::VirtualFile;
 use bytes::Bytes;
-use std::fs::File;
 use std::ops::{Deref, DerefMut};
-use std::os::unix::fs::FileExt;

 /// This is implemented by anything that can read 8 kB (PAGE_SZ)
 /// blocks, using the page cache

@@ -74,7 +72,6 @@ impl<'a> Deref for BlockLease<'a> {
 /// Unlike traits, we also support the read function to be async though.
 pub(crate) enum BlockReaderRef<'a> {
     FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
-    FileBlockReaderFile(&'a FileBlockReader<std::fs::File>),
     EphemeralFile(&'a EphemeralFile),
     Adapter(Adapter<&'a DeltaLayerInner>),
     #[cfg(test)]

@@ -87,7 +84,6 @@ impl<'a> BlockReaderRef<'a> {
         use BlockReaderRef::*;
         match self {
             FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
-            FileBlockReaderFile(r) => r.read_blk(blknum).await,
             EphemeralFile(r) => r.read_blk(blknum).await,
             Adapter(r) => r.read_blk(blknum).await,
             #[cfg(test)]

@@ -150,18 +146,15 @@ pub struct FileBlockReader<F> {
     file_id: page_cache::FileId,
 }

-impl<F> FileBlockReader<F>
-where
-    F: FileExt,
-{
-    pub fn new(file: F) -> Self {
+impl FileBlockReader<VirtualFile> {
+    pub fn new(file: VirtualFile) -> Self {
         let file_id = page_cache::next_file_id();

         FileBlockReader { file_id, file }
     }

     /// Read a page from the underlying file into given buffer.
-    fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
+    async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
         assert!(buf.len() == PAGE_SZ);
         self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
     }

@@ -185,7 +178,7 @@ where
             ReadBufResult::Found(guard) => break Ok(guard.into()),
             ReadBufResult::NotFound(mut write_guard) => {
                 // Read the page from disk into the buffer
-                self.fill_buffer(write_guard.deref_mut(), blknum)?;
+                self.fill_buffer(write_guard.deref_mut(), blknum).await?;
                 write_guard.mark_valid();

                 // Swap for read lock

@@ -196,12 +189,6 @@ where
     }
 }

-impl BlockReader for FileBlockReader<File> {
-    fn block_cursor(&self) -> BlockCursor<'_> {
-        BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
-    }
-}
-
 impl BlockReader for FileBlockReader<VirtualFile> {
     fn block_cursor(&self) -> BlockCursor<'_> {
         BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))
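`FileBlockReader` is now constructed only over `VirtualFile`, and `fill_buffer` awaits the file's `read_exact_at`, so block reads are driven through an awaited `read_blk`. A minimal read-path sketch, assuming the usual `VirtualFile::open` constructor and an existing layer file (the path variable is illustrative):

    // Sketch only: read one PAGE_SZ block through the page cache.
    let file = VirtualFile::open(&layer_path)?;
    let reader = FileBlockReader::new(file);
    let block = reader.read_blk(0).await?;
    let _first_byte = block[0]; // the returned lease derefs to the PAGE_SZ byte array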
@@ -9,7 +9,6 @@ use std::cmp::min;
 use std::fs::OpenOptions;
 use std::io::{self, ErrorKind};
 use std::ops::DerefMut;
-use std::os::unix::prelude::FileExt;
 use std::path::PathBuf;
 use std::sync::atomic::AtomicU64;
 use tracing::*;

@@ -128,10 +127,15 @@ impl EphemeralFile {
             self.off += n;
             src_remaining = &src_remaining[n..];
             if self.off == PAGE_SZ {
-                match self.ephemeral_file.file.write_all_at(
-                    &self.ephemeral_file.mutable_tail,
-                    self.blknum as u64 * PAGE_SZ as u64,
-                ) {
+                match self
+                    .ephemeral_file
+                    .file
+                    .write_all_at(
+                        &self.ephemeral_file.mutable_tail,
+                        self.blknum as u64 * PAGE_SZ as u64,
+                    )
+                    .await
+                {
                     Ok(_) => {
                         // Pre-warm the page cache with what we just wrote.
                         // This isn't necessary for coherency/correctness, but it's how we've always done it.
@@ -26,7 +26,7 @@
 //! recovered from this file. This is tracked in
 //! <https://github.com/neondatabase/neon/issues/4418>

-use std::io::{self, Read, Write};
+use std::io::{self, Write};

 use crate::virtual_file::VirtualFile;
 use anyhow::Result;

@@ -151,11 +151,12 @@ impl Manifest {
     /// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
     /// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
     /// backup the current one.
-    pub fn load(
-        mut file: VirtualFile,
+    pub async fn load(
+        file: VirtualFile,
     ) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
         let mut buf = vec![];
-        file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;
+        file.read_exact_at(&mut buf, 0)
+            .map_err(ManifestLoadError::Io)?;

         // Read manifest header
         let mut buf = Bytes::from(buf);

@@ -241,8 +242,8 @@ mod tests {

     use super::*;

-    #[test]
-    fn test_read_manifest() {
+    #[tokio::test]
+    async fn test_read_manifest() {
         let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
         std::fs::create_dir_all(&testdir).unwrap();
         let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();

@@ -274,7 +275,7 @@ mod tests {
                 .truncate(false),
         )
         .unwrap();
-        let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
+        let (mut manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
         assert!(!corrupted.0);
         assert_eq!(operations.len(), 2);
         assert_eq!(

@@ -306,7 +307,7 @@ mod tests {
                 .truncate(false),
         )
         .unwrap();
-        let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
+        let (_manifest, operations, corrupted) = Manifest::load(file).await.unwrap();
         assert!(!corrupted.0);
         assert_eq!(operations.len(), 3);
         assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));
@@ -9,9 +9,9 @@
 //! [`remote_timeline_client`]: super::remote_timeline_client

 use std::fs::{File, OpenOptions};
-use std::io::{self, Write};
+use std::io;

-use anyhow::{bail, ensure, Context};
+use anyhow::{ensure, Context};
 use serde::{de::Error, Deserialize, Serialize, Serializer};
 use thiserror::Error;
 use tracing::info_span;

@@ -255,7 +255,7 @@ impl Serialize for TimelineMetadata {
 }

 /// Save timeline metadata to file
-pub fn save_metadata(
+pub async fn save_metadata(
     conf: &'static PageServerConf,
     tenant_id: &TenantId,
     timeline_id: &TimelineId,

@@ -273,10 +273,7 @@ pub fn save_metadata(

     let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;

-    if file.write(&metadata_bytes)? != metadata_bytes.len() {
-        bail!("Could not write all the metadata bytes in a single call");
-    }
-    file.sync_all()?;
+    file.write_and_fsync(&metadata_bytes)?;

     // fsync the parent directory to ensure the directory entry is durable
     if first_save {
@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::repository::{Key, Value, KEY_SIZE};
-use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
+use crate::tenant::blob_io::WriteBlobWriter;
 use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
 use crate::tenant::storage_layer::{

@@ -45,8 +45,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::fs::{self, File};
+use std::io::SeekFrom;
 use std::io::{BufWriter, Write};
-use std::io::{Seek, SeekFrom};
 use std::ops::Range;
 use std::os::unix::fs::FileExt;
 use std::path::{Path, PathBuf};

@@ -632,11 +632,12 @@ impl DeltaLayerWriterInner {
     ///
     /// The values must be appended in key, lsn order.
     ///
-    fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
+    async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
         self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
+            .await
     }

-    fn put_value_bytes(
+    async fn put_value_bytes(
         &mut self,
         key: Key,
         lsn: Lsn,

@@ -645,7 +646,7 @@ impl DeltaLayerWriterInner {
     ) -> anyhow::Result<()> {
         assert!(self.lsn_range.start <= lsn);

-        let off = self.blob_writer.write_blob(val)?;
+        let off = self.blob_writer.write_blob(val).await?;

         let blob_ref = BlobRef::new(off, will_init);

@@ -797,11 +798,11 @@ impl DeltaLayerWriter {
     ///
     /// The values must be appended in key, lsn order.
     ///
-    pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
-        self.inner.as_mut().unwrap().put_value(key, lsn, val)
+    pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
+        self.inner.as_mut().unwrap().put_value(key, lsn, val).await
     }

-    pub fn put_value_bytes(
+    pub async fn put_value_bytes(
         &mut self,
         key: Key,
         lsn: Lsn,

@@ -812,6 +813,7 @@ impl DeltaLayerWriter {
             .as_mut()
             .unwrap()
             .put_value_bytes(key, lsn, val, will_init)
+            .await
     }

     pub fn size(&self) -> u64 {
@@ -27,7 +27,7 @@ use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache::PAGE_SZ;
 use crate::repository::{Key, KEY_SIZE};
-use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
+use crate::tenant::blob_io::WriteBlobWriter;
 use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
 use crate::tenant::storage_layer::{

@@ -42,8 +42,8 @@ use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
 use std::fs::{self, File};
+use std::io::SeekFrom;
 use std::io::Write;
-use std::io::{Seek, SeekFrom};
 use std::ops::Range;
 use std::os::unix::prelude::FileExt;
 use std::path::{Path, PathBuf};

@@ -569,9 +569,9 @@ impl ImageLayerWriterInner {
     ///
     /// The page versions must be appended in blknum order.
     ///
-    fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
+    async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
         ensure!(self.key_range.contains(&key));
-        let off = self.blob_writer.write_blob(img)?;
+        let off = self.blob_writer.write_blob(img).await?;

         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);

@@ -710,8 +710,8 @@ impl ImageLayerWriter {
     ///
     /// The page versions must be appended in blknum order.
     ///
-    pub fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
-        self.inner.as_mut().unwrap().put_image(key, img)
+    pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
+        self.inner.as_mut().unwrap().put_image(key, img).await
     }

     ///
@@ -348,7 +348,9 @@ impl InMemoryLayer {
             for (lsn, pos) in vec_map.as_slice() {
                 cursor.read_blob_into_buf(*pos, &mut buf).await?;
                 let will_init = Value::des(&buf)?.will_init();
-                delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
+                delta_layer_writer
+                    .put_value_bytes(key, *lsn, &buf, will_init)
+                    .await?;
             }
         }

@@ -2735,6 +2735,7 @@ impl Timeline {
         if disk_consistent_lsn != old_disk_consistent_lsn {
             assert!(disk_consistent_lsn > old_disk_consistent_lsn);
             self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
+                .await
                 .context("update_metadata_file")?;
             // Also update the in-memory copy
             self.disk_consistent_lsn.store(disk_consistent_lsn);

@@ -2743,7 +2744,7 @@ impl Timeline {
     }

     /// Update metadata file
-    fn update_metadata_file(
+    async fn update_metadata_file(
         &self,
         disk_consistent_lsn: Lsn,
         layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,

@@ -2791,6 +2792,7 @@ impl Timeline {
             &metadata,
             false,
         )
+        .await
         .context("save_metadata")?;

         if let Some(remote_client) = &self.remote_client {

@@ -3030,7 +3032,7 @@ impl Timeline {
                     }
                 }
             };
-            image_layer_writer.put_image(key, &img)?;
+            image_layer_writer.put_image(key, &img).await?;
             key = key.next();
         }
     }

@@ -3616,7 +3618,7 @@ impl Timeline {
                 )))
             });

-            writer.as_mut().unwrap().put_value(key, lsn, value)?;
+            writer.as_mut().unwrap().put_value(key, lsn, value).await?;
             prev_key = Some(key);
         }
         if let Some(writer) = writer {

@@ -4122,7 +4124,8 @@ impl Timeline {
         if !layers_to_remove.is_empty() {
             // Persist the new GC cutoff value in the metadata file, before
             // we actually remove anything.
-            self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?;
+            self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
+                .await?;

             // Actually delete the layers from disk and remove them from the map.
             // (couldn't do this in the loop above, because you cannot modify a collection
@@ -13,7 +13,7 @@
 use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
 use once_cell::sync::OnceCell;
 use std::fs::{self, File, OpenOptions};
-use std::io::{Error, ErrorKind, Read, Seek, SeekFrom, Write};
+use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
 use std::os::unix::fs::FileExt;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

@@ -321,54 +321,8 @@ impl VirtualFile {
         drop(self);
         std::fs::remove_file(path).expect("failed to remove the virtual file");
     }
-}

-impl Drop for VirtualFile {
-    /// If a VirtualFile is dropped, close the underlying file if it was open.
-    fn drop(&mut self) {
-        let handle = self.handle.get_mut().unwrap();
-
-        // We could check with a read-lock first, to avoid waiting on an
-        // unrelated I/O.
-        let slot = &get_open_files().slots[handle.index];
-        let mut slot_guard = slot.inner.write().unwrap();
-        if slot_guard.tag == handle.tag {
-            slot.recently_used.store(false, Ordering::Relaxed);
-            // there is also operation "close-by-replace" for closes done on eviction for
-            // comparison.
-            STORAGE_IO_TIME
-                .with_label_values(&["close"])
-                .observe_closure_duration(|| drop(slot_guard.file.take()));
-        }
-    }
-}
-
-impl Read for VirtualFile {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
-        let pos = self.pos;
-        let n = self.read_at(buf, pos)?;
-        self.pos += n as u64;
-        Ok(n)
-    }
-}
-
-impl Write for VirtualFile {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
-        let pos = self.pos;
-        let n = self.write_at(buf, pos)?;
-        self.pos += n as u64;
-        Ok(n)
-    }
-
-    fn flush(&mut self) -> Result<(), std::io::Error> {
-        // flush is no-op for File (at least on unix), so we don't need to do
-        // anything here either.
-        Ok(())
-    }
-}
-
-impl Seek for VirtualFile {
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
+    pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
         match pos {
             SeekFrom::Start(offset) => {
                 self.pos = offset;
@@ -392,10 +346,64 @@ impl Seek for VirtualFile {
         }
         Ok(self.pos)
     }
-}

-impl FileExt for VirtualFile {
-    fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
+    // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
+    pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
+        while !buf.is_empty() {
+            match self.read_at(buf, offset) {
+                Ok(0) => {
+                    return Err(Error::new(
+                        std::io::ErrorKind::UnexpectedEof,
+                        "failed to fill whole buffer",
+                    ))
+                }
+                Ok(n) => {
+                    buf = &mut buf[n..];
+                    offset += n as u64;
+                }
+                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
+                Err(e) => return Err(e),
+            }
+        }
+        Ok(())
+    }
+
+    // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
+    pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
+        while !buf.is_empty() {
+            match self.write_at(buf, offset) {
+                Ok(0) => {
+                    return Err(Error::new(
+                        std::io::ErrorKind::WriteZero,
+                        "failed to write whole buffer",
+                    ));
+                }
+                Ok(n) => {
+                    buf = &buf[n..];
+                    offset += n as u64;
+                }
+                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
+                Err(e) => return Err(e),
+            }
+        }
+        Ok(())
+    }
+
+    /// Write the given buffer (which has to be below the kernel's internal page size) and fsync
+    ///
+    /// This ensures some level of atomicity (not a good one, but it's the best we have).
+    pub fn write_and_fsync(&mut self, buf: &[u8]) -> Result<(), Error> {
+        if self.write(buf)? != buf.len() {
+            return Err(Error::new(
+                std::io::ErrorKind::Other,
+                "Could not write all the bytes in a single call",
+            ));
+        }
+        self.sync_all()?;
+        Ok(())
+    }
+
+    pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
         let result = self.with_file("read", |file| file.read_at(buf, offset))?;
         if let Ok(size) = result {
             STORAGE_IO_SIZE

@@ -405,7 +413,7 @@ impl FileExt for VirtualFile {
         result
     }

-    fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
+    pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
         let result = self.with_file("write", |file| file.write_at(buf, offset))?;
         if let Ok(size) = result {
             STORAGE_IO_SIZE

@@ -416,6 +424,41 @@ impl FileExt for VirtualFile {
     }
 }

+impl Drop for VirtualFile {
+    /// If a VirtualFile is dropped, close the underlying file if it was open.
+    fn drop(&mut self) {
+        let handle = self.handle.get_mut().unwrap();
+
+        // We could check with a read-lock first, to avoid waiting on an
+        // unrelated I/O.
+        let slot = &get_open_files().slots[handle.index];
+        let mut slot_guard = slot.inner.write().unwrap();
+        if slot_guard.tag == handle.tag {
+            slot.recently_used.store(false, Ordering::Relaxed);
+            // there is also operation "close-by-replace" for closes done on eviction for
+            // comparison.
+            STORAGE_IO_TIME
+                .with_label_values(&["close"])
+                .observe_closure_duration(|| drop(slot_guard.file.take()));
+        }
+    }
+}
+
+impl Write for VirtualFile {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        let pos = self.pos;
+        let n = self.write_at(buf, pos)?;
+        self.pos += n as u64;
+        Ok(n)
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        // flush is no-op for File (at least on unix), so we don't need to do
+        // anything here either.
+        Ok(())
+    }
+}
+
 impl OpenFiles {
     fn new(num_slots: usize) -> OpenFiles {
         let mut slots = Box::new(Vec::with_capacity(num_slots));
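The new positional helpers on `VirtualFile` mirror the standard library's `FileExt` loops: they retry on `Interrupted`, advance the buffer and offset after a partial transfer, and report a zero-length transfer as `UnexpectedEof` (reads) or `WriteZero` (writes). A small usage sketch, modelled on the `file_b` test below (the path variable is illustrative):

    // Sketch only: positional I/O on a VirtualFile without the Read/Write traits.
    let file = VirtualFile::open_with_options(
        &path_b,
        OpenOptions::new().read(true).write(true).create(true),
    )?;
    file.write_all_at(b"FOOBAR", 0).await?; // loops until all six bytes are written
    let mut buf = [0u8; 3];
    file.read_exact_at(&mut buf, 2)?; // fills the whole buffer or fails
    assert_eq!(&buf, b"OBA");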
@@ -472,30 +515,60 @@ mod tests {
     use std::sync::Arc;
     use std::thread;

-    // Helper function to slurp contents of a file, starting at the current position,
-    // into a string
-    fn read_string<FD>(vfile: &mut FD) -> Result<String, Error>
-    where
-        FD: Read,
-    {
-        let mut buf = String::new();
-        vfile.read_to_string(&mut buf)?;
-        Ok(buf)
+    enum MaybeVirtualFile {
+        VirtualFile(VirtualFile),
+        File(File),
     }

-    // Helper function to slurp a portion of a file into a string
-    fn read_string_at<FD>(vfile: &mut FD, pos: u64, len: usize) -> Result<String, Error>
-    where
-        FD: FileExt,
-    {
-        let mut buf = Vec::new();
-        buf.resize(len, 0);
-        vfile.read_exact_at(&mut buf, pos)?;
-        Ok(String::from_utf8(buf).unwrap())
+    impl MaybeVirtualFile {
+        fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
+            match self {
+                MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset),
+                MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
+            }
+        }
+        async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
+            match self {
+                MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
+                MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
+            }
+        }
+        fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
+            match self {
+                MaybeVirtualFile::VirtualFile(file) => file.seek(pos),
+                MaybeVirtualFile::File(file) => file.seek(pos),
+            }
+        }
+        async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
+            match self {
+                MaybeVirtualFile::VirtualFile(file) => file.write_all(buf),
+                MaybeVirtualFile::File(file) => file.write_all(buf),
+            }
+        }
+
+        // Helper function to slurp contents of a file, starting at the current position,
+        // into a string
+        async fn read_string(&mut self) -> Result<String, Error> {
+            use std::io::Read;
+            let mut buf = String::new();
+            match self {
+                MaybeVirtualFile::VirtualFile(file) => file.read_to_string(&mut buf)?,
+                MaybeVirtualFile::File(file) => file.read_to_string(&mut buf)?,
+            }
+            Ok(buf)
+        }
+
+        // Helper function to slurp a portion of a file into a string
+        async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
+            let mut buf = Vec::new();
+            buf.resize(len, 0);
+            self.read_exact_at(&mut buf, pos)?;
+            Ok(String::from_utf8(buf).unwrap())
+        }
     }

-    #[test]
-    fn test_virtual_files() -> Result<(), Error> {
+    #[tokio::test]
+    async fn test_virtual_files() -> Result<(), Error> {
         // The real work is done in the test_files() helper function. This
         // allows us to run the same set of tests against a native File, and
         // VirtualFile. We trust the native Files and wouldn't need to test them,
@@ -504,21 +577,23 @@ mod tests {
         // native files, you will run out of file descriptors if the ulimit
         // is low enough.)
         test_files("virtual_files", |path, open_options| {
-            VirtualFile::open_with_options(path, open_options)
+            let vf = VirtualFile::open_with_options(path, open_options)?;
+            Ok(MaybeVirtualFile::VirtualFile(vf))
         })
+        .await
     }

-    #[test]
-    fn test_physical_files() -> Result<(), Error> {
+    #[tokio::test]
+    async fn test_physical_files() -> Result<(), Error> {
         test_files("physical_files", |path, open_options| {
-            open_options.open(path)
+            Ok(MaybeVirtualFile::File(open_options.open(path)?))
         })
+        .await
     }

-    fn test_files<OF, FD>(testname: &str, openfunc: OF) -> Result<(), Error>
+    async fn test_files<OF>(testname: &str, openfunc: OF) -> Result<(), Error>
     where
-        FD: Read + Write + Seek + FileExt,
-        OF: Fn(&Path, &OpenOptions) -> Result<FD, std::io::Error>,
+        OF: Fn(&Path, &OpenOptions) -> Result<MaybeVirtualFile, std::io::Error>,
     {
         let testdir = crate::config::PageServerConf::test_repo_dir(testname);
         std::fs::create_dir_all(&testdir)?;
@@ -528,36 +603,36 @@ mod tests {
             &path_a,
             OpenOptions::new().write(true).create(true).truncate(true),
         )?;
-        file_a.write_all(b"foobar")?;
+        file_a.write_all(b"foobar").await?;

         // cannot read from a file opened in write-only mode
-        assert!(read_string(&mut file_a).is_err());
+        assert!(file_a.read_string().await.is_err());

         // Close the file and re-open for reading
         let mut file_a = openfunc(&path_a, OpenOptions::new().read(true))?;

         // cannot write to a file opened in read-only mode
-        assert!(file_a.write(b"bar").is_err());
+        assert!(file_a.write_all(b"bar").await.is_err());

         // Try simple read
-        assert_eq!("foobar", read_string(&mut file_a)?);
+        assert_eq!("foobar", file_a.read_string().await?);

         // It's positioned at the EOF now.
-        assert_eq!("", read_string(&mut file_a)?);
+        assert_eq!("", file_a.read_string().await?);

         // Test seeks.
         assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
-        assert_eq!("oobar", read_string(&mut file_a)?);
+        assert_eq!("oobar", file_a.read_string().await?);

         assert_eq!(file_a.seek(SeekFrom::End(-2))?, 4);
-        assert_eq!("ar", read_string(&mut file_a)?);
+        assert_eq!("ar", file_a.read_string().await?);

         assert_eq!(file_a.seek(SeekFrom::Start(1))?, 1);
         assert_eq!(file_a.seek(SeekFrom::Current(2))?, 3);
-        assert_eq!("bar", read_string(&mut file_a)?);
+        assert_eq!("bar", file_a.read_string().await?);

         assert_eq!(file_a.seek(SeekFrom::Current(-5))?, 1);
-        assert_eq!("oobar", read_string(&mut file_a)?);
+        assert_eq!("oobar", file_a.read_string().await?);

         // Test erroneous seeks to before byte 0
         assert!(file_a.seek(SeekFrom::End(-7)).is_err());

@@ -565,7 +640,7 @@ mod tests {
         assert!(file_a.seek(SeekFrom::Current(-2)).is_err());

         // the erroneous seek should have left the position unchanged
-        assert_eq!("oobar", read_string(&mut file_a)?);
+        assert_eq!("oobar", file_a.read_string().await?);

         // Create another test file, and try FileExt functions on it.
         let path_b = testdir.join("file_b");

@@ -577,10 +652,10 @@ mod tests {
                 .create(true)
                 .truncate(true),
         )?;
-        file_b.write_all_at(b"BAR", 3)?;
-        file_b.write_all_at(b"FOO", 0)?;
+        file_b.write_all_at(b"BAR", 3).await?;
+        file_b.write_all_at(b"FOO", 0).await?;

-        assert_eq!(read_string_at(&mut file_b, 2, 3)?, "OBA");
+        assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");

         // Open a lot of files, enough to cause some evictions. (Or to be precise,
         // open the same file many times. The effect is the same.)

@@ -591,7 +666,7 @@ mod tests {
         let mut vfiles = Vec::new();
         for _ in 0..100 {
             let mut vfile = openfunc(&path_b, OpenOptions::new().read(true))?;
-            assert_eq!("FOOBAR", read_string(&mut vfile)?);
+            assert_eq!("FOOBAR", vfile.read_string().await?);
             vfiles.push(vfile);
         }

@@ -600,13 +675,13 @@ mod tests {

         // The underlying file descriptor for 'file_a' should be closed now. Try to read
         // from it again. We left the file positioned at offset 1 above.
-        assert_eq!("oobar", read_string(&mut file_a)?);
+        assert_eq!("oobar", file_a.read_string().await?);

         // Check that all the other FDs still work too. Use them in random order for
         // good measure.
         vfiles.as_mut_slice().shuffle(&mut thread_rng());
         for vfile in vfiles.iter_mut() {
-            assert_eq!("OOBAR", read_string_at(vfile, 1, 5)?);
+            assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
         }

         Ok(())