mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
feat(pageserver): collect aux file tombstones (#7900)
close https://github.com/neondatabase/neon/issues/7800 This is a small change to enable the tombstone -> exclude from image layer path. Most of the pull request is unit tests. --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -6469,4 +6469,208 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_vectored_impl_wrapper(
|
||||
tline: &Arc<Timeline>,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Bytes>, GetVectoredError> {
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let mut res = tline
|
||||
.get_vectored_impl(
|
||||
KeySpace::single(key..key.next()),
|
||||
lsn,
|
||||
&mut reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
Ok(res.pop_last().map(|(k, v)| {
|
||||
assert_eq!(k, key);
|
||||
v.unwrap()
|
||||
}))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_metadata_tombstone_reads() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_metadata_tombstone_reads")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
|
||||
let key2 = Key::from_hex("620000000033333333444444445500000002").unwrap();
|
||||
let key3 = Key::from_hex("620000000033333333444444445500000003").unwrap();
|
||||
|
||||
// We emulate the situation that the compaction algorithm creates an image layer that removes the tombstones
|
||||
// Lsn 0x30 key0, key3, no key1+key2
|
||||
// Lsn 0x20 key1+key2 tomestones
|
||||
// Lsn 0x10 key1 in image, key2 in delta
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
// delta layers
|
||||
vec![
|
||||
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
|
||||
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
|
||||
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
|
||||
],
|
||||
// image layers
|
||||
vec![
|
||||
(Lsn(0x10), vec![(key1, test_img("metadata key 1"))]),
|
||||
(
|
||||
Lsn(0x30),
|
||||
vec![
|
||||
(key0, test_img("metadata key 0")),
|
||||
(key3, test_img("metadata key 3")),
|
||||
],
|
||||
),
|
||||
],
|
||||
Lsn(0x30),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let lsn = Lsn(0x30);
|
||||
let old_lsn = Lsn(0x20);
|
||||
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, key0, lsn, &ctx).await?,
|
||||
Some(test_img("metadata key 0"))
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, key1, lsn, &ctx).await?,
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, key2, lsn, &ctx).await?,
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, key1, old_lsn, &ctx).await?,
|
||||
Some(Bytes::new()),
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, key2, old_lsn, &ctx).await?,
|
||||
Some(Bytes::new()),
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, key3, lsn, &ctx).await?,
|
||||
Some(test_img("metadata key 3"))
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_metadata_tombstone_image_creation() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_metadata_tombstone_image_creation")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
|
||||
let key2 = Key::from_hex("620000000033333333444444445500000002").unwrap();
|
||||
let key3 = Key::from_hex("620000000033333333444444445500000003").unwrap();
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
// delta layers
|
||||
vec![
|
||||
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
|
||||
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
|
||||
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
|
||||
vec![
|
||||
(key0, Lsn(0x30), Value::Image(test_img("metadata key 0"))),
|
||||
(key3, Lsn(0x30), Value::Image(test_img("metadata key 3"))),
|
||||
],
|
||||
],
|
||||
// image layers
|
||||
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
|
||||
Lsn(0x30),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceImageLayerCreation);
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x30), &ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|(k, _)| k.is_metadata_key())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(images.len(), 2); // the image layer should only contain two existing keys, tombstones should be removed.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_metadata_tombstone_empty_image_creation() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_metadata_tombstone_image_creation")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
|
||||
let key2 = Key::from_hex("620000000033333333444444445500000002").unwrap();
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
// delta layers
|
||||
vec![
|
||||
vec![(key2, Lsn(0x10), Value::Image(test_img("metadata key 2")))],
|
||||
vec![(key1, Lsn(0x20), Value::Image(Bytes::new()))],
|
||||
vec![(key2, Lsn(0x20), Value::Image(Bytes::new()))],
|
||||
],
|
||||
// image layers
|
||||
vec![(Lsn(0x10), vec![(key1, test_img("metadata key 1"))])],
|
||||
Lsn(0x30),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceImageLayerCreation);
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x30), &ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|(k, _)| k.is_metadata_key())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(images.len(), 0); // the image layer should not contain tombstones, or it is not created
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4312,6 +4312,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
img_range: Range<Key>,
|
||||
mode: ImageLayerCreationMode,
|
||||
start: Key,
|
||||
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
|
||||
assert!(!matches!(mode, ImageLayerCreationMode::Initial));
|
||||
|
||||
@@ -4320,39 +4321,43 @@ impl Timeline {
|
||||
let data = self
|
||||
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
let (data, total_kb_retrieved, total_key_retrieved) = {
|
||||
let (data, total_kb_retrieved, total_keys_retrieved) = {
|
||||
let mut new_data = BTreeMap::new();
|
||||
let mut total_kb_retrieved = 0;
|
||||
let mut total_key_retrieved = 0;
|
||||
let mut total_keys_retrieved = 0;
|
||||
for (k, v) in data {
|
||||
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
|
||||
total_kb_retrieved += KEY_SIZE + v.len();
|
||||
total_key_retrieved += 1;
|
||||
total_keys_retrieved += 1;
|
||||
new_data.insert(k, v);
|
||||
}
|
||||
(new_data, total_kb_retrieved / 1024, total_key_retrieved)
|
||||
(new_data, total_kb_retrieved / 1024, total_keys_retrieved)
|
||||
};
|
||||
let delta_file_accessed = reconstruct_state.get_delta_layers_visited();
|
||||
let delta_files_accessed = reconstruct_state.get_delta_layers_visited();
|
||||
|
||||
let trigger_generation = delta_file_accessed as usize >= MAX_AUX_FILE_V2_DELTAS;
|
||||
let trigger_generation = delta_files_accessed as usize >= MAX_AUX_FILE_V2_DELTAS;
|
||||
debug!(
|
||||
"generate image layers for metadata keys: trigger_generation={trigger_generation}, \
|
||||
delta_file_accessed={delta_file_accessed}, total_kb_retrieved={total_kb_retrieved}, \
|
||||
total_key_retrieved={total_key_retrieved}"
|
||||
trigger_generation,
|
||||
delta_files_accessed,
|
||||
total_kb_retrieved,
|
||||
total_keys_retrieved,
|
||||
"generate metadata images"
|
||||
);
|
||||
|
||||
if !trigger_generation && mode == ImageLayerCreationMode::Try {
|
||||
return Ok(ImageLayerCreationOutcome {
|
||||
image: None,
|
||||
next_start_key: img_range.end,
|
||||
});
|
||||
}
|
||||
let has_keys = !data.is_empty();
|
||||
let mut wrote_any_image = false;
|
||||
for (k, v) in data {
|
||||
// Even if the value is empty (deleted), we do not delete it for now until we can ensure vectored get
|
||||
// considers this situation properly.
|
||||
// if v.is_empty() {
|
||||
// continue;
|
||||
// }
|
||||
if v.is_empty() {
|
||||
// the key has been deleted, it does not need an image
|
||||
// in metadata keyspace, an empty image == tombstone
|
||||
continue;
|
||||
}
|
||||
wrote_any_image = true;
|
||||
|
||||
// No need to handle sharding b/c metadata keys are always on the 0-th shard.
|
||||
|
||||
@@ -4360,16 +4365,26 @@ impl Timeline {
|
||||
// on the normal data path either.
|
||||
image_layer_writer.put_image(k, v, ctx).await?;
|
||||
}
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: if has_keys {
|
||||
let image_layer = image_layer_writer.finish(self, ctx).await?;
|
||||
Some(image_layer)
|
||||
} else {
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
None
|
||||
},
|
||||
next_start_key: img_range.end,
|
||||
})
|
||||
|
||||
if wrote_any_image {
|
||||
// Normal path: we have written some data into the new image layer for this
|
||||
// partition, so flush it to disk.
|
||||
let image_layer = image_layer_writer.finish(self, ctx).await?;
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: Some(image_layer),
|
||||
next_start_key: img_range.end,
|
||||
})
|
||||
} else {
|
||||
// Special case: the image layer may be empty if this is a sharded tenant and the
|
||||
// partition does not cover any keys owned by this shard. In this case, to ensure
|
||||
// we don't leave gaps between image layers, leave `start` where it is, so that the next
|
||||
// layer we write will cover the key range that we just scanned.
|
||||
tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: None,
|
||||
next_start_key: start,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
|
||||
@@ -4479,6 +4494,7 @@ impl Timeline {
|
||||
ctx,
|
||||
img_range,
|
||||
mode,
|
||||
start,
|
||||
)
|
||||
.await?;
|
||||
start = next_start_key;
|
||||
@@ -5448,11 +5464,12 @@ impl Timeline {
|
||||
let min_key = *deltas.first().map(|(k, _, _)| k).unwrap();
|
||||
let max_key = deltas.last().map(|(k, _, _)| k).unwrap().next();
|
||||
let min_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
|
||||
let max_lsn = Lsn(deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap().0 + 1);
|
||||
let max_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
|
||||
assert!(
|
||||
max_lsn <= last_record_lsn,
|
||||
"advance last record lsn before inserting a layer, max_lsn={max_lsn}, last_record_lsn={last_record_lsn}"
|
||||
);
|
||||
let end_lsn = Lsn(max_lsn.0 + 1);
|
||||
if let Some(check_start_lsn) = check_start_lsn {
|
||||
assert!(min_lsn >= check_start_lsn);
|
||||
}
|
||||
@@ -5461,7 +5478,7 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
min_key,
|
||||
min_lsn..max_lsn,
|
||||
min_lsn..end_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -5477,6 +5494,36 @@ impl Timeline {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return all keys at the LSN in the image layers
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn inspect_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Key, Bytes)>> {
|
||||
let mut all_data = Vec::new();
|
||||
let guard = self.layers.read().await;
|
||||
for layer in guard.layer_map().iter_historic_layers() {
|
||||
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
let mut reconstruct_data = ValuesReconstructState::default();
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
KeySpace::single(Key::MIN..Key::MAX),
|
||||
lsn..Lsn(lsn.0 + 1),
|
||||
&mut reconstruct_data,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (k, v) in reconstruct_data.keys {
|
||||
all_data.push((k, v?.img.unwrap().1));
|
||||
}
|
||||
}
|
||||
}
|
||||
all_data.sort();
|
||||
Ok(all_data)
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
|
||||
|
||||
Reference in New Issue
Block a user