mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 12:32:54 +00:00
* Wrappers and drop implementations for image and delta layer writers. * Two regression tests for the image and delta layer files.
This commit is contained in:
committed by
GitHub
parent
6dbf202e0d
commit
78e412b84b
@@ -610,9 +610,9 @@ impl DeltaLayer {
|
||||
///
|
||||
/// 3. Call `finish`.
|
||||
///
|
||||
pub struct DeltaLayerWriter {
|
||||
struct DeltaLayerWriterInner {
|
||||
conf: &'static PageServerConf,
|
||||
path: PathBuf,
|
||||
pub path: PathBuf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -624,17 +624,17 @@ pub struct DeltaLayerWriter {
|
||||
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
|
||||
}
|
||||
|
||||
impl DeltaLayerWriter {
|
||||
impl DeltaLayerWriterInner {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
pub fn new(
|
||||
fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
) -> Result<DeltaLayerWriter> {
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename. We don't know
|
||||
// the end key yet, so we cannot form the final filename yet. We will
|
||||
// rename it when we're done.
|
||||
@@ -653,7 +653,7 @@ impl DeltaLayerWriter {
|
||||
let block_buf = BlockBuf::new();
|
||||
let tree_builder = DiskBtreeBuilder::new(block_buf);
|
||||
|
||||
Ok(DeltaLayerWriter {
|
||||
Ok(Self {
|
||||
conf,
|
||||
path,
|
||||
timeline_id,
|
||||
@@ -670,17 +670,17 @@ impl DeltaLayerWriter {
|
||||
///
|
||||
/// The values must be appended in key, lsn order.
|
||||
///
|
||||
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
|
||||
fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
|
||||
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
|
||||
}
|
||||
|
||||
pub fn put_value_bytes(
|
||||
fn put_value_bytes(
|
||||
&mut self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &[u8],
|
||||
will_init: bool,
|
||||
) -> Result<()> {
|
||||
) -> anyhow::Result<()> {
|
||||
assert!(self.lsn_range.start <= lsn);
|
||||
|
||||
let off = self.blob_writer.write_blob(val)?;
|
||||
@@ -693,14 +693,14 @@ impl DeltaLayerWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
fn size(&self) -> u64 {
|
||||
self.blob_writer.size() + self.tree.borrow_writer().size()
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish writing the delta layer.
|
||||
///
|
||||
pub fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
|
||||
fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -768,6 +768,102 @@ impl DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new delta layer.
|
||||
///
|
||||
/// Usage:
|
||||
///
|
||||
/// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
|
||||
///
|
||||
/// 2. Write the contents by calling `put_value` for every page
|
||||
/// version to store in the layer.
|
||||
///
|
||||
/// 3. Call `finish`.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// As described in https://github.com/neondatabase/neon/issues/2650, it's
|
||||
/// possible for the writer to drop before `finish` is actually called. So this
|
||||
/// could lead to odd temporary files in the directory, exhausting file system.
|
||||
/// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
|
||||
/// implementation that cleans up the temporary file in failure. It's not
|
||||
/// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
|
||||
/// out some fields, making it impossible to implement `Drop`.
|
||||
///
|
||||
#[must_use]
|
||||
pub struct DeltaLayerWriter {
|
||||
inner: Option<DeltaLayerWriterInner>,
|
||||
}
|
||||
|
||||
impl DeltaLayerWriter {
|
||||
///
|
||||
/// Start building a new delta layer.
|
||||
///
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_start: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Some(DeltaLayerWriterInner::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
key_start,
|
||||
lsn_range,
|
||||
)?),
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Append a key-value pair to the file.
|
||||
///
|
||||
/// The values must be appended in key, lsn order.
|
||||
///
|
||||
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
|
||||
self.inner.as_mut().unwrap().put_value(key, lsn, val)
|
||||
}
|
||||
|
||||
pub fn put_value_bytes(
|
||||
&mut self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &[u8],
|
||||
will_init: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.put_value_bytes(key, lsn, val, will_init)
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.inner.as_ref().unwrap().size()
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish writing the delta layer.
|
||||
///
|
||||
pub fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
|
||||
self.inner.take().unwrap().finish(key_end)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DeltaLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
match inner.blob_writer.into_inner().into_inner() {
|
||||
Ok(vfile) => vfile.remove(),
|
||||
Err(err) => warn!(
|
||||
"error while flushing buffer of image layer temporary file: {}",
|
||||
err
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Iterator over all key-value pairse stored in a delta layer
|
||||
///
|
||||
|
||||
@@ -411,7 +411,7 @@ impl ImageLayer {
|
||||
///
|
||||
/// 3. Call `finish`.
|
||||
///
|
||||
pub struct ImageLayerWriter {
|
||||
struct ImageLayerWriterInner {
|
||||
conf: &'static PageServerConf,
|
||||
path: PathBuf,
|
||||
timeline_id: TimelineId,
|
||||
@@ -423,14 +423,17 @@ pub struct ImageLayerWriter {
|
||||
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
|
||||
}
|
||||
|
||||
impl ImageLayerWriter {
|
||||
pub fn new(
|
||||
impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename.
|
||||
// We'll atomically rename it to the final name when we're done.
|
||||
let path = ImageLayer::temp_path_for(
|
||||
@@ -455,7 +458,7 @@ impl ImageLayerWriter {
|
||||
let block_buf = BlockBuf::new();
|
||||
let tree_builder = DiskBtreeBuilder::new(block_buf);
|
||||
|
||||
let writer = ImageLayerWriter {
|
||||
let writer = Self {
|
||||
conf,
|
||||
path,
|
||||
timeline_id,
|
||||
@@ -474,7 +477,7 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// The page versions must be appended in blknum order.
|
||||
///
|
||||
pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
|
||||
fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let off = self.blob_writer.write_blob(img)?;
|
||||
|
||||
@@ -485,7 +488,10 @@ impl ImageLayerWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finish(self) -> anyhow::Result<ImageLayer> {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
fn finish(self) -> anyhow::Result<ImageLayer> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -552,3 +558,76 @@ impl ImageLayerWriter {
|
||||
Ok(layer)
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
///
|
||||
/// Usage:
|
||||
///
|
||||
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
|
||||
///
|
||||
/// 2. Write the contents by calling `put_page_image` for every key-value
|
||||
/// pair in the key range.
|
||||
///
|
||||
/// 3. Call `finish`.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// As described in https://github.com/neondatabase/neon/issues/2650, it's
|
||||
/// possible for the writer to drop before `finish` is actually called. So this
|
||||
/// could lead to odd temporary files in the directory, exhausting file system.
|
||||
/// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
|
||||
/// implementation that cleans up the temporary file in failure. It's not
|
||||
/// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
|
||||
/// out some fields, making it impossible to implement `Drop`.
|
||||
///
|
||||
#[must_use]
|
||||
pub struct ImageLayerWriter {
|
||||
inner: Option<ImageLayerWriterInner>,
|
||||
}
|
||||
|
||||
impl ImageLayerWriter {
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
Ok(Self {
|
||||
inner: Some(ImageLayerWriterInner::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
key_range,
|
||||
lsn,
|
||||
)?),
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Write next value to the file.
|
||||
///
|
||||
/// The page versions must be appended in blknum order.
|
||||
///
|
||||
pub fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
|
||||
self.inner.as_mut().unwrap().put_image(key, img)
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
|
||||
self.inner.take().unwrap().finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ImageLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
inner.blob_writer.into_inner().remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1541,6 +1541,10 @@ impl Timeline {
|
||||
lsn,
|
||||
)?;
|
||||
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
anyhow::bail!("failpoint image-layer-writer-fail-before-finish");
|
||||
});
|
||||
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
@@ -1835,6 +1839,11 @@ impl Timeline {
|
||||
},
|
||||
)?);
|
||||
}
|
||||
|
||||
fail_point!("delta-layer-writer-fail-before-finish", |_| {
|
||||
anyhow::bail!("failpoint delta-layer-writer-fail-before-finish");
|
||||
});
|
||||
|
||||
writer.as_mut().unwrap().put_value(key, lsn, value)?;
|
||||
prev_key = Some(key);
|
||||
}
|
||||
|
||||
@@ -319,6 +319,12 @@ impl VirtualFile {
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
let path = self.path.clone();
|
||||
drop(self);
|
||||
std::fs::remove_file(path).expect("failed to remove the virtual file");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VirtualFile {
|
||||
|
||||
@@ -1628,6 +1628,8 @@ class NeonPageserver(PgProtocol):
|
||||
Initializes the repository via `neon init`.
|
||||
"""
|
||||
|
||||
TEMP_FILE_SUFFIX = "___temp"
|
||||
|
||||
def __init__(self, env: NeonEnv, port: PageserverPort, config_override: Optional[str] = None):
|
||||
super().__init__(host="localhost", port=port.pg, user="cloud_admin")
|
||||
self.env = env
|
||||
|
||||
92
test_runner/regress/test_layer_writers_fail.py
Normal file
92
test_runner/regress/test_layer_writers_fail.py
Normal file
@@ -0,0 +1,92 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonPageserver
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2703")
|
||||
def test_image_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# small checkpoint distance to create more delta layer files
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
# set the target size to be large to allow the image layer to cover the whole key space
|
||||
"compaction_target_size": f"{1024 ** 3}",
|
||||
# tweak the default settings to allow quickly create image layers and L1 layers
|
||||
"compaction_period": "1 s",
|
||||
"compaction_threshold": "2",
|
||||
"image_creation_threshold": "1",
|
||||
}
|
||||
)
|
||||
|
||||
pg = env.postgres.create_start("main", tenant_id=tenant_id)
|
||||
pg.safe_psql_many(
|
||||
[
|
||||
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
|
||||
"""INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g""",
|
||||
]
|
||||
)
|
||||
|
||||
pageserver_http.configure_failpoints(("image-layer-writer-fail-before-finish", "return"))
|
||||
with pytest.raises(Exception, match="image-layer-writer-fail-before-finish"):
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
new_temp_layer_files = list(
|
||||
filter(
|
||||
lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX),
|
||||
[path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()],
|
||||
)
|
||||
)
|
||||
|
||||
assert (
|
||||
len(new_temp_layer_files) == 0
|
||||
), "pageserver should clean its temporary new image layer files on failure"
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2703")
|
||||
def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# small checkpoint distance to create more delta layer files
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
# set the target size to be large to allow the image layer to cover the whole key space
|
||||
"compaction_target_size": f"{1024 ** 3}",
|
||||
# tweak the default settings to allow quickly create image layers and L1 layers
|
||||
"compaction_period": "1 s",
|
||||
"compaction_threshold": "2",
|
||||
"image_creation_threshold": "1",
|
||||
}
|
||||
)
|
||||
|
||||
pg = env.postgres.create_start("main", tenant_id=tenant_id)
|
||||
pg.safe_psql_many(
|
||||
[
|
||||
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
|
||||
"""INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g""",
|
||||
]
|
||||
)
|
||||
|
||||
pageserver_http.configure_failpoints(("delta-layer-writer-fail-before-finish", "return"))
|
||||
# Note: we cannot test whether the exception is exactly 'delta-layer-writer-fail-before-finish'
|
||||
# since our code does it in loop, we cannot get this exact error for our request.
|
||||
with pytest.raises(Exception):
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
new_temp_layer_files = list(
|
||||
filter(
|
||||
lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX),
|
||||
[path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()],
|
||||
)
|
||||
)
|
||||
|
||||
assert (
|
||||
len(new_temp_layer_files) == 0
|
||||
), "pageserver should clean its temporary new delta layer files on failure"
|
||||
Reference in New Issue
Block a user